##// END OF EJS Templates
lfs: raise an error if the server sends an unsolicited oid...
Matt Harbison -
r35713:dd672e3d default
parent child Browse files
Show More
@@ -1,452 +1,456
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import json
12 12 import os
13 13 import re
14 14 import socket
15 15
16 16 from mercurial.i18n import _
17 17
18 18 from mercurial import (
19 19 error,
20 20 pathutil,
21 21 url as urlmod,
22 22 util,
23 23 vfs as vfsmod,
24 24 worker,
25 25 )
26 26
27 27 from ..largefiles import lfutil
28 28
29 29 # 64 bytes for SHA256
30 30 _lfsre = re.compile(r'\A[a-f0-9]{64}\Z')
31 31
class lfsvfs(vfsmod.vfs):
    """vfs that stores blobs under two-level paths like XX/XXXX...

    Blob names must be full 64-character lowercase sha256 hex oids
    (enforced by ``_lfsre``); the first two characters become a
    subdirectory to avoid one huge flat directory.
    """

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        # Reject anything that is not a well-formed oid early; this is a
        # programming error, not user input.
        if not _lfsre.match(path):
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[0:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        for dirpath, dirs, files in os.walk(self.reljoin(self.base, path or ''),
                                            onerror=onerror):
            dirpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories
            if len(dirpath) == 2:
                # Recombine the 2-char prefix directory with the file name to
                # recover the full oid, keeping only well-formed ones.
                oids.extend([dirpath + f for f in files
                             if _lfsre.match(dirpath + f)])

        # Single yield: all oids live logically in the root of this vfs.
        yield ('', [], oids)
60 60
class filewithprogress(object):
    """File-like wrapper that supports __len__ and chunked read.

    When a callback is supplied it is invoked with the byte count of every
    non-empty read, which lets callers report upload/download progress.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # func(readsize), or None for no progress
        # Measure the total size once by seeking to the end, then rewind so
        # reads start from the beginning of the file.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        if self._fp is None:
            # Already exhausted and closed; keep reporting EOF.
            return b''
        data = self._fp.read(size)
        if not data:
            # EOF: close eagerly and drop the reference so later reads keep
            # returning b'' without touching the closed file.
            self._fp.close()
            self._fp = None
            return data
        if self._callback:
            self._callback(len(data))
        return data
88 88
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        # Repo-private store under .hg/store/lfs/objects.
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)
        # Shared per-user cache directory (location resolved by lfutil).
        usercache = lfutil._usercachedir(repo.ui, 'lfs')
        self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file. Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs. However, things like clone without an update won't
        # populate the local store. For an init + push of a local clone,
        # the usercache is the only place it _could_ be. If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        sha256 = hashlib.sha256()

        # Hash while streaming so the blob is never re-read for verification.
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

        realoid = sha256.hexdigest()
        if realoid != oid:
            raise error.Abort(_('corrupt remote lfs object: %s') % oid)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data. Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore.

        Falls back to the usercache when the blob is not in the repo-local
        store, hardlinking it back into the local store on success.
        """
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            # (If verify is True, _read() above already aborted on corruption,
            # so only the verify=False path needs the explicit hash check.)
            if verify or hashlib.sha256(blob).hexdigest() == oid:
                self.ui.note(_('lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
181 181
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS batch + basic transfer
    protocol over HTTP(S)."""

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip('/')
        # Optional user-agent override; the default advertises both hg and a
        # git version.  NOTE(review): presumably some servers key behavior off
        # the git/ token — confirm before changing the default.
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            useragent = 'mercurial/%s git/2.15.1' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # Number of retries per object on socket errors (see _batch()).
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(pointers, fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blobstore."""
        self._batch(pointers, tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md

        Raises LfsRemoteError on HTTP failure or undecodable JSON.
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rawjson = self.urlopener.open(batchreq).read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)
        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            notfound = (response.get('error', {}).get('code') == 404
                        or (action == 'download'
                            and action not in response.get('actions', [])))
            if notfound:
                ptrmap = {p.oid(): p for p in pointers}
                p = ptrmap.get(response['oid'], None)
                if p:
                    filename = getattr(p, 'filename', 'unknown')
                    raise LfsRemoteError(
                        _(('LFS server error. Remote object '
                          'for "%s" not found: %r')) % (filename, response))
                else:
                    # The server replied about an oid we never asked for.
                    raise LfsRemoteError(
                        _('LFS server error. Unsolicited response for oid %s')
                        % response['oid'])
            if 'error' in response:
                raise LfsRemoteError(_('LFS server error: %r') % response)

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = str(obj['oid'])

        href = str(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            # Verify first so a corrupt local blob is never pushed upstream.
            with localstore.open(oid) as fp:
                _verifyfile(oid, fp)
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)
            if action == 'download':
                # If downloading blobs, store downloaded data to local blobstore
                localstore.download(oid, req)
            else:
                # Drain the server's reply to the upload, in 1MB chunks, so it
                # can be logged below.
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
            if response:
                self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        # Transfer every object the batch API says needs the given action,
        # fanning the work out across workers with progress reporting.
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def transfer(chunk):
            # Worker body: transfer each object, retrying socket errors up to
            # self.retry times, and yield (1, oid) per completed object.
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                 util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Sort for a deterministic transfer order across worker splits.
        oids = worker.worker(self.ui, 0.1, transfer, (),
                             sorted(objects, key=lambda o: o.get('oid')))
        processed = 0
        for _one, oid in oids:
            processed += sizes[oid]
            self.ui.progress(topic, processed, total=total)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        self.ui.progress(topic, pos=None, total=total)

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
371 375
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        # Blobs live in a directory under the repo's .hg, named by url.path.
        storepath = repo.vfs.join('lfs', url.path)
        self.vfs = lfsvfs(storepath)

    def writebatch(self, pointers, fromstore):
        """Copy each pointed-to blob from the local store into this one."""
        for pointer in pointers:
            blob = fromstore.read(pointer.oid(), verify=True)
            with self.vfs(pointer.oid(), 'wb', atomictemp=True) as fp:
                fp.write(blob)

    def readbatch(self, pointers, tostore):
        """Feed each stored blob through the local store's download path."""
        for pointer in pointers:
            with self.vfs(pointer.oid(), 'rb') as fp:
                tostore.download(pointer.oid(), fp)
389 393
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore):
        # Intentionally discard everything.
        pass

    def readbatch(self, pointers, tostore):
        # Nothing is ever stored, so there is nothing to read back.
        pass
401 405
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # No remote is configured; any transfer attempt is a hard error.
        raise error.Abort(_('lfs.url needs to be configured'))
416 420
# Map the scheme of the configured lfs.url to a remote store implementation.
# The ``None`` key handles the case where lfs.url is not set at all.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
424 428
def _verify(oid, content):
    """Abort if ``content`` does not hash (sha256) to the expected ``oid``."""
    digest = hashlib.sha256(content).hexdigest()
    if digest == oid:
        return
    raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                      hint=_('run hg verify'))
430 434
def _verifyfile(oid, fp):
    """Abort if the data readable from ``fp`` does not sha256-hash to ``oid``.

    Hashes in 1MB chunks so large blobs are never held in memory at once.
    """
    hasher = hashlib.sha256()
    chunk = fp.read(1024 * 1024)
    while chunk:
        hasher.update(chunk)
        chunk = fp.read(1024 * 1024)
    if hasher.hexdigest() != oid:
        raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                          hint=_('run hg verify'))
442 446
def remote(repo):
    """remotestore factory. return a store in _storemap depending on config"""
    lfsurl = repo.ui.config('lfs', 'url')
    url = util.url(lfsurl or '')
    try:
        storecls = _storemap[url.scheme]
    except KeyError:
        raise error.Abort(_('lfs: unknown url scheme: %s') % url.scheme)
    return storecls(repo, url)
450 454
class LfsRemoteError(error.RevlogError):
    """Raised when talking to a remote LFS store fails or the server's
    response is invalid or unexpected."""
    pass
General Comments 0
You need to be logged in to leave comments. Login now