##// END OF EJS Templates
lfs: using workers in lfs prefetch...
Wojciech Lis -
r35449:f98fac24 default
parent child Browse files
Show More
@@ -1,381 +1,386 b''
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import json
11 11 import os
12 12 import re
13 13
14 14 from mercurial.i18n import _
15 15
16 16 from mercurial import (
17 17 error,
18 18 pathutil,
19 19 url as urlmod,
20 20 util,
21 21 vfs as vfsmod,
22 worker,
22 23 )
23 24
24 25 from ..largefiles import lfutil
25 26
26 27 # 64 bytes for SHA256
27 28 _lfsre = re.compile(r'\A[a-f0-9]{64}\Z')
28 29
29 30 class lfsvfs(vfsmod.vfs):
30 31 def join(self, path):
31 32 """split the path at first two characters, like: XX/XXXXX..."""
32 33 if not _lfsre.match(path):
33 34 raise error.ProgrammingError('unexpected lfs path: %s' % path)
34 35 return super(lfsvfs, self).join(path[0:2], path[2:])
35 36
36 37 def walk(self, path=None, onerror=None):
37 38 """Yield (dirpath, [], oids) tuple for blobs under path
38 39
39 40 Oids only exist in the root of this vfs, so dirpath is always ''.
40 41 """
41 42 root = os.path.normpath(self.base)
42 43 # when dirpath == root, dirpath[prefixlen:] becomes empty
43 44 # because len(dirpath) < prefixlen.
44 45 prefixlen = len(pathutil.normasprefix(root))
45 46 oids = []
46 47
47 48 for dirpath, dirs, files in os.walk(self.reljoin(self.base, path or ''),
48 49 onerror=onerror):
49 50 dirpath = dirpath[prefixlen:]
50 51
51 52 # Silently skip unexpected files and directories
52 53 if len(dirpath) == 2:
53 54 oids.extend([dirpath + f for f in files
54 55 if _lfsre.match(dirpath + f)])
55 56
56 57 yield ('', [], oids)
57 58
58 59 class filewithprogress(object):
59 60 """a file-like object that supports __len__ and read.
60 61
61 62 Useful to provide progress information for how many bytes are read.
62 63 """
63 64
64 65 def __init__(self, fp, callback):
65 66 self._fp = fp
66 67 self._callback = callback # func(readsize)
67 68 fp.seek(0, os.SEEK_END)
68 69 self._len = fp.tell()
69 70 fp.seek(0)
70 71
71 72 def __len__(self):
72 73 return self._len
73 74
74 75 def read(self, size):
75 76 if self._fp is None:
76 77 return b''
77 78 data = self._fp.read(size)
78 79 if data:
79 80 if self._callback:
80 81 self._callback(len(data))
81 82 else:
82 83 self._fp.close()
83 84 self._fp = None
84 85 return data
85 86
86 87 class local(object):
87 88 """Local blobstore for large file contents.
88 89
89 90 This blobstore is used both as a cache and as a staging area for large blobs
90 91 to be uploaded to the remote blobstore.
91 92 """
92 93
93 94 def __init__(self, repo):
94 95 fullpath = repo.svfs.join('lfs/objects')
95 96 self.vfs = lfsvfs(fullpath)
96 97 usercache = lfutil._usercachedir(repo.ui, 'lfs')
97 98 self.cachevfs = lfsvfs(usercache)
98 99
99 100 def write(self, oid, data):
100 101 """Write blob to local blobstore."""
101 102 with self.vfs(oid, 'wb', atomictemp=True) as fp:
102 103 fp.write(data)
103 104
104 105 # XXX: should we verify the content of the cache, and hardlink back to
105 106 # the local store on success, but truncate, write and link on failure?
106 107 if not self.cachevfs.exists(oid):
107 108 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
108 109
109 110 def read(self, oid):
110 111 """Read blob from local blobstore."""
111 112 if not self.vfs.exists(oid):
112 113 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
113 114 return self.vfs.read(oid)
114 115
115 116 def has(self, oid):
116 117 """Returns True if the local blobstore contains the requested blob,
117 118 False otherwise."""
118 119 return self.cachevfs.exists(oid) or self.vfs.exists(oid)
119 120
120 121 class _gitlfsremote(object):
121 122
122 123 def __init__(self, repo, url):
123 124 ui = repo.ui
124 125 self.ui = ui
125 126 baseurl, authinfo = url.authinfo()
126 127 self.baseurl = baseurl.rstrip('/')
127 128 self.urlopener = urlmod.opener(ui, authinfo)
128 129 self.retry = ui.configint('lfs', 'retry')
129 130
130 131 def writebatch(self, pointers, fromstore):
131 132 """Batch upload from local to remote blobstore."""
132 133 self._batch(pointers, fromstore, 'upload')
133 134
134 135 def readbatch(self, pointers, tostore):
135 136 """Batch download from remote to local blobstore."""
136 137 self._batch(pointers, tostore, 'download')
137 138
138 139 def _batchrequest(self, pointers, action):
139 140 """Get metadata about objects pointed by pointers for given action
140 141
141 142 Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
142 143 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
143 144 """
144 145 objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
145 146 requestdata = json.dumps({
146 147 'objects': objects,
147 148 'operation': action,
148 149 })
149 150 batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
150 151 data=requestdata)
151 152 batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
152 153 batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
153 154 try:
154 155 rawjson = self.urlopener.open(batchreq).read()
155 156 except util.urlerr.httperror as ex:
156 157 raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
157 158 % (ex, action))
158 159 try:
159 160 response = json.loads(rawjson)
160 161 except ValueError:
161 162 raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
162 163 % rawjson)
163 164 return response
164 165
165 166 def _checkforservererror(self, pointers, responses):
166 167 """Scans errors from objects
167 168
168 169 Raises LfsRemoteError if any object has an error"""
169 170 for response in responses:
170 171 error = response.get('error')
171 172 if error:
172 173 ptrmap = {p.oid(): p for p in pointers}
173 174 p = ptrmap.get(response['oid'], None)
174 175 if error['code'] == 404 and p:
175 176 filename = getattr(p, 'filename', 'unknown')
176 177 raise LfsRemoteError(
177 178 _(('LFS server error. Remote object '
178 179 'for file %s not found: %r')) % (filename, response))
179 180 raise LfsRemoteError(_('LFS server error: %r') % response)
180 181
181 182 def _extractobjects(self, response, pointers, action):
182 183 """extract objects from response of the batch API
183 184
184 185 response: parsed JSON object returned by batch API
185 186 return response['objects'] filtered by action
186 187 raise if any object has an error
187 188 """
188 189 # Scan errors from objects - fail early
189 190 objects = response.get('objects', [])
190 191 self._checkforservererror(pointers, objects)
191 192
192 193 # Filter objects with given action. Practically, this skips uploading
193 194 # objects which exist in the server.
194 195 filteredobjects = [o for o in objects if action in o.get('actions', [])]
195 196 # But for downloading, we want all objects. Therefore missing objects
196 197 # should be considered an error.
197 198 if action == 'download':
198 199 if len(filteredobjects) < len(objects):
199 200 missing = [o.get('oid', '?')
200 201 for o in objects
201 202 if action not in o.get('actions', [])]
202 203 raise LfsRemoteError(
203 204 _('LFS server claims required objects do not exist:\n%s')
204 205 % '\n'.join(missing))
205 206
206 207 return filteredobjects
207 208
208 def _basictransfer(self, obj, action, localstore, progress=None):
209 def _basictransfer(self, obj, action, localstore):
209 210 """Download or upload a single object using basic transfer protocol
210 211
211 212 obj: dict, an object description returned by batch API
212 213 action: string, one of ['upload', 'download']
213 214 localstore: blobstore.local
214 215
215 216 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
216 217 basic-transfers.md
217 218 """
218 219 oid = str(obj['oid'])
219 220
220 221 href = str(obj['actions'][action].get('href'))
221 222 headers = obj['actions'][action].get('header', {}).items()
222 223
223 224 request = util.urlreq.request(href)
224 225 if action == 'upload':
225 226 # If uploading blobs, read data from local blobstore.
226 request.data = filewithprogress(localstore.vfs(oid), progress)
227 request.data = filewithprogress(localstore.vfs(oid), None)
227 228 request.get_method = lambda: 'PUT'
228 229
229 230 for k, v in headers:
230 231 request.add_header(k, v)
231 232
232 233 response = b''
233 234 try:
234 235 req = self.urlopener.open(request)
235 236 while True:
236 237 data = req.read(1048576)
237 238 if not data:
238 239 break
239 if action == 'download' and progress:
240 progress(len(data))
241 240 response += data
242 241 except util.urlerr.httperror as ex:
243 242 raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
244 243 % (ex, oid, action))
245 244
246 245 if action == 'download':
247 246 # If downloading blobs, store downloaded data to local blobstore
248 247 localstore.write(oid, response)
249 248
250 249 def _batch(self, pointers, localstore, action):
251 250 if action not in ['upload', 'download']:
252 251 raise error.ProgrammingError('invalid Git-LFS action: %s' % action)
253 252
254 253 response = self._batchrequest(pointers, action)
255 prunningsize = [0]
256 254 objects = self._extractobjects(response, pointers, action)
257 255 total = sum(x.get('size', 0) for x in objects)
256 sizes = {}
257 for obj in objects:
258 sizes[obj.get('oid')] = obj.get('size', 0)
258 259 topic = {'upload': _('lfs uploading'),
259 260 'download': _('lfs downloading')}[action]
260 261 if self.ui.verbose and len(objects) > 1:
261 262 self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
262 263 % (len(objects), util.bytecount(total)))
263 264 self.ui.progress(topic, 0, total=total)
264 def progress(size):
265 # advance progress bar by "size" bytes
266 prunningsize[0] += size
267 self.ui.progress(topic, prunningsize[0], total=total)
268 for obj in sorted(objects, key=lambda o: o.get('oid')):
269 objsize = obj.get('size', 0)
265 def transfer(chunk):
266 for obj in chunk:
267 objsize = obj.get('size', 0)
268 if self.ui.verbose:
269 if action == 'download':
270 msg = _('lfs: downloading %s (%s)\n')
271 elif action == 'upload':
272 msg = _('lfs: uploading %s (%s)\n')
273 self.ui.write(msg % (obj.get('oid'),
274 util.bytecount(objsize)))
275 retry = self.retry
276 while True:
277 try:
278 self._basictransfer(obj, action, localstore)
279 yield 1, obj.get('oid')
280 break
281 except Exception as ex:
282 if retry > 0:
283 if self.ui.verbose:
284 self.ui.write(
285 _('lfs: failed: %r (remaining retry %d)\n')
286 % (ex, retry))
287 retry -= 1
288 continue
289 raise
290
291 oids = worker.worker(self.ui, 0.1, transfer, (),
292 sorted(objects, key=lambda o: o.get('oid')))
293 processed = 0
294 for _one, oid in oids:
295 processed += sizes[oid]
296 self.ui.progress(topic, processed, total=total)
270 297 if self.ui.verbose:
271 if action == 'download':
272 msg = _('lfs: downloading %s (%s)\n')
273 elif action == 'upload':
274 msg = _('lfs: uploading %s (%s)\n')
275 self.ui.write(msg % (obj.get('oid'), util.bytecount(objsize)))
276 origrunningsize = prunningsize[0]
277 retry = self.retry
278 while True:
279 prunningsize[0] = origrunningsize
280 try:
281 self._basictransfer(obj, action, localstore,
282 progress=progress)
283 break
284 except Exception as ex:
285 if retry > 0:
286 if self.ui.verbose:
287 self.ui.write(
288 _('lfs: failed: %r (remaining retry %d)\n')
289 % (ex, retry))
290 retry -= 1
291 continue
292 raise
293
298 self.ui.write(_('lfs: processed: %s\n') % oid)
294 299 self.ui.progress(topic, pos=None, total=total)
295 300
296 301 def __del__(self):
297 302 # copied from mercurial/httppeer.py
298 303 urlopener = getattr(self, 'urlopener', None)
299 304 if urlopener:
300 305 for h in urlopener.handlers:
301 306 h.close()
302 307 getattr(h, "close_all", lambda : None)()
303 308
304 309 class _dummyremote(object):
305 310 """Dummy store storing blobs to temp directory."""
306 311
307 312 def __init__(self, repo, url):
308 313 fullpath = repo.vfs.join('lfs', url.path)
309 314 self.vfs = lfsvfs(fullpath)
310 315
311 316 def writebatch(self, pointers, fromstore):
312 317 for p in pointers:
313 318 content = fromstore.read(p.oid())
314 319 with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
315 320 fp.write(content)
316 321
317 322 def readbatch(self, pointers, tostore):
318 323 for p in pointers:
319 324 content = self.vfs.read(p.oid())
320 325 tostore.write(p.oid(), content)
321 326
322 327 class _nullremote(object):
323 328 """Null store storing blobs to /dev/null."""
324 329
325 330 def __init__(self, repo, url):
326 331 pass
327 332
328 333 def writebatch(self, pointers, fromstore):
329 334 pass
330 335
331 336 def readbatch(self, pointers, tostore):
332 337 pass
333 338
334 339 class _promptremote(object):
335 340 """Prompt user to set lfs.url when accessed."""
336 341
337 342 def __init__(self, repo, url):
338 343 pass
339 344
340 345 def writebatch(self, pointers, fromstore, ui=None):
341 346 self._prompt()
342 347
343 348 def readbatch(self, pointers, tostore, ui=None):
344 349 self._prompt()
345 350
346 351 def _prompt(self):
347 352 raise error.Abort(_('lfs.url needs to be configured'))
348 353
349 354 _storemap = {
350 355 'https': _gitlfsremote,
351 356 'http': _gitlfsremote,
352 357 'file': _dummyremote,
353 358 'null': _nullremote,
354 359 None: _promptremote,
355 360 }
356 361
357 362 def remote(repo):
358 363 """remotestore factory. return a store in _storemap depending on config"""
359 364 defaulturl = ''
360 365
361 366 # convert deprecated configs to the new url. TODO: remove this if other
362 367 # places are migrated to the new url config.
363 368 # deprecated config: lfs.remotestore
364 369 deprecatedstore = repo.ui.config('lfs', 'remotestore')
365 370 if deprecatedstore == 'dummy':
366 371 # deprecated config: lfs.remotepath
367 372 defaulturl = 'file://' + repo.ui.config('lfs', 'remotepath')
368 373 elif deprecatedstore == 'git-lfs':
369 374 # deprecated config: lfs.remoteurl
370 375 defaulturl = repo.ui.config('lfs', 'remoteurl')
371 376 elif deprecatedstore == 'null':
372 377 defaulturl = 'null://'
373 378
374 379 url = util.url(repo.ui.config('lfs', 'url', defaulturl))
375 380 scheme = url.scheme
376 381 if scheme not in _storemap:
377 382 raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
378 383 return _storemap[scheme](repo, url)
379 384
380 385 class LfsRemoteError(error.RevlogError):
381 386 pass
@@ -1,132 +1,138 b''
1 1 #require lfs-test-server
2 2
3 3 $ LFS_LISTEN="tcp://:$HGPORT"
4 4 $ LFS_HOST="localhost:$HGPORT"
5 5 $ LFS_PUBLIC=1
6 6 $ export LFS_LISTEN LFS_HOST LFS_PUBLIC
7 7 #if no-windows
8 8 $ lfs-test-server &> lfs-server.log &
9 9 $ echo $! >> $DAEMON_PIDS
10 10 #else
11 11 $ cat >> $TESTTMP/spawn.py <<EOF
12 12 > import os
13 13 > import subprocess
14 14 > import sys
15 15 >
16 16 > for path in os.environ["PATH"].split(os.pathsep):
17 17 > exe = os.path.join(path, 'lfs-test-server.exe')
18 18 > if os.path.exists(exe):
19 19 > with open('lfs-server.log', 'wb') as out:
20 20 > p = subprocess.Popen(exe, stdout=out, stderr=out)
21 21 > sys.stdout.write('%s\n' % p.pid)
22 22 > sys.exit(0)
23 23 > sys.exit(1)
24 24 > EOF
25 25 $ $PYTHON $TESTTMP/spawn.py >> $DAEMON_PIDS
26 26 #endif
27 27
28 28 $ cat >> $HGRCPATH <<EOF
29 29 > [extensions]
30 30 > lfs=
31 31 > [lfs]
32 32 > url=http://foo:bar@$LFS_HOST/
33 33 > threshold=1
34 34 > EOF
35 35
36 36 $ hg init repo1
37 37 $ cd repo1
38 38 $ echo THIS-IS-LFS > a
39 39 $ hg commit -m a -A a
40 40
41 41 $ hg init ../repo2
42 42 $ hg push ../repo2 -v
43 43 pushing to ../repo2
44 44 searching for changes
45 45 lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
46 lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
46 47 1 changesets found
47 48 uncompressed size of bundle content:
48 49 * (changelog) (glob)
49 50 * (manifests) (glob)
50 51 * a (glob)
51 52 adding changesets
52 53 adding manifests
53 54 adding file changes
54 55 added 1 changesets with 1 changes to 1 files
55 56
56 57 Clear the cache to force a download
57 58 $ rm -rf `hg config lfs.usercache`
58 59 $ cd ../repo2
59 60 $ hg update tip -v
60 61 resolving manifests
61 62 getting a
62 63 lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
64 lfs: processed: 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b
63 65 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
64 66
65 67 When the server has some blobs already
66 68
67 69 $ hg mv a b
68 70 $ echo ANOTHER-LARGE-FILE > c
69 71 $ echo ANOTHER-LARGE-FILE2 > d
70 72 $ hg commit -m b-and-c -A b c d
71 73 $ hg push ../repo1 -v | grep -v '^ '
72 74 pushing to ../repo1
73 75 searching for changes
74 76 lfs: need to transfer 2 objects (39 bytes)
75 77 lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
78 lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
76 79 lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
80 lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
77 81 1 changesets found
78 82 uncompressed size of bundle content:
79 83 adding changesets
80 84 adding manifests
81 85 adding file changes
82 86 added 1 changesets with 3 changes to 3 files
83 87
84 88 Clear the cache to force a download
85 89 $ rm -rf `hg config lfs.usercache`
86 90 $ hg --repo ../repo1 update tip -v
87 91 resolving manifests
88 92 getting b
89 93 getting c
90 94 lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
95 lfs: processed: d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998
91 96 getting d
92 97 lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
98 lfs: processed: 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19
93 99 3 files updated, 0 files merged, 0 files removed, 0 files unresolved
94 100
95 101 Check error message when the remote missed a blob:
96 102
97 103 $ echo FFFFF > b
98 104 $ hg commit -m b -A b
99 105 $ echo FFFFF >> b
100 106 $ hg commit -m b b
101 107 $ rm -rf .hg/store/lfs
102 108 $ rm -rf `hg config lfs.usercache`
103 109 $ hg update -C '.^'
104 110 abort: LFS server claims required objects do not exist:
105 111 8e6ea5f6c066b44a0efa43bcce86aea73f17e6e23f0663df0251e7524e140a13!
106 112 [255]
107 113
108 114 Check error message when object does not exist:
109 115
110 116 $ hg init test && cd test
111 117 $ echo "[extensions]" >> .hg/hgrc
112 118 $ echo "lfs=" >> .hg/hgrc
113 119 $ echo "[lfs]" >> .hg/hgrc
114 120 $ echo "threshold=1" >> .hg/hgrc
115 121 $ echo a > a
116 122 $ hg add a
117 123 $ hg commit -m 'test'
118 124 $ echo aaaaa > a
119 125 $ hg commit -m 'largefile'
120 126 $ hg debugdata .hg/store/data/a.i 1 # verify this is not the file content but includes "oid", the LFS "pointer".
121 127 version https://git-lfs.github.com/spec/v1
122 128 oid sha256:bdc26931acfb734b142a8d675f205becf27560dc461f501822de13274fe6fc8a
123 129 size 6
124 130 x-is-binary 0
125 131 $ cd ..
126 132 $ rm -rf `hg config lfs.usercache`
127 133 $ hg --config 'lfs.url=https://dewey-lfs.vip.facebook.com/lfs' clone test test2
128 134 updating to branch default
129 135 abort: LFS server error. Remote object for file data/a.i not found:(.*)! (re)
130 136 [255]
131 137
132 138 $ $PYTHON $RUNTESTDIR/killdaemons.py $DAEMON_PIDS
General Comments 0
You need to be logged in to leave comments. Login now