##// END OF EJS Templates
lfs: default the User-Agent header for blob transfers to 'git-lfs'...
Matt Harbison -
r35751:3d48ae1a default
parent child Browse files
Show More
@@ -1,461 +1,461 b''
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import json
12 12 import os
13 13 import re
14 14 import socket
15 15
16 16 from mercurial.i18n import _
17 17
18 18 from mercurial import (
19 19 error,
20 20 pathutil,
21 21 url as urlmod,
22 22 util,
23 23 vfs as vfsmod,
24 24 worker,
25 25 )
26 26
27 27 from ..largefiles import lfutil
28 28
# An lfs object id (oid) is the SHA-256 of the blob content, spelled as
# 64 lowercase hex characters.
_lfsre = re.compile(r'\A[a-f0-9]{64}\Z')
31 31
class lfsvfs(vfsmod.vfs):
    def join(self, path):
        """Map an oid onto its sharded storage location: XX/XXXXX...

        The first two hex characters become a directory name, keeping any
        single directory from accumulating every blob.
        """
        if _lfsre.match(path) is None:
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield a single (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # When dirpath == root, dirpath[striplen:] becomes empty because
        # len(dirpath) < striplen.
        striplen = len(pathutil.normasprefix(root))
        found = []

        startdir = self.reljoin(self.base, path or '')
        for dirpath, subdirs, files in os.walk(startdir, onerror=onerror):
            dirpath = dirpath[striplen:]

            # Anything that isn't a two character shard directory, or whose
            # recombined name isn't a well formed oid, is silently skipped.
            if len(dirpath) == 2:
                found.extend(dirpath + f for f in files
                             if _lfsre.match(dirpath + f))

        yield ('', [], found)
60 60
class filewithprogress(object):
    """A file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # func(readsize)
        # Measure the total size up front, then rewind for reading.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        if self._fp is None:
            # Already exhausted and closed on a previous call.
            return b''
        data = self._fp.read(size)
        if not data:
            # EOF: release the underlying file immediately.
            self._fp.close()
            self._fp = None
        elif self._callback:
            self._callback(len(data))
        return data
88 88
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large
    blobs to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        self.ui = repo.ui
        storepath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(storepath)
        cachepath = lfutil._usercachedir(repo.ui, 'lfs')
        self.cachevfs = lfsvfs(cachepath)

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file.  Commit
        # writes to both it and the local store, as does anything that
        # downloads the blobs.  However, things like clone without an update
        # won't populate the local store.  For an init + push of a local
        # clone, the usercache is the only place it _could_ be.  If not
        # present, the missing file msg here will indicate the local repo,
        # not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')
        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        hasher = hashlib.sha256()

        with self.vfs(oid, 'wb', atomictemp=True) as blobfile:
            for piece in util.filechunkiter(src, size=1048576):
                hasher.update(piece)
                blobfile.write(piece)

        if hasher.hexdigest() != oid:
            raise error.Abort(_('corrupt remote lfs object: %s') % oid)

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or
        similar.  As such, there is no need to verify the data.  Imports from
        a remote store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as blobfile:
            blobfile.write(data)

        self._linktousercache(oid)

    def _linktousercache(self, oid):
        """Hardlink the named blob from the local store into the usercache,
        unless the usercache already holds it."""
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore."""
        if self.vfs.exists(oid):
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            return self._read(self.vfs, oid, verify)

        # Not in the local store; fall back to the usercache.
        blob = self._read(self.cachevfs, oid, verify)

        # Even if revlog will verify the content, it needs to be verified
        # now before making the hardlink to avoid propagating corrupt blobs.
        # Don't abort if corruption is detected, because `hg verify` will
        # give more useful info about the corruption- simply don't add the
        # hardlink.
        if verify or hashlib.sha256(blob).hexdigest() == oid:
            self.ui.note(_('lfs: found %s in the usercache\n') % oid)
            lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
181 181
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS protocol over http(s).

    Object metadata is negotiated through the batch API, and the individual
    blobs are then moved with the basic transfer protocol.
    """

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        # Trailing '/' is stripped so endpoint paths can be appended cleanly.
        self.baseurl = baseurl.rstrip('/')
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            # Default to a git-lfs styled User-Agent (presumably some servers
            # key LFS handling off of it- TODO confirm), while still
            # advertising the Mercurial version in the comment.
            useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # Number of retries per object on socket errors (see transfer()).
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(pointers, fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blostore."""
        self._batch(pointers, tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        # Both headers are required by the git-lfs batch API.
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rawjson = self.urlopener.open(batchreq).read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)
        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            notfound = (response.get('error', {}).get('code') == 404
                        or (action == 'download'
                            and action not in response.get('actions', [])))
            if notfound:
                ptrmap = {p.oid(): p for p in pointers}
                p = ptrmap.get(response['oid'], None)
                if p:
                    filename = getattr(p, 'filename', 'unknown')
                    raise LfsRemoteError(
                        _(('LFS server error. Remote object '
                          'for "%s" not found: %r')) % (filename, response))
                else:
                    # The server reported on an oid we never asked about.
                    raise LfsRemoteError(
                        _('LFS server error. Unsolicited response for oid %s')
                        % response['oid'])
            if 'error' in response:
                raise LfsRemoteError(_('LFS server error: %r') % response)

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
        basic-transfers.md
        """
        oid = str(obj['oid'])

        # The batch API hands back the per-object URL and extra headers.
        href = str(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            # Verify first (in a separate fd), so a corrupt local blob is
            # never sent to the server.
            with localstore.open(oid) as fp:
                _verifyfile(oid, fp)
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)
            if action == 'download':
                # If downloading blobs, store downloaded data to local blobstore
                localstore.download(oid, req)
            else:
                # For uploads, drain the (normally empty) reply for debugging.
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
            if response:
                self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        """Transfer blobs for all pointers between localstore and the remote,
        in the direction named by action ('upload' or 'download')."""
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        # oid -> size map, used to advance the progress bar below.
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def transfer(chunk):
            # Generator yielding (1, oid) per completed object; the shape is
            # what worker.worker() consumes below.
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                 util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        # Only socket errors are retried; HTTP errors have
                        # already been converted to LfsRemoteError and abort.
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool('experimental', 'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get('oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get('oid')))

        processed = 0
        for _one, oid in oids:
            processed += sizes[oid]
            self.ui.progress(topic, processed, total=total)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        # pos=None closes out the progress bar.
        self.ui.progress(topic, pos=None, total=total)

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
380 380
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        self.vfs = lfsvfs(repo.vfs.join('lfs', url.path))

    def writebatch(self, pointers, fromstore):
        # Copy each blob out of the local store, verifying as we read.
        for p in pointers:
            blob = fromstore.read(p.oid(), verify=True)
            with self.vfs(p.oid(), 'wb', atomictemp=True) as fh:
                fh.write(blob)

    def readbatch(self, pointers, tostore):
        # Feed each stored file into the local store, which verifies it.
        for p in pointers:
            with self.vfs(p.oid(), 'rb') as fh:
                tostore.download(p.oid(), fh)
398 398
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        # No state to keep; every operation is a no-op.
        pass

    def writebatch(self, pointers, fromstore):
        # Intentionally discard all uploads.
        pass

    def readbatch(self, pointers, tostore):
        # Intentionally fetch nothing.
        pass
410 410
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        # No usable remote; any batch operation aborts via _prompt().
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # Abort with a hint that configuration is missing, rather than
        # failing with a confusing transport error.
        raise error.Abort(_('lfs.url needs to be configured'))
425 425
# Map an lfs.url scheme to the class implementing that remote store.  The
# ``None`` key covers the case where no lfs.url is configured at all.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
433 433
def _verify(oid, content):
    """Abort if the sha256 of ``content`` doesn't match the expected ``oid``."""
    if hashlib.sha256(content).hexdigest() != oid:
        raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                          hint=_('run hg verify'))
439 439
def _verifyfile(oid, fp):
    """Abort if the content read from the open file ``fp`` doesn't hash to
    the expected ``oid``.

    Hashes incrementally in 1MB chunks, so arbitrarily large blobs are never
    held in memory at once.
    """
    hasher = hashlib.sha256()
    for chunk in iter(lambda: fp.read(1024 * 1024), b''):
        hasher.update(chunk)
    if hasher.hexdigest() != oid:
        raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                          hint=_('run hg verify'))
451 451
def remote(repo):
    """remotestore factory. return a store in _storemap depending on config"""
    url = util.url(repo.ui.config('lfs', 'url') or '')
    # Note: a missing scheme maps to the None key, i.e. _promptremote, so
    # .get() only returns the default for genuinely unknown schemes.
    storecls = _storemap.get(url.scheme)
    if storecls is None:
        raise error.Abort(_('lfs: unknown url scheme: %s') % url.scheme)
    return storecls(repo, url)
459 459
class LfsRemoteError(error.RevlogError):
    """Raised when a remote LFS server reports or causes a failure."""
    pass
General Comments 0
You need to be logged in to leave comments. Login now