##// END OF EJS Templates
lfs: add the 'Content-Type' header called out in the file transfer spec...
Matt Harbison -
r37260:3e293808 default
parent child Browse files
Show More
@@ -1,515 +1,516
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import json
12 12 import os
13 13 import re
14 14 import socket
15 15
16 16 from mercurial.i18n import _
17 17
18 18 from mercurial import (
19 19 error,
20 20 pathutil,
21 21 pycompat,
22 22 url as urlmod,
23 23 util,
24 24 vfs as vfsmod,
25 25 worker,
26 26 )
27 27
28 28 from ..largefiles import lfutil
29 29
# A valid lfs object id (oid) is the lowercase hex SHA256 digest of the
# blob's content: exactly 64 hex characters, nothing more.
_lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
32 32
class lfsvfs(vfsmod.vfs):
    """vfs that stores each blob under a two-character fan-out directory."""

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if not _lfsre.match(path):
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        top = self.reljoin(self.base, path or '')
        for dirpath, dirs, files in os.walk(top, onerror=onerror):
            fanout = dirpath[prefixlen:]

            # Silently skip unexpected files and directories
            if len(fanout) == 2:
                oids.extend(fanout + name for name in files
                            if _lfsre.match(fanout + name))

        yield ('', [], oids)
61 61
class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # invoked as callback(readsize)
        # Measure the total length once up front, then rewind.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        fp = self._fp
        if fp is None:
            # Already exhausted and closed; behave like an empty stream.
            return b''
        data = fp.read(size)
        if not data:
            # EOF: release the underlying file so repeated reads are cheap.
            fp.close()
            self._fp = None
        elif self._callback:
            self._callback(len(data))
        return data
