##// END OF EJS Templates
lfs: add a local store method for opening a blob...
Matt Harbison -
r35543:83903433 default
parent child Browse files
Show More
@@ -1,434 +1,442
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import json
12 12 import os
13 13 import re
14 14 import socket
15 15
16 16 from mercurial.i18n import _
17 17
18 18 from mercurial import (
19 19 error,
20 20 pathutil,
21 21 url as urlmod,
22 22 util,
23 23 vfs as vfsmod,
24 24 worker,
25 25 )
26 26
27 27 from ..largefiles import lfutil
28 28
29 29 # 64 bytes for SHA256
30 30 _lfsre = re.compile(r'\A[a-f0-9]{64}\Z')
31 31
class lfsvfs(vfsmod.vfs):
    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if _lfsre.match(path) is None:
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        # Shard oid "XXYYYY..." into directory "XX" and file "YYYY...".
        prefix, rest = path[0:2], path[2:]
        return super(lfsvfs, self).join(prefix, rest)

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        start = self.reljoin(self.base, path or '')
        for dirpath, dirs, files in os.walk(start, onerror=onerror):
            subdir = dirpath[prefixlen:]

            # Silently skip unexpected files and directories: only the
            # two-character shard directories can hold blobs.
            if len(subdir) != 2:
                continue
            for name in files:
                candidate = subdir + name
                if _lfsre.match(candidate):
                    oids.append(candidate)

        yield ('', [], oids)
60 60
class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # invoked as callback(readsize) per chunk
        # Measure the total size up front so __len__ is O(1).
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        fp = self._fp
        if fp is None:
            # Underlying file was exhausted and closed; behave like EOF.
            return b''
        data = fp.read(size)
        if not data:
            # EOF: close and drop the handle so later reads return b''.
            fp.close()
            self._fp = None
            return data
        if self._callback:
            self._callback(len(data))
        return data
