##// END OF EJS Templates
lfs: use progress helper...
Martin von Zweigbergk -
r38424:76a08cec default
parent child Browse files
Show More
@@ -1,580 +1,581 b''
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import hashlib
12 12 import json
13 13 import os
14 14 import re
15 15 import socket
16 16
17 17 from mercurial.i18n import _
18 18
19 19 from mercurial import (
20 20 error,
21 21 pathutil,
22 22 pycompat,
23 23 url as urlmod,
24 24 util,
25 25 vfs as vfsmod,
26 26 worker,
27 27 )
28 28
29 29 from ..largefiles import lfutil
30 30
# A SHA-256 digest is 32 bytes, rendered as 64 lowercase hex characters
_lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
33 33
class lfsvfs(vfsmod.vfs):
    """A vfs storing blobs named by sha256 oid, sharded as XX/XXXX..."""

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if not _lfsre.match(path):
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        startdir = self.reljoin(self.base, path or '')
        for dirpath, dirs, files in os.walk(startdir, onerror=onerror):
            relpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories
            if len(relpath) == 2:
                oids.extend(relpath + name for name in files
                            if _lfsre.match(relpath + name))

        yield ('', [], oids)
62 62
class nullvfs(lfsvfs):
    """A no-op blob store: reports every blob as missing and discards
    writes.  Used when the user-level cache is disabled."""

    def __init__(self):
        # Deliberately skip lfsvfs/vfs.__init__: this store has no backing
        # directory.
        pass

    def exists(self, oid):
        return False

    def read(self, oid):
        # store.read() calls into here if the blob doesn't exist in its
        # self.vfs. Raise the same error as a normal vfs when asked to read a
        # file that doesn't exist. The only difference is the full file path
        # isn't available in the error.
        raise IOError(errno.ENOENT, '%s: No such file or directory' % oid)

    def walk(self, path=None, onerror=None):
        # Mirrors lfsvfs.walk()'s single-tuple yield shape, with no oids.
        return ('', [], [])

    def write(self, oid, data):
        pass
82 82
class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Wraps another file object and invokes ``callback(numbytes)`` for every
    non-empty chunk read, so callers can report transfer progress.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # func(readsize)
        # Measure the total length by seeking to the end, then rewind.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        if self._fp is None:
            # Already exhausted and closed.
            return b''
        data = self._fp.read(size)
        if not data:
            # EOF: release the underlying file; later reads return b''.
            self._fp.close()
            self._fp = None
        elif self._callback:
            self._callback(len(data))
        return data
110 110
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        # Repo-local store, kept under .hg/store/lfs/objects.
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)

        if repo.ui.configbool('experimental', 'lfs.disableusercache'):
            # User-level cache disabled: use a store that reports every blob
            # missing and discards writes.
            self.cachevfs = nullvfs()
        else:
            # User-level cache shared across local repositories.
            usercache = lfutil._usercachedir(repo.ui, 'lfs')
            self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file. Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs. However, things like clone without an update won't
        # populate the local store. For an init + push of a local clone,
        # the usercache is the only place it _could_ be. If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        sha256 = hashlib.sha256()

        # atomictemp ensures a partially transferred blob never appears
        # under its final oid name.
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

        realoid = sha256.hexdigest()
        if realoid != oid:
            raise LfsCorruptionError(_('corrupt remote lfs object: %s')
                                     % oid)

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data. Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        self._linktousercache(oid)

    def _linktousercache(self, oid):
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if (not self.cachevfs.exists(oid)
            and not isinstance(self.cachevfs, nullvfs)):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore.

        Falls back to the usercache when the blob is absent from the local
        store, hardlinking it into the local store on the way (unless it is
        detected as corrupt)."""
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            # (If ``verify`` was True, _read() has already validated the blob
            # or raised, so the hash check below is only needed when False.)
            if verify or hashlib.sha256(blob).hexdigest() == oid:
                self.ui.note(_('lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        # Hash in chunks to bound memory usage for large blobs.
        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == sha256.hexdigest()

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
219 219
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS HTTP protocol (batch API plus
    basic transfers)."""

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip('/')
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            # Advertise a git-lfs compatible agent so servers treat this
            # client like the reference implementation.
            useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # Number of retries after a socket error during transfer (see
        # _batch()).
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blobstore."""
        self._batch(_deduplicate(pointers), tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rsp = self.urlopener.open(batchreq)
            rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)

        if self.ui.debugflag:
            self.ui.debug('Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            self.ui.debug('%s\n'
                          % '\n'.join(sorted(str(rsp.info()).splitlines())))

            if 'objects' in response:
                # Sort for stable debug output across server implementations.
                response['objects'] = sorted(response['objects'],
                                             key=lambda p: p['oid'])
            self.ui.debug('%s\n'
                          % json.dumps(response, indent=2,
                                       separators=('', ': '), sort_keys=True))

        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            if 'error' not in response:
                if (action == 'download'
                    and action not in response.get('actions', [])):
                    code = 404
                else:
                    continue
            else:
                # An error dict without a code doesn't make much sense, so
                # treat as a server error.
                code = response.get('error').get('code', 500)

            # NOTE(review): ptrmap is rebuilt on every iteration; it could be
            # hoisted above the loop.
            ptrmap = {p.oid(): p for p in pointers}
            p = ptrmap.get(response['oid'], None)
            if p:
                filename = getattr(p, 'filename', 'unknown')
                errors = {
                    404: 'The object does not exist',
                    410: 'The object was removed by the owner',
                    422: 'Validation error',
                    500: 'Internal server error',
                }
                msg = errors.get(code, 'status code %d' % code)
                raise LfsRemoteError(_('LFS server error for "%s": %s')
                                     % (filename, msg))
            else:
                # The server reported an oid we never asked about.
                raise LfsRemoteError(
                    _('LFS server error. Unsolicited response for oid %s')
                    % response['oid'])

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
        basic-transfers.md
        """
        oid = pycompat.bytestr(obj['oid'])

        # The batch API response tells us exactly where to transfer this blob.
        href = pycompat.bytestr(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            if not localstore.verify(oid):
                raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                                  hint=_('run hg verify'))
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'
            request.add_header('Content-Type', 'application/octet-stream')

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)

            if self.ui.debugflag:
                self.ui.debug('Status: %d\n' % req.status)
                # lfs-test-server and hg serve return headers in different order
                self.ui.debug('%s\n'
                              % '\n'.join(sorted(str(req.info()).splitlines())))

            if action == 'download':
                # If downloading blobs, store downloaded data to local blobstore
                localstore.download(oid, req)
            else:
                # Drain the server's reply to the upload, for debug output.
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
                if response:
                    self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug('%s: %s\n' % (oid, ex.read()))
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        """Transfer all blobs named by ``pointers``, reporting progress."""
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        progress = self.ui.makeprogress(topic, total=total)
        progress.update(0)
        def transfer(chunk):
            # Generator yielding (1, oid) per completed transfer; shaped for
            # worker.worker() below.
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                 util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool('experimental', 'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get('oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get('oid')))

        processed = 0
        blobs = 0
        for _one, oid in oids:
            processed += sizes[oid]
            blobs += 1
            progress.update(processed)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        progress.complete()

        if blobs > 0:
            if action == 'upload':
                self.ui.status(_('lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            elif action == 'download':
                self.ui.status(_('lfs: downloaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
465 466
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        self.vfs = lfsvfs(repo.vfs.join('lfs', url.path))

    def writebatch(self, pointers, fromstore):
        # Copy each unique blob out of the local store, verifying content.
        for p in _deduplicate(pointers):
            blob = fromstore.read(p.oid(), verify=True)
            with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
                fp.write(blob)

    def readbatch(self, pointers, tostore):
        # Feed each unique blob into the local store, which verifies it.
        for p in _deduplicate(pointers):
            with self.vfs(p.oid(), 'rb') as fp:
                tostore.download(p.oid(), fp)
483 484
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore):
        # Intentionally discard all uploads.
        pass

    def readbatch(self, pointers, tostore):
        # Intentionally produce nothing.
        pass
495 496
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        # ``ui`` is accepted for interface compatibility but unused.
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        # ``ui`` is accepted for interface compatibility but unused.
        self._prompt()

    def _prompt(self):
        raise error.Abort(_('lfs.url needs to be configured'))
510 511
# Map of URL scheme to remote store implementation.  The ``None`` entry
# handles the case where no lfs.url is configured and no endpoint could be
# inferred, prompting the user to configure one.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
518 519
def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    # An ordered dict keyed by oid keeps the last pointer for each oid while
    # preserving first-seen order.
    unique = util.sortdict()
    for pointer in pointers:
        unique[pointer.oid()] = pointer
    return unique.values()
525 526
526 527 def _verify(oid, content):
527 528 realoid = hashlib.sha256(content).hexdigest()
528 529 if realoid != oid:
529 530 raise LfsCorruptionError(_('detected corrupt lfs object: %s') % oid,
530 531 hint=_('run hg verify'))
531 532
def remote(repo, remote=None):
    """remotestore factory. return a store in _storemap depending on config

    If ``lfs.url`` is specified, use that remote endpoint. Otherwise, try to
    infer the endpoint, based on the remote repository using the same path
    adjustments as git. As an extension, 'http' is supported as well so that
    ``hg serve`` works out of the box.

    https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
    """
    lfsurl = repo.ui.config('lfs', 'url')
    url = util.url(lfsurl or '')
    if lfsurl is None:
        if remote:
            path = remote
        elif util.safehasattr(repo, '_subtoppath'):
            # The pull command sets this during the optional update phase, which
            # tells exactly where the pull originated, whether 'paths.default'
            # or explicit.
            path = repo._subtoppath
        else:
            # TODO: investigate 'paths.remote:lfsurl' style path customization,
            # and fall back to inferring from 'paths.remote' if unspecified.
            path = repo.ui.config('paths', 'default') or ''

        defaulturl = util.url(path)

        # TODO: support local paths as well.
        # TODO: consider the ssh -> https transformation that git applies
        if defaulturl.scheme in (b'http', b'https'):
            # Ensure exactly one trailing slash before appending the LFS
            # discovery suffix.  (The previous test ``path[:-1] != b'/'``
            # examined all-but-the-last byte, so paths already ending in '/'
            # grew a double slash and short paths like b'/a' were never
            # normalized.)
            if defaulturl.path and defaulturl.path[-1:] != b'/':
                defaulturl.path += b'/'
            defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'

            url = util.url(bytes(defaulturl))
            repo.ui.note(_('lfs: assuming remote store: %s\n') % url)

    scheme = url.scheme
    if scheme not in _storemap:
        raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)
573 574
class LfsRemoteError(error.RevlogError):
    """Raised for errors reported by, or while communicating with, the remote
    blobstore server."""
    pass
576 577
class LfsCorruptionError(error.Abort):
    """Raised when a corrupt blob is detected, aborting an operation

    It exists to allow specialized handling on the server side."""
General Comments 0
You need to be logged in to leave comments. Login now