89 89
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.

    Blobs are written to two places: a per-repo store (``self.vfs``) and a
    machine-wide usercache (``self.cachevfs``), hardlinked together when
    possible so identical content is stored only once.
    """

    def __init__(self, repo):
        # Per-repo store, inside .hg/store.
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)
        # Machine-wide cache shared across repositories.
        usercache = lfutil._usercachedir(repo.ui, 'lfs')
        self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file. Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs. However, things like clone without an update won't
        # populate the local store. For an init + push of a local clone,
        # the usercache is the only place it _could_ be. If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore.

        Aborts if the downloaded content doesn't hash to ``oid``; the
        atomictemp write ensures no partial/corrupt blob is left behind.
        """
        sha256 = hashlib.sha256()

        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

        realoid = sha256.hexdigest()
        if realoid != oid:
            raise error.Abort(_('corrupt remote lfs object: %s') % oid)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data. Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore.

        Falls back to the usercache when the blob isn't in the local store,
        and hardlinks it into the local store on the way out (unless it is
        known to be corrupt).
        """
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            # (When verify=True, _read above has already checked the hash, so
            # the explicit digest comparison is skipped here.)
            if verify or hashlib.sha256(blob).hexdigest() == oid:
                self.ui.note(_('lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        # Hash incrementally so huge blobs aren't loaded into memory at once.
        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == sha256.hexdigest()

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
193 193
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS HTTP protocol.

    Implements the batch API to negotiate transfers and the basic transfer
    protocol to move individual blobs, with optional retry on socket errors.
    """

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip('/')
        # Default to masquerading as an official git-lfs client, since some
        # servers validate the User-Agent string.
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # Number of retries for transient socket errors per object transfer.
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blobstore."""
        self._batch(_deduplicate(pointers), tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md

        Raises LfsRemoteError on HTTP failure or if the server's reply is not
        valid JSON.
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rsp = self.urlopener.open(batchreq)
            rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)

        if self.ui.debugflag:
            self.ui.debug('Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            self.ui.debug('%s\n'
                          % '\n'.join(sorted(str(rsp.info()).splitlines())))

            # Sort the objects so the debug output is stable across servers.
            if 'objects' in response:
                response['objects'] = sorted(response['objects'],
                                             key=lambda p: p['oid'])
            self.ui.debug('%s\n'
                          % json.dumps(response, indent=2,
                                       separators=('', ': '), sort_keys=True))

        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            if 'error' not in response:
                if (action == 'download'
                    and action not in response.get('actions', [])):
                    code = 404
                else:
                    continue
            else:
                # An error dict without a code doesn't make much sense, so
                # treat as a server error.
                code = response.get('error').get('code', 500)

            ptrmap = {p.oid(): p for p in pointers}
            p = ptrmap.get(response['oid'], None)
            if p:
                filename = getattr(p, 'filename', 'unknown')
                errors = {
                    404: 'The object does not exist',
                    410: 'The object was removed by the owner',
                    422: 'Validation error',
                    500: 'Internal server error',
                }
                msg = errors.get(code, 'status code %d' % code)
                raise LfsRemoteError(_('LFS server error for "%s": %s')
                                     % (filename, msg))
            else:
                # The server replied about an oid we never asked for.
                raise LfsRemoteError(
                    _('LFS server error. Unsolicited response for oid %s')
                    % response['oid'])

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = pycompat.bytestr(obj['oid'])

        href = pycompat.bytestr(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            if not localstore.verify(oid):
                raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                                  hint=_('run hg verify'))
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'
            # The spec calls for a Content-Type header on uploads.
            request.add_header('Content-Type', 'application/octet-stream')

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)

            if self.ui.debugflag:
                self.ui.debug('Status: %d\n' % req.status)
                # lfs-test-server and hg serve return headers in different order
                self.ui.debug('%s\n'
                              % '\n'.join(sorted(str(req.info()).splitlines())))

            if action == 'download':
                # If downloading blobs, store downloaded data to local blobstore
                localstore.download(oid, req)
            else:
                # Drain the (normally empty) upload response for debugging.
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
                if response:
                    self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug('%s: %s\n' % (oid, ex.read()))
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        """Negotiate with the batch API, then transfer each object.

        Reports progress via ui.progress, retries transient socket errors,
        and optionally fans out across worker processes.
        """
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def transfer(chunk):
            # Generator yielding (1, oid) per completed transfer, retrying
            # socket errors up to self.retry times per object.
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                        util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool('experimental', 'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get('oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get('oid')))

        processed = 0
        blobs = 0
        for _one, oid in oids:
            processed += sizes[oid]
            blobs += 1
            self.ui.progress(topic, processed, total=total)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        self.ui.progress(topic, pos=None, total=total)

        if blobs > 0:
            if action == 'upload':
                self.ui.status(_('lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            # TODO: coalesce the download requests, and comment this in
            #elif action == 'download':
            #    self.ui.status(_('lfs: downloaded %d files (%s)\n')
            #                   % (blobs, util.bytecount(processed)))

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
439 440
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        self.vfs = lfsvfs(repo.vfs.join('lfs', url.path))

    def writebatch(self, pointers, fromstore):
        # Copy each unique blob out of the local store; read with verify=True
        # so corrupt local blobs are never propagated.
        for ptr in _deduplicate(pointers):
            blob = fromstore.read(ptr.oid(), verify=True)
            with self.vfs(ptr.oid(), 'wb', atomictemp=True) as fp:
                fp.write(blob)

    def readbatch(self, pointers, tostore):
        # Stream each unique blob into the local store, which verifies it.
        for ptr in _deduplicate(pointers):
            with self.vfs(ptr.oid(), 'rb') as fp:
                tostore.download(ptr.oid(), fp)
457 458
458 459 class _nullremote(object):
459 460 """Null store storing blobs to /dev/null."""
460 461
461 462 def __init__(self, repo, url):
462 463 pass
463 464
464 465 def writebatch(self, pointers, fromstore):
465 466 pass
466 467
467 468 def readbatch(self, pointers, tostore):
468 469 pass
469 470
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        # Any attempted upload aborts with a configuration hint.
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        # Any attempted download aborts with a configuration hint.
        self._prompt()

    def _prompt(self):
        """Abort, telling the user to configure lfs.url."""
        raise error.Abort(_('lfs.url needs to be configured'))
484 485
# Map an lfs.url scheme to the remote store class that implements it.
# ``None`` (no lfs.url configured) selects the store that prompts the
# user to configure one.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
492 493
def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    # A sortdict keeps insertion order; later pointers for the same oid
    # replace earlier ones.
    unique = util.sortdict()
    for ptr in pointers:
        unique[ptr.oid()] = ptr
    return unique.values()
499 500
500 501 def _verify(oid, content):
501 502 realoid = hashlib.sha256(content).hexdigest()
502 503 if realoid != oid:
503 504 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
504 505 hint=_('run hg verify'))
505 506
def remote(repo):
    """remotestore factory. return a store in _storemap depending on config"""
    lfsurl = util.url(repo.ui.config('lfs', 'url') or '')
    try:
        storecls = _storemap[lfsurl.scheme]
    except KeyError:
        raise error.Abort(_('lfs: unknown url scheme: %s') % lfsurl.scheme)
    return storecls(repo, lfsurl)
513 514
class LfsRemoteError(error.RevlogError):
    """Raised for HTTP or protocol errors when talking to the LFS server."""
    pass
General Comments 0
You need to be logged in to leave comments. Login now