##// END OF EJS Templates
lfs: handle paths that don't end with '/' when inferring the blob store...
Matt Harbison -
r37583:9c7a25ef default
parent child Browse files
Show More
@@ -1,570 +1,575
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import hashlib
12 12 import json
13 13 import os
14 14 import re
15 15 import socket
16 16
17 17 from mercurial.i18n import _
18 18
19 19 from mercurial import (
20 20 error,
21 21 pathutil,
22 22 pycompat,
23 23 url as urlmod,
24 24 util,
25 25 vfs as vfsmod,
26 26 worker,
27 27 )
28 28
29 29 from ..largefiles import lfutil
30 30
31 31 # 64 bytes for SHA256
32 32 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
33 33
class lfsvfs(vfsmod.vfs):
    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if _lfsre.match(path) is None:
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        # Shard the store by the first two hex characters of the oid.
        return super(lfsvfs, self).join(path[:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        top = self.reljoin(self.base, path or '')

        oids = []
        for dirpath, _subdirs, files in os.walk(top, onerror=onerror):
            shard = dirpath[prefixlen:]

            # Only two-character shard directories hold blobs; silently skip
            # anything else (unexpected files and directories).
            if len(shard) == 2:
                oids.extend(shard + f for f in files
                            if _lfsre.match(shard + f))

        yield ('', [], oids)
62 62
class nullvfs(lfsvfs):
    """A blob store backed by nothing: reads fail, writes are discarded.

    Substituted for the usercache vfs when
    ``experimental.lfs.disableusercache`` is set, so callers can keep using
    the normal vfs interface without touching the filesystem.
    """

    def __init__(self):
        # Deliberately skip lfsvfs/vfs.__init__: there is no backing
        # directory, so there is no base path to record.
        pass

    def exists(self, oid):
        # Nothing is ever stored, so no blob ever exists.
        return False

    def read(self, oid):
        # store.read() calls into here if the blob doesn't exist in its
        # self.vfs. Raise the same error as a normal vfs when asked to read a
        # file that doesn't exist. The only difference is the full file path
        # isn't available in the error.
        raise IOError(errno.ENOENT, '%s: No such file or directory' % oid)

    def walk(self, path=None, onerror=None):
        # Match the generator contract of lfsvfs.walk(), which yields a
        # single (dirpath, dirs, oids) tuple.  The previous version returned
        # the tuple directly, so a caller iterating with
        # ``for dirpath, dirs, oids in vfs.walk()`` would try to unpack each
        # element of the tuple and fail.
        yield ('', [], [])

    def write(self, oid, data):
        # Writes are silently discarded.
        pass
82 82
class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # func(readsize)
        # Compute the total size up front by seeking to the end and back.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        fp = self._fp
        if fp is None:
            # The underlying file was exhausted and closed earlier.
            return b''
        data = fp.read(size)
        if not data:
            # EOF: release the file; subsequent reads return b''.
            fp.close()
            self._fp = None
        elif self._callback:
            self._callback(len(data))
        return data
110 110
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        # Blobs live under .hg/store/lfs/objects, sharded by lfsvfs.join().
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)

        if repo.ui.configbool('experimental', 'lfs.disableusercache'):
            # nullvfs reports every blob as missing and discards writes,
            # effectively disabling the shared usercache.
            self.cachevfs = nullvfs()
        else:
            usercache = lfutil._usercachedir(repo.ui, 'lfs')
            self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file. Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs. However, things like clone without an update won't
        # populate the local store. For an init + push of a local clone,
        # the usercache is the only place it _could_ be. If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        sha256 = hashlib.sha256()

        # atomictemp ensures a partially transferred blob never becomes
        # visible under its oid if the download is interrupted.
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

        realoid = sha256.hexdigest()
        if realoid != oid:
            raise error.Abort(_('corrupt remote lfs object: %s') % oid)

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data. Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        self._linktousercache(oid)

    def _linktousercache(self, oid):
        """Hardlink the local copy of ``oid`` into the usercache, if absent."""
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if (not self.cachevfs.exists(oid)
            and not isinstance(self.cachevfs, nullvfs)):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore, falling back to the usercache.

        A blob found only in the usercache is hardlinked into the local
        store (unless it fails the hash check)."""
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            if verify or hashlib.sha256(blob).hexdigest() == oid:
                self.ui.note(_('lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            # Raises error.Abort on a hash mismatch.
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        # Hash in chunks so huge blobs are not loaded into memory at once.
        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == sha256.hexdigest()

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
218 218
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS protocol over http(s).

    Implements the batch metadata API and the basic transfer adapter for
    uploading and downloading blobs.
    """

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        # Normalize so that '%s/objects/batch' never yields a double slash.
        self.baseurl = baseurl.rstrip('/')
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            # Advertise a git-lfs compatible agent by default; some servers
            # key behavior off of it.
            useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # Number of retries per object on socket errors (config: lfs.retry).
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blobstore."""
        self._batch(_deduplicate(pointers), tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rsp = self.urlopener.open(batchreq)
            rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)

        if self.ui.debugflag:
            self.ui.debug('Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            self.ui.debug('%s\n'
                          % '\n'.join(sorted(str(rsp.info()).splitlines())))

            # Sort for stable debug output across server implementations.
            if 'objects' in response:
                response['objects'] = sorted(response['objects'],
                                             key=lambda p: p['oid'])
            self.ui.debug('%s\n'
                          % json.dumps(response, indent=2,
                                       separators=('', ': '), sort_keys=True))

        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            if 'error' not in response:
                if (action == 'download'
                    and action not in response.get('actions', [])):
                    code = 404
                else:
                    continue
            else:
                # An error dict without a code doesn't make much sense, so
                # treat as a server error.
                code = response.get('error').get('code', 500)

            ptrmap = {p.oid(): p for p in pointers}
            p = ptrmap.get(response['oid'], None)
            if p:
                filename = getattr(p, 'filename', 'unknown')
                # Human-readable messages for the common LFS status codes.
                errors = {
                    404: 'The object does not exist',
                    410: 'The object was removed by the owner',
                    422: 'Validation error',
                    500: 'Internal server error',
                }
                msg = errors.get(code, 'status code %d' % code)
                raise LfsRemoteError(_('LFS server error for "%s": %s')
                                     % (filename, msg))
            else:
                # The server reported an oid we never asked about.
                raise LfsRemoteError(
                    _('LFS server error. Unsolicited response for oid %s')
                    % response['oid'])

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
        basic-transfers.md
        """
        oid = pycompat.bytestr(obj['oid'])

        href = pycompat.bytestr(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            if not localstore.verify(oid):
                raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                                  hint=_('run hg verify'))
            # filewithprogress provides __len__ so the request gets a
            # Content-Length header.
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'
            request.add_header('Content-Type', 'application/octet-stream')

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)

            if self.ui.debugflag:
                self.ui.debug('Status: %d\n' % req.status)
                # lfs-test-server and hg serve return headers in different order
                self.ui.debug('%s\n'
                              % '\n'.join(sorted(str(req.info()).splitlines())))

            if action == 'download':
                # If downloading blobs, store downloaded data to local blobstore
                localstore.download(oid, req)
            else:
                # Drain the server's reply to the upload so the connection can
                # be reused; surface it only in debug mode.
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
                if response:
                    self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug('%s: %s\n' % (oid, ex.read()))
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        """Negotiate with the batch API, then transfer each returned object,
        reporting progress along the way."""
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def transfer(chunk):
            # Generator yielding (1, oid) per completed transfer; retries
            # socket errors up to self.retry times per object.
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                 util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool('experimental', 'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get('oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get('oid')))

        processed = 0
        blobs = 0
        for _one, oid in oids:
            processed += sizes[oid]
            blobs += 1
            self.ui.progress(topic, processed, total=total)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        self.ui.progress(topic, pos=None, total=total)

        if blobs > 0:
            if action == 'upload':
                self.ui.status(_('lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            # TODO: coalesce the download requests, and comment this in
            #elif action == 'download':
            #    self.ui.status(_('lfs: downloaded %d files (%s)\n')
            #                   % (blobs, util.bytecount(processed)))

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
465 465
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        # Blobs live under .hg/lfs/<url.path>, sharded like any lfsvfs.
        self.vfs = lfsvfs(repo.vfs.join('lfs', url.path))

    def writebatch(self, pointers, fromstore):
        """Copy each (deduplicated) blob out of ``fromstore``."""
        for pointer in _deduplicate(pointers):
            blob = fromstore.read(pointer.oid(), verify=True)
            with self.vfs(pointer.oid(), 'wb', atomictemp=True) as fp:
                fp.write(blob)

    def readbatch(self, pointers, tostore):
        """Feed each (deduplicated) blob into ``tostore.download()``."""
        for pointer in _deduplicate(pointers):
            with self.vfs(pointer.oid(), 'rb') as fp:
                tostore.download(pointer.oid(), fp)
483 483
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        # Nothing to set up; every operation is a no-op.
        pass

    def writebatch(self, pointers, fromstore):
        # Discard all outgoing blobs.
        pass

    def readbatch(self, pointers, tostore):
        # Never fetches anything.
        pass
495 495
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        # No usable URL was configured or inferred; defer the error until a
        # transfer is actually attempted.
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # Abort, pointing the user at the missing configuration.
        raise error.Abort(_('lfs.url needs to be configured'))
510 510
# Map a URL scheme (from ``lfs.url`` or the inferred default) to the store
# class that serves it.  The ``None`` key handles the case where no URL could
# be determined at all.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
518 518
def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    # sortdict preserves insertion order, so the position of each oid's first
    # appearance is kept while a later duplicate replaces the stored pointer.
    unique = util.sortdict()
    for pointer in pointers:
        unique[pointer.oid()] = pointer
    return unique.values()
525 525
526 526 def _verify(oid, content):
527 527 realoid = hashlib.sha256(content).hexdigest()
528 528 if realoid != oid:
529 529 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
530 530 hint=_('run hg verify'))
531 531
def remote(repo, remote=None):
    """remotestore factory. return a store in _storemap depending on config

    If ``lfs.url`` is specified, use that remote endpoint. Otherwise, try to
    infer the endpoint, based on the remote repository using the same path
    adjustments as git. As an extension, 'http' is supported as well so that
    ``hg serve`` works out of the box.

    https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
    """
    lfsurl = repo.ui.config('lfs', 'url')
    url = util.url(lfsurl or '')
    if lfsurl is None:
        if remote:
            path = remote
        elif util.safehasattr(repo, '_subtoppath'):
            # The pull command sets this during the optional update phase, which
            # tells exactly where the pull originated, whether 'paths.default'
            # or explicit.
            path = repo._subtoppath
        else:
            # TODO: investigate 'paths.remote:lfsurl' style path customization,
            # and fall back to inferring from 'paths.remote' if unspecified.
            path = repo.ui.config('paths', 'default') or ''

        defaulturl = util.url(path)

        # TODO: support local paths as well.
        # TODO: consider the ssh -> https transformation that git applies
        if defaulturl.scheme in (b'http', b'https'):
            # Ensure exactly one '/' separates the repo path from the
            # discovery suffix.  (Fix: the last byte is ``path[-1:]``; the
            # earlier ``path[:-1]`` compared everything *but* the last byte,
            # so paths already ending in '/' grew a duplicate slash.)
            if defaulturl.path and defaulturl.path[-1:] != b'/':
                defaulturl.path += b'/'
            # Fix: '+' binds tighter than 'or', so the unparenthesized
            # ``path or b'' + b'.git/info/lfs'`` skipped the suffix whenever
            # the path was non-empty, pointing at the repo instead of its
            # LFS endpoint.
            defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'

            url = util.url(bytes(defaulturl))
            repo.ui.note(_('lfs: assuming remote store: %s\n') % url)

    scheme = url.scheme
    if scheme not in _storemap:
        raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)
568 573
class LfsRemoteError(error.RevlogError):
    """Raised for protocol or server-side failures talking to an LFS store."""
    pass
General Comments 0
You need to be logged in to leave comments. Login now