##// END OF EJS Templates
lfs: fix the stall and corruption issue when concurrently uploading blobs...
Matt Harbison -
r44746:43eea17a default
parent child Browse files
Show More
@@ -1,776 +1,765 b''
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import errno
12 12 import hashlib
13 13 import json
14 14 import os
15 15 import re
16 16 import socket
17 17
18 18 from mercurial.i18n import _
19 19 from mercurial.pycompat import getattr
20 20
21 21 from mercurial import (
22 22 encoding,
23 23 error,
24 httpconnection as httpconnectionmod,
24 25 node,
25 26 pathutil,
26 27 pycompat,
27 28 url as urlmod,
28 29 util,
29 30 vfs as vfsmod,
30 31 worker,
31 32 )
32 33
33 34 from mercurial.utils import stringutil
34 35
35 36 from ..largefiles import lfutil
36 37
37 38 # 64 bytes for SHA256
38 39 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
39 40
40 41
class lfsvfs(vfsmod.vfs):
    """A vfs that stores each blob under a two-level path, keyed by oid."""

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if not _lfsre.match(path):
            raise error.ProgrammingError(b'unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[0:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        found = []

        for dirpath, dirs, files in os.walk(
            self.reljoin(self.base, path or b''), onerror=onerror
        ):
            dirpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories: only the
            # two-character shard directories can hold valid oids.
            if len(dirpath) == 2:
                found.extend(
                    dirpath + f for f in files if _lfsre.match(dirpath + f)
                )

        yield (b'', [], found)
72 73
class nullvfs(lfsvfs):
    """A blob store that holds nothing and silently discards writes."""

    def __init__(self):
        pass

    def exists(self, oid):
        # Nothing is ever present in this store.
        return False

    def read(self, oid):
        # store.read() calls into here if the blob doesn't exist in its
        # self.vfs.  Raise the same error as a normal vfs when asked to read a
        # file that doesn't exist.  The only difference is the full file path
        # isn't available in the error.
        raise IOError(
            errno.ENOENT,
            pycompat.sysstr(b'%s: No such file or directory' % oid),
        )

    def walk(self, path=None, onerror=None):
        # An empty single result, matching lfsvfs.walk()'s shape.
        return (b'', [], [])

    def write(self, oid, data):
        # Deliberately drop the data.
        pass
96 97
class lfsuploadfile(httpconnectionmod.httpsendfile):
    """a file-like object that supports keepalive.

    Subclassing ``httpsendfile`` (instead of wrapping a raw file object)
    lets the keepalive machinery re-seek and resend the blob when a
    connection is reused or retried, avoiding the stall/corruption seen
    with concurrent uploads.
    """

    def __init__(self, ui, filename):
        # Open the blob by path in binary mode; the parent class tracks
        # length and seekability for retried requests.
        super(lfsuploadfile, self).__init__(ui, filename, b'rb')
        self.read = self._data.read

    def _makeprogress(self):
        return None  # progress is handled by the worker client
119 108
120 109
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        fullpath = repo.svfs.join(b'lfs/objects')
        self.vfs = lfsvfs(fullpath)

        if repo.ui.configbool(b'experimental', b'lfs.disableusercache'):
            self.cachevfs = nullvfs()
        else:
            usercache = lfutil._usercachedir(repo.ui, b'lfs')
            self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        return open(self.path(oid), b'rb')

    def path(self, oid):
        """Build the path for the given blob ``oid``.

        If the blob exists locally, the path may point to either the usercache
        or the local store.  If it doesn't, it will point to the local store.
        This is meant for situations where existing code that isn't LFS aware
        needs to open a blob.  Generally, prefer the ``open`` method on this
        class.
        """
        # The usercache is the most likely place to hold the file.  Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs.  However, things like clone without an update won't
        # populate the local store.  For an init + push of a local clone,
        # the usercache is the only place it _could_ be.  If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs.join(oid)

        return self.vfs.join(oid)

    def download(self, oid, src, content_length):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        hasher = hashlib.sha256()
        received = 0

        with self.vfs(oid, b'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                hasher.update(chunk)
                received += len(chunk)

            # If the server advertised a length longer than what we actually
            # received, then we should expect that the server crashed while
            # producing the response (but the server has no way of telling us
            # that), and we really don't need to try to write the response to
            # the localstore, because it's not going to match the expected.
            if content_length is not None and int(content_length) != received:
                msg = (
                    b"Response length (%s) does not match Content-Length "
                    b"header (%d): likely server-side crash"
                )
                raise LfsRemoteError(_(msg) % (received, int(content_length)))

            realoid = node.hex(hasher.digest())
            if realoid != oid:
                raise LfsCorruptionError(
                    _(b'corrupt remote lfs object: %s') % oid
                )

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data.  Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, b'wb', atomictemp=True) as fp:
            fp.write(data)

        self._linktousercache(oid)

    def linkfromusercache(self, oid):
        """Link blobs found in the user cache into this store.

        The server module needs to do this when it lets the client know not to
        upload the blob, to ensure it is always available in this store.
        Normally this is done implicitly when the client reads or writes the
        blob, but that doesn't happen when the server tells the client that it
        already has the blob.
        """
        if not isinstance(self.cachevfs, nullvfs) and not self.vfs.exists(oid):
            self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
            lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))

    def _linktousercache(self, oid):
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid) and not isinstance(
            self.cachevfs, nullvfs
        ):
            self.ui.note(_(b'lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore."""
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            if verify or node.hex(hashlib.sha256(blob).digest()) == oid:
                self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_(b'lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        hasher = hashlib.sha256()

        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                hasher.update(chunk)

        return oid == node.hex(hasher.digest())

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
269 258
270 259
def _urlerrorreason(urlerror):
    '''Create a friendly message for the given URLError to be used in an
    LfsRemoteError message.
    '''
    # Prefer the wrapped exception, if any, as the source of detail.
    cause = urlerror
    if isinstance(urlerror.reason, Exception):
        cause = urlerror.reason

    if util.safehasattr(cause, b'reason'):
        try:  # usually it is in the form (errno, strerror)
            reason = cause.reason.args[1]
        except (AttributeError, IndexError):
            # it might be anything, for example a string
            reason = cause.reason
        if isinstance(reason, pycompat.unicode):
            # SSLError of Python 2.7.9 contains a unicode
            reason = encoding.unitolocal(reason)
        return reason

    if getattr(cause, "strerror", None):
        return encoding.strtolocal(cause.strerror)

    return stringutil.forcebytestr(urlerror)
294 283
295 284
class lfsauthhandler(util.urlreq.basehandler):
    handler_order = 480  # Before HTTPDigestAuthHandler (== 490)

    def http_error_401(self, req, fp, code, msg, headers):
        """Enforces that any authentication performed is HTTP Basic
        Authentication.  No authentication is also acceptable.
        """
        authreq = headers.get('www-authenticate', None)
        if not authreq:
            # No authentication requested: nothing to enforce.
            return None

        scheme = authreq.split()[0]
        if scheme.lower() != 'basic':
            msg = _(b'the server must support Basic Authentication')
            raise util.urlerr.httperror(
                req.get_full_url(),
                code,
                encoding.strfromlocal(msg),
                headers,
                fp,
            )
        return None
317 306
318 307
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS HTTP batch/basic protocol."""

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip(b'/')
        useragent = repo.ui.config(b'experimental', b'lfs.user-agent')
        if not useragent:
            useragent = b'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        self.urlopener.add_handler(lfsauthhandler())
        self.retry = ui.configint(b'lfs', b'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, b'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blostore."""
        self._batch(_deduplicate(pointers), tostore, b'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [
            {'oid': pycompat.strurl(p.oid()), 'size': p.size()}
            for p in pointers
        ]
        requestdata = pycompat.bytesurl(
            json.dumps(
                {'objects': objects, 'operation': pycompat.strurl(action),}
            )
        )
        url = b'%s/objects/batch' % self.baseurl
        batchreq = util.urlreq.request(pycompat.strurl(url), data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            with contextlib.closing(self.urlopener.open(batchreq)) as rsp:
                rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            hints = {
                400: _(
                    b'check that lfs serving is enabled on %s and "%s" is '
                    b'supported'
                )
                % (self.baseurl, action),
                404: _(b'the "lfs.url" config may be used to override %s')
                % self.baseurl,
            }
            hint = hints.get(ex.code, _(b'api=%s, action=%s') % (url, action))
            raise LfsRemoteError(
                _(b'LFS HTTP error: %s') % stringutil.forcebytestr(ex),
                hint=hint,
            )
        except util.urlerr.urlerror as ex:
            hint = (
                _(b'the "lfs.url" config may be used to override %s')
                % self.baseurl
            )
            raise LfsRemoteError(
                _(b'LFS error: %s') % _urlerrorreason(ex), hint=hint
            )
        try:
            response = pycompat.json_loads(rawjson)
        except ValueError:
            raise LfsRemoteError(
                _(b'LFS server returns invalid JSON: %s')
                % rawjson.encode("utf-8")
            )

        if self.ui.debugflag:
            # NOTE(review): rsp is closed by the time we get here, but its
            # status and stored headers remain readable.
            self.ui.debug(b'Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            headers = pycompat.bytestr(rsp.info()).strip()
            self.ui.debug(b'%s\n' % b'\n'.join(sorted(headers.splitlines())))

            if 'objects' in response:
                response['objects'] = sorted(
                    response['objects'], key=lambda p: p['oid']
                )
            self.ui.debug(
                b'%s\n'
                % pycompat.bytesurl(
                    json.dumps(
                        response,
                        indent=2,
                        separators=('', ': '),
                        sort_keys=True,
                    )
                )
            )

        def encodestr(x):
            if isinstance(x, pycompat.unicode):
                return x.encode('utf-8')
            return x

        return pycompat.rapply(encodestr, response)

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            if b'error' not in response:
                if action == b'download' and action not in response.get(
                    b'actions', []
                ):
                    code = 404
                else:
                    continue
            else:
                # An error dict without a code doesn't make much sense, so
                # treat as a server error.
                code = response.get(b'error').get(b'code', 500)

            ptrmap = {p.oid(): p for p in pointers}
            p = ptrmap.get(response[b'oid'], None)
            if p:
                filename = getattr(p, 'filename', b'unknown')
                errors = {
                    404: b'The object does not exist',
                    410: b'The object was removed by the owner',
                    422: b'Validation error',
                    500: b'Internal server error',
                }
                msg = errors.get(code, b'status code %d' % code)
                raise LfsRemoteError(
                    _(b'LFS server error for "%s": %s') % (filename, msg)
                )
            else:
                raise LfsRemoteError(
                    _(b'LFS server error. Unsolicited response for oid %s')
                    % response[b'oid']
                )

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get(b'objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [
            o for o in objects if action in o.get(b'actions', [])
        ]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = obj[b'oid']
        href = obj[b'actions'][action].get(b'href')
        headers = obj[b'actions'][action].get(b'header', {}).items()

        request = util.urlreq.request(pycompat.strurl(href))
        if action == b'upload':
            # If uploading blobs, read data from local blobstore.
            if not localstore.verify(oid):
                raise error.Abort(
                    _(b'detected corrupt lfs object: %s') % oid,
                    hint=_(b'run hg verify'),
                )

        for k, v in headers:
            request.add_header(pycompat.strurl(k), pycompat.strurl(v))

        try:
            if action == b'upload':
                # Open the blob by path through the keepalive-aware upload
                # class, so a reused/retried connection never sees a
                # half-consumed stream (previous code wrapped an already-open
                # file object, which stalled/corrupted concurrent uploads).
                request.data = lfsuploadfile(self.ui, localstore.path(oid))
                request.get_method = lambda: 'PUT'
                request.add_header('Content-Type', 'application/octet-stream')
                request.add_header('Content-Length', request.data.length)

            with contextlib.closing(self.urlopener.open(request)) as res:
                contentlength = res.info().get(b"content-length")
                ui = self.ui  # Shorten debug lines
                if self.ui.debugflag:
                    ui.debug(b'Status: %d\n' % res.status)
                    # lfs-test-server and hg serve return headers in different
                    # order
                    headers = pycompat.bytestr(res.info()).strip()
                    ui.debug(b'%s\n' % b'\n'.join(sorted(headers.splitlines())))

                if action == b'download':
                    # If downloading blobs, store downloaded data to local
                    # blobstore
                    localstore.download(oid, res, contentlength)
                else:
                    blocks = []
                    while True:
                        data = res.read(1048576)
                        if not data:
                            break
                        blocks.append(data)

                    response = b"".join(blocks)
                    if response:
                        ui.debug(b'lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug(
                    b'%s: %s\n' % (oid, ex.read())
                )  # XXX: also bytes?
            raise LfsRemoteError(
                _(b'LFS HTTP error: %s (oid=%s, action=%s)')
                % (stringutil.forcebytestr(ex), oid, action)
            )
        except util.urlerr.urlerror as ex:
            hint = _(b'attempted connection to %s') % pycompat.bytesurl(
                util.urllibcompat.getfullurl(request)
            )
            raise LfsRemoteError(
                _(b'LFS error: %s') % _urlerrorreason(ex), hint=hint
            )
        finally:
            # Always release the upload file handle, even on failure, so the
            # blob in the local store isn't left open.
            if request.data:
                request.data.close()

    def _batch(self, pointers, localstore, action):
        if action not in [b'upload', b'download']:
            raise error.ProgrammingError(b'invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get(b'size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get(b'oid')] = obj.get(b'size', 0)
        topic = {
            b'upload': _(b'lfs uploading'),
            b'download': _(b'lfs downloading'),
        }[action]
        if len(objects) > 1:
            self.ui.note(
                _(b'lfs: need to transfer %d objects (%s)\n')
                % (len(objects), util.bytecount(total))
            )

        def transfer(chunk):
            for obj in chunk:
                objsize = obj.get(b'size', 0)
                if self.ui.verbose:
                    if action == b'download':
                        msg = _(b'lfs: downloading %s (%s)\n')
                    elif action == b'upload':
                        msg = _(b'lfs: uploading %s (%s)\n')
                    self.ui.note(
                        msg % (obj.get(b'oid'), util.bytecount(objsize))
                    )
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get(b'oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _(b'lfs: failed: %r (remaining retry %d)\n')
                                % (stringutil.forcebytestr(ex), retry)
                            )
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
            oids = worker.worker(
                self.ui,
                0.1,
                transfer,
                (),
                sorted(objects, key=lambda o: o.get(b'oid')),
            )
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))

        with self.ui.makeprogress(
            topic, unit=_(b"bytes"), total=total
        ) as progress:
            progress.update(0)
            processed = 0
            blobs = 0
            for _one, oid in oids:
                processed += sizes[oid]
                blobs += 1
                progress.update(processed)
                self.ui.note(_(b'lfs: processed: %s\n') % oid)

        if blobs > 0:
            if action == b'upload':
                self.ui.status(
                    _(b'lfs: uploaded %d files (%s)\n')
                    % (blobs, util.bytecount(processed))
                )
            elif action == b'download':
                self.ui.status(
                    _(b'lfs: downloaded %d files (%s)\n')
                    % (blobs, util.bytecount(processed))
                )

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda: None)()
650 639
651 640
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        fullpath = repo.vfs.join(b'lfs', url.path)
        self.vfs = lfsvfs(fullpath)

    def writebatch(self, pointers, fromstore):
        # Copy each unique blob out of the local store verbatim.
        for ptr in _deduplicate(pointers):
            content = fromstore.read(ptr.oid(), verify=True)
            with self.vfs(ptr.oid(), b'wb', atomictemp=True) as fh:
                fh.write(content)

    def readbatch(self, pointers, tostore):
        # Feed each unique blob through the local store's verifying download.
        for ptr in _deduplicate(pointers):
            with self.vfs(ptr.oid(), b'rb') as fh:
                tostore.download(ptr.oid(), fh, None)
669 658
670 659
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore):
        # Intentionally discard all uploads.
        pass

    def readbatch(self, pointers, tostore):
        # Nothing is ever stored, so nothing to fetch.
        pass
682 671
683 672
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # Any access without a configured endpoint is a hard error.
        raise error.Abort(_(b'lfs.url needs to be configured'))
698 687
699 688
# Maps a URL scheme to the remote-store class handling it; the ``None``
# entry covers the case where no lfs.url could be determined at all.
_storemap = {
    b'https': _gitlfsremote,
    b'http': _gitlfsremote,
    b'file': _dummyremote,
    b'null': _nullremote,
    None: _promptremote,
}
707 696
708 697
def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    # sortdict moves a re-assigned key to the end, so the *last* pointer for
    # each oid wins, in last-seen order.
    unique = util.sortdict()
    for ptr in pointers:
        unique[ptr.oid()] = ptr
    return unique.values()
715 704
716 705
def _verify(oid, content):
    """Raise LfsCorruptionError unless ``content`` hashes to ``oid``."""
    actual = node.hex(hashlib.sha256(content).digest())
    if actual != oid:
        raise LfsCorruptionError(
            _(b'detected corrupt lfs object: %s') % oid,
            hint=_(b'run hg verify'),
        )
724 713
725 714
def remote(repo, remote=None):
    """remotestore factory. return a store in _storemap depending on config

    If ``lfs.url`` is specified, use that remote endpoint.  Otherwise, try to
    infer the endpoint, based on the remote repository using the same path
    adjustments as git.  As an extension, 'http' is supported as well so that
    ``hg serve`` works out of the box.

    https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
    """
    lfsurl = repo.ui.config(b'lfs', b'url')
    url = util.url(lfsurl or b'')
    if lfsurl is None:
        if remote:
            path = remote
        elif util.safehasattr(repo, b'_subtoppath'):
            # The pull command sets this during the optional update phase, which
            # tells exactly where the pull originated, whether 'paths.default'
            # or explicit.
            path = repo._subtoppath
        else:
            # TODO: investigate 'paths.remote:lfsurl' style path customization,
            # and fall back to inferring from 'paths.remote' if unspecified.
            path = repo.ui.config(b'paths', b'default') or b''

        defaulturl = util.url(path)

        # TODO: support local paths as well.
        # TODO: consider the ssh -> https transformation that git applies
        if defaulturl.scheme in (b'http', b'https'):
            # Append a separator only when the path doesn't already end with
            # a slash.  (The previous ``path[:-1] != b'/'`` slice compared
            # everything *but* the last byte, so e.g. b'/a' skipped the
            # separator and b'/repo/' gained a double slash.)
            if defaulturl.path and defaulturl.path[-1:] != b'/':
                defaulturl.path += b'/'
            defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'

            url = util.url(bytes(defaulturl))
            repo.ui.note(_(b'lfs: assuming remote store: %s\n') % url)

    scheme = url.scheme
    if scheme not in _storemap:
        raise error.Abort(_(b'lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)
767 756
768 757
class LfsRemoteError(error.StorageError):
    """Raised for protocol or server errors while talking to the remote store."""
771 760
772 761
class LfsCorruptionError(error.Abort):
    """Raised when a corrupt blob is detected, aborting an operation

    It exists to allow specialized handling on the server side."""
General Comments 0
You need to be logged in to leave comments. Login now