88 88
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)
        usercache = lfutil._usercachedir(repo.ui, 'lfs')
        self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # Prefer the usercache copy when both stores have the blob.
        store = self.cachevfs if self.cachevfs.exists(oid) else self.vfs
        return store(oid, 'rb')

    def write(self, oid, data, verify=True):
        """Write blob to local blobstore."""
        if verify:
            _verify(oid, data)

        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if self.cachevfs.exists(oid):
            return
        # Only populate the usercache with content known to be good, either
        # because it was just verified or because its hash checks out here.
        if verify or hashlib.sha256(data).hexdigest() == oid:
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore."""
        if self.vfs.exists(oid):
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            return self._read(self.vfs, oid, verify)

        blob = self._read(self.cachevfs, oid, verify)

        # Even if revlog will verify the content, it needs to be verified
        # now before making the hardlink to avoid propagating corrupt blobs.
        # Don't abort if corruption is detected, because `hg verify` will
        # give more useful info about the corruption- simply don't add the
        # hardlink.
        if verify or hashlib.sha256(blob).hexdigest() == oid:
            self.ui.note(_('lfs: found %s in the usercache\n') % oid)
            lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
147 155
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS protocol over HTTP(S).

    Uses the batch API to discover what needs transferring, then the basic
    transfer protocol to move individual blobs.  See
    https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
    """

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        # Normalize so '%s/objects/batch' joins cleanly below.
        self.baseurl = baseurl.rstrip('/')
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            # NOTE(review): advertises a git client version alongside the
            # Mercurial one -- presumably for servers that sniff the agent;
            # confirm before changing.
            useragent = 'mercurial/%s git/2.15.1' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # Number of additional attempts after a socket error (see _batch).
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(pointers, fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blostore."""
        self._batch(pointers, tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md

        Raises LfsRemoteError on HTTP failure or undecodable response.
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rawjson = self.urlopener.open(batchreq).read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)
        return response

    def _checkforservererror(self, pointers, responses):
        """Scans errors from objects

        Returns LfsRemoteError if any objects has an error"""
        for response in responses:
            # NOTE(review): this local shadows the imported `error` module
            # for the rest of the loop body.
            error = response.get('error')
            if error:
                ptrmap = {p.oid(): p for p in pointers}
                p = ptrmap.get(response['oid'], None)
                if error['code'] == 404 and p:
                    # Include the tracked filename for a friendlier message.
                    filename = getattr(p, 'filename', 'unknown')
                    raise LfsRemoteError(
                        _(('LFS server error. Remote object '
                          'for file %s not found: %r')) % (filename, response))
                raise LfsRemoteError(_('LFS server error: %r') % response)

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]
        # But for downloading, we want all objects. Therefore missing objects
        # should be considered an error.
        if action == 'download':
            if len(filteredobjects) < len(objects):
                missing = [o.get('oid', '?')
                           for o in objects
                           if action not in o.get('actions', [])]
                raise LfsRemoteError(
                    _('LFS server claims required objects do not exist:\n%s')
                    % '\n'.join(missing))

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = str(obj['oid'])

        href = str(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            # Verify on a throwaway handle first, then open a fresh one so
            # the request body starts from offset 0.
            with localstore.vfs(oid) as fp:
                _verifyfile(oid, fp)
            request.data = filewithprogress(localstore.vfs(oid), None)
            request.get_method = lambda: 'PUT'

        for k, v in headers:
            request.add_header(k, v)

        # Accumulate the response body in 1MB chunks.
        response = b''
        try:
            req = self.urlopener.open(request)
            while True:
                data = req.read(1048576)
                if not data:
                    break
                response += data
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

        if action == 'download':
            # If downloading blobs, store downloaded data to local blobstore
            localstore.write(oid, response, verify=True)

    def _batch(self, pointers, localstore, action):
        """Transfer all pointed-to blobs between localstore and the remote.

        Drives _batchrequest/_extractobjects, then fans individual
        _basictransfer calls out through worker.worker, reporting progress
        and retrying each object up to self.retry times on socket errors.
        """
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        # oid -> size, used to advance the progress bar as oids complete.
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def transfer(chunk):
            # Generator run (possibly in a worker process) over a slice of
            # the objects; yields (1, oid) per completed transfer.
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                        util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        # Only transient socket errors are retried; HTTP
                        # errors surface as LfsRemoteError immediately.
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        oids = worker.worker(self.ui, 0.1, transfer, (),
                             sorted(objects, key=lambda o: o.get('oid')))
        processed = 0
        for _one, oid in oids:
            processed += sizes[oid]
            self.ui.progress(topic, processed, total=total)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        self.ui.progress(topic, pos=None, total=total)

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
338 346
339 347 class _dummyremote(object):
340 348 """Dummy store storing blobs to temp directory."""
341 349
342 350 def __init__(self, repo, url):
343 351 fullpath = repo.vfs.join('lfs', url.path)
344 352 self.vfs = lfsvfs(fullpath)
345 353
346 354 def writebatch(self, pointers, fromstore):
347 355 for p in pointers:
348 356 content = fromstore.read(p.oid(), verify=True)
349 357 with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
350 358 fp.write(content)
351 359
352 360 def readbatch(self, pointers, tostore):
353 361 for p in pointers:
354 362 content = self.vfs.read(p.oid())
355 363 tostore.write(p.oid(), content, verify=True)
356 364
357 365 class _nullremote(object):
358 366 """Null store storing blobs to /dev/null."""
359 367
360 368 def __init__(self, repo, url):
361 369 pass
362 370
363 371 def writebatch(self, pointers, fromstore):
364 372 pass
365 373
366 374 def readbatch(self, pointers, tostore):
367 375 pass
368 376
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        # No state: every operation aborts with a configuration hint.
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # Reached whenever no lfs.url (and no deprecated equivalent) is set.
        raise error.Abort(_('lfs.url needs to be configured'))
383 391
# Map an lfs.url scheme to the class implementing that remote store; the
# None entry handles a missing/unset scheme by prompting for configuration.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
391 399
392 400 def _verify(oid, content):
393 401 realoid = hashlib.sha256(content).hexdigest()
394 402 if realoid != oid:
395 403 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
396 404 hint=_('run hg verify'))
397 405
398 406 def _verifyfile(oid, fp):
399 407 sha256 = hashlib.sha256()
400 408 while True:
401 409 data = fp.read(1024 * 1024)
402 410 if not data:
403 411 break
404 412 sha256.update(data)
405 413 realoid = sha256.hexdigest()
406 414 if realoid != oid:
407 415 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
408 416 hint=_('run hg verify'))
409 417
def remote(repo):
    """remotestore factory. return a store in _storemap depending on config"""
    ui = repo.ui

    # convert deprecated configs to the new url. TODO: remove this if other
    # places are migrated to the new url config.
    # deprecated config: lfs.remotestore
    deprecatedstore = ui.config('lfs', 'remotestore')
    if deprecatedstore == 'dummy':
        # deprecated config: lfs.remotepath
        defaulturl = 'file://' + ui.config('lfs', 'remotepath')
    elif deprecatedstore == 'git-lfs':
        # deprecated config: lfs.remoteurl
        defaulturl = ui.config('lfs', 'remoteurl')
    elif deprecatedstore == 'null':
        defaulturl = 'null://'
    else:
        defaulturl = ''

    url = util.url(ui.config('lfs', 'url', defaulturl))
    try:
        storecls = _storemap[url.scheme]
    except KeyError:
        raise error.Abort(_('lfs: unknown url scheme: %s') % url.scheme)
    return storecls(repo, url)
432 440
class LfsRemoteError(error.RevlogError):
    """Error talking to the remote blobstore: HTTP failures, invalid JSON
    responses, or per-object errors reported by the batch API."""
    pass
General Comments 0
You need to be logged in to leave comments. Login now