##// END OF EJS Templates
lfs: correct the directory list value returned by lfsvfs.walk()...
Matt Harbison -
r35397:c8edeb03 default
parent child Browse files
Show More
@@ -1,381 +1,381
1 1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 2 #
3 3 # Copyright 2017 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import json
11 11 import os
12 12 import re
13 13
14 14 from mercurial.i18n import _
15 15
16 16 from mercurial import (
17 17 error,
18 18 pathutil,
19 19 url as urlmod,
20 20 util,
21 21 vfs as vfsmod,
22 22 )
23 23
24 24 from ..largefiles import lfutil
25 25
# 64 lowercase hex digits -- the full SHA256 digest of a blob's contents,
# which is how LFS names (oids) every stored object.
_lfsre = re.compile(r'\A[a-f0-9]{64}\Z')
28 28
class lfsvfs(vfsmod.vfs):
    """vfs storing blobs keyed by oid, split as XX/XXXX... on disk.

    The first two hex digits of the oid become a directory name; the
    remaining 62 digits become the file name within it.
    """

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if not _lfsre.match(path):
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[0:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        for dirpath, dirs, files in os.walk(self.reljoin(self.base, path or ''),
                                            onerror=onerror):
            dirpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories
            if len(dirpath) == 2:
                oids.extend([dirpath + f for f in files
                             if _lfsre.match(dirpath + f)])

        # The second element is the directory list, which mirrors os.walk()'s
        # (dirpath, dirnames, filenames) contract; it must be a list, not the
        # string '' the previous version yielded, so callers iterating or
        # mutating it behave correctly.
        yield ('', [], oids)
57 57
class filewithprogress(object):
    """A file-like object supporting __len__ and read.

    After each successful read, ``callback(readsize)`` is invoked so the
    caller can report how many bytes have been consumed so far.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # func(readsize)
        # Measure the total size up front by seeking to the end, then
        # rewind so reads start from the beginning.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        # Once exhausted, the underlying file has been closed and dropped.
        if self._fp is None:
            return b''
        data = self._fp.read(size)
        if not data:
            # EOF: release the underlying file; further reads return b''.
            self._fp.close()
            self._fp = None
            return data
        if self._callback:
            self._callback(len(data))
        return data
85 85
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        # Repo-local store lives under the store vfs; the user-level cache
        # directory is shared across repositories.
        self.vfs = lfsvfs(repo.svfs.join('lfs/objects'))
        self.cachevfs = lfsvfs(lfutil._usercachedir(repo.ui, 'lfs'))

    def write(self, oid, data):
        """Write blob to local blobstore."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid):
        """Read blob from local blobstore."""
        if not self.vfs.exists(oid):
            # Populate the repo-local store from the user cache on demand.
            lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        return self.vfs.read(oid)

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        # Checks the user cache first, then the repo-local store, matching
        # the original short-circuit order.
        return any(v.exists(oid) for v in (self.cachevfs, self.vfs))
119 119
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS protocol over HTTP(S).

    Objects are negotiated through the batch API, then individually moved
    with the basic transfer protocol, with retry and progress reporting.
    """

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip('/')
        self.urlopener = urlmod.opener(ui, authinfo)
        # number of times a failed transfer is re-attempted (lfs.retry)
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(pointers, fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blostore."""
        self._batch(pointers, tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        # Both headers are mandated by the Git-LFS batch API.
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rawjson = self.urlopener.open(batchreq).read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)
        return response

    def _checkforservererror(self, pointers, responses):
        """Scans errors from objects

        Returns LfsRemoteError if any objects has an error"""
        for response in responses:
            error = response.get('error')
            if error:
                # Map back to the pointer so a 404 can name the file involved.
                ptrmap = {p.oid(): p for p in pointers}
                p = ptrmap.get(response['oid'], None)
                if error['code'] == 404 and p:
                    filename = getattr(p, 'filename', 'unknown')
                    raise LfsRemoteError(
                        _(('LFS server error. Remote object '
                          'for file %s not found: %r')) % (filename, response))
                raise LfsRemoteError(_('LFS server error: %r') % response)

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]
        # But for downloading, we want all objects. Therefore missing objects
        # should be considered an error.
        if action == 'download':
            if len(filteredobjects) < len(objects):
                missing = [o.get('oid', '?')
                           for o in objects
                           if action not in o.get('actions', [])]
                raise LfsRemoteError(
                    _('LFS server claims required objects do not exist:\n%s')
                    % '\n'.join(missing))

        return filteredobjects

    def _basictransfer(self, obj, action, localstore, progress=None):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = str(obj['oid'])

        href = str(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            # filewithprogress drives per-chunk progress callbacks as the
            # opener consumes the request body.
            request.data = filewithprogress(localstore.vfs(oid), progress)
            request.get_method = lambda: 'PUT'

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)
            # Read the response in 1 MiB chunks, reporting download progress
            # as each chunk arrives.
            while True:
                data = req.read(1048576)
                if not data:
                    break
                if action == 'download' and progress:
                    progress(len(data))
                response += data
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

        if action == 'download':
            # If downloading blobs, store downloaded data to local blobstore
            localstore.write(oid, response)

    def _batch(self, pointers, localstore, action):
        """Transfer all pointers between localstore and the remote.

        Runs the batch API once, then transfers each returned object,
        retrying individual objects up to self.retry times while keeping
        a single progress bar consistent across retries.
        """
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        # One-element list so the nested progress() closure can mutate the
        # running byte count (no 'nonlocal' in py2).
        prunningsize = [0]
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if self.ui.verbose and len(objects) > 1:
            self.ui.write(_('lfs: need to transfer %d objects (%s)\n')
                          % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def progress(size):
            # advance progress bar by "size" bytes
            prunningsize[0] += size
            self.ui.progress(topic, prunningsize[0], total=total)
        # Sort by oid for a deterministic transfer order.
        for obj in sorted(objects, key=lambda o: o.get('oid')):
            objsize = obj.get('size', 0)
            if self.ui.verbose:
                if action == 'download':
                    msg = _('lfs: downloading %s (%s)\n')
                elif action == 'upload':
                    msg = _('lfs: uploading %s (%s)\n')
                self.ui.write(msg % (obj.get('oid'), util.bytecount(objsize)))
            origrunningsize = prunningsize[0]
            retry = self.retry
            while True:
                # Rewind the progress counter so a retried transfer does not
                # double-count bytes from the failed attempt.
                prunningsize[0] = origrunningsize
                try:
                    self._basictransfer(obj, action, localstore,
                                        progress=progress)
                    break
                except Exception as ex:
                    if retry > 0:
                        if self.ui.verbose:
                            self.ui.write(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                        retry -= 1
                        continue
                    raise

        self.ui.progress(topic, pos=None, total=total)

    def __del__(self):
        # copied from mercurial/httppeer.py
        # Close every handler the opener holds so sockets are not leaked
        # when the store is garbage-collected.
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
303 303
304 304 class _dummyremote(object):
305 305 """Dummy store storing blobs to temp directory."""
306 306
307 307 def __init__(self, repo, url):
308 308 fullpath = repo.vfs.join('lfs', url.path)
309 309 self.vfs = lfsvfs(fullpath)
310 310
311 311 def writebatch(self, pointers, fromstore):
312 312 for p in pointers:
313 313 content = fromstore.read(p.oid())
314 314 with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
315 315 fp.write(content)
316 316
317 317 def readbatch(self, pointers, tostore):
318 318 for p in pointers:
319 319 content = self.vfs.read(p.oid())
320 320 tostore.write(p.oid(), content)
321 321
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore):
        # Intentionally discard all uploads.
        pass

    def readbatch(self, pointers, tostore):
        # Nothing is ever stored, so there is nothing to read back.
        pass
333 333
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # Abort with a configuration hint rather than guessing an endpoint.
        raise error.Abort(_('lfs.url needs to be configured'))
348 348
# Maps a url scheme to the remote store class implementing it; the `None`
# key (no scheme / no lfs.url configured) prompts the user to configure one.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
356 356
def remote(repo):
    """remotestore factory. return a store in _storemap depending on config"""
    ui = repo.ui
    defaulturl = ''

    # convert deprecated configs to the new url. TODO: remove this if other
    # places are migrated to the new url config.
    # deprecated config: lfs.remotestore
    deprecatedstore = ui.config('lfs', 'remotestore')
    if deprecatedstore == 'dummy':
        # deprecated config: lfs.remotepath
        defaulturl = 'file://' + ui.config('lfs', 'remotepath')
    elif deprecatedstore == 'git-lfs':
        # deprecated config: lfs.remoteurl
        defaulturl = ui.config('lfs', 'remoteurl')
    elif deprecatedstore == 'null':
        defaulturl = 'null://'

    url = util.url(ui.config('lfs', 'url', defaulturl))
    try:
        storecls = _storemap[url.scheme]
    except KeyError:
        raise error.Abort(_('lfs: unknown url scheme: %s') % url.scheme)
    return storecls(repo, url)
379 379
class LfsRemoteError(error.RevlogError):
    """Raised for any failure talking to a remote LFS blobstore."""
    pass
General Comments 0
You need to be logged in to leave comments. Login now