##// END OF EJS Templates
lfs: add a blob verification method to the local store...
Matt Harbison -
r37163:56c7cd06 default
parent child Browse files
Show More
@@ -1,503 +1,514 b''
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 #
2 #
3 # Copyright 2017 Facebook, Inc.
3 # Copyright 2017 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import json
11 import json
12 import os
12 import os
13 import re
13 import re
14 import socket
14 import socket
15
15
16 from mercurial.i18n import _
16 from mercurial.i18n import _
17
17
18 from mercurial import (
18 from mercurial import (
19 error,
19 error,
20 pathutil,
20 pathutil,
21 pycompat,
21 pycompat,
22 url as urlmod,
22 url as urlmod,
23 util,
23 util,
24 vfs as vfsmod,
24 vfs as vfsmod,
25 worker,
25 worker,
26 )
26 )
27
27
28 from ..largefiles import lfutil
28 from ..largefiles import lfutil
29
29
30 # 64 bytes for SHA256
30 # 64 bytes for SHA256
31 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
31 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
32
32
class lfsvfs(vfsmod.vfs):
    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if _lfsre.match(path):
            return super(lfsvfs, self).join(path[:2], path[2:])
        raise error.ProgrammingError('unexpected lfs path: %s' % path)

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # Stripping the normalized root (including the trailing separator)
        # leaves only the two-character sharding directory in each dirpath.
        # When dirpath == root the slice yields '' since
        # len(dirpath) < striplen.
        striplen = len(pathutil.normasprefix(root))
        found = []

        start = self.reljoin(self.base, path or '')
        for dirpath, dirs, files in os.walk(start, onerror=onerror):
            shard = dirpath[striplen:]

            # Silently skip unexpected files and directories: only names
            # under a two-character shard that complete a sha256 hex oid
            # are collected.
            if len(shard) == 2:
                found.extend(shard + f for f in files
                             if _lfsre.match(shard + f))

        yield ('', [], found)
61
61
class filewithprogress(object):
    """Wrap a seekable file object, adding __len__ and progress reporting.

    Each non-empty read() invokes the callback with the number of bytes
    returned, so callers can track transfer progress.  The wrapped file is
    closed and dropped as soon as it is exhausted.
    """

    def __init__(self, fp, callback):
        # Measure the total size up front by seeking to the end, then
        # rewind so reads start from the beginning.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)
        self._fp = fp
        self._callback = callback  # func(readsize)

    def __len__(self):
        return self._len

    def read(self, size):
        fp = self._fp
        if fp is None:
            return b''
        data = fp.read(size)
        if not data:
            # Exhausted: release the underlying file immediately.
            fp.close()
            self._fp = None
            return data
        if self._callback:
            self._callback(len(data))
        return data
89
89
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)
        usercache = lfutil._usercachedir(repo.ui, 'lfs')
        self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file.  Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs.  However, things like clone without an update won't
        # populate the local store.  For an init + push of a local clone,
        # the usercache is the only place it _could_ be.  If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        sha256 = hashlib.sha256()

        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

            # Verify before the atomictemp file is closed: raising inside the
            # context manager discards the temporary file, so a corrupt
            # download never gets installed into the store.  (Verifying after
            # the `with` block would leave the bad blob behind on abort.)
            realoid = sha256.hexdigest()
            if realoid != oid:
                raise error.Abort(_('corrupt remote lfs object: %s') % oid)

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data.  Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        self._linktousercache(oid)

    def _linktousercache(self, oid):
        """Hardlink the named blob from the local store into the usercache,
        unless the usercache already has it."""
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore."""
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.  (If verify was requested, _read() already aborted on
            # mismatch, so reaching here means the blob is good.)
            if verify or hashlib.sha256(blob).hexdigest() == oid:
                self.ui.note(_('lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        # Hash in 1MB chunks so large blobs don't have to fit in memory.
        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == sha256.hexdigest()

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
182
193
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS protocol over HTTP(S).

    Objects are located via the JSON batch API, then transferred one at a
    time with the basic transfer protocol.
    """

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        # Normalize so endpoint paths can be appended with a single '/'.
        self.baseurl = baseurl.rstrip('/')
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            # Default to mimicking the official git-lfs client's user agent.
            useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blobstore."""
        self._batch(_deduplicate(pointers), tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rsp = self.urlopener.open(batchreq)
            rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)

        if self.ui.debugflag:
            self.ui.debug('Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            self.ui.debug('%s\n'
                          % '\n'.join(sorted(str(rsp.info()).splitlines())))

            # Sort the objects so debug output is deterministic across
            # server implementations.
            if 'objects' in response:
                response['objects'] = sorted(response['objects'],
                                             key=lambda p: p['oid'])
            self.ui.debug('%s\n'
                          % json.dumps(response, indent=2, sort_keys=True))

        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            notfound = (response.get('error', {}).get('code') == 404
                        or (action == 'download'
                            and action not in response.get('actions', [])))
            if notfound:
                ptrmap = {p.oid(): p for p in pointers}
                p = ptrmap.get(response['oid'], None)
                if p:
                    filename = getattr(p, 'filename', 'unknown')
                    raise LfsRemoteError(
                        _(('LFS server error. Remote object '
                          'for "%s" not found: %r')) % (filename, response))
                else:
                    # The server mentioned an oid we never asked about.
                    raise LfsRemoteError(
                        _('LFS server error. Unsolicited response for oid %s')
                        % response['oid'])
            if 'error' in response:
                raise LfsRemoteError(_('LFS server error: %r') % response)

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = pycompat.bytestr(obj['oid'])

        href = pycompat.bytestr(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            # Verify first (streaming), then reopen for the actual upload so
            # a corrupt local blob is never sent to the server.
            with localstore.open(oid) as fp:
                _verifyfile(oid, fp)
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)

            if self.ui.debugflag:
                self.ui.debug('Status: %d\n' % req.status)
                # lfs-test-server and hg serve return headers in different order
                self.ui.debug('%s\n'
                              % '\n'.join(sorted(str(req.info()).splitlines())))

            if action == 'download':
                # If downloading blobs, store downloaded data to local blobstore
                localstore.download(oid, req)
            else:
                # Drain (and log) whatever body the server sent back for the
                # upload.
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
                if response:
                    self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug('%s: %s\n' % (oid, ex.read()))
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        """Issue one batch request for ``pointers``, then transfer each
        returned object with progress reporting and per-object retries."""
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def transfer(chunk):
            # Transfer each object in ``chunk``, retrying on socket errors up
            # to self.retry times; yields (1, oid) for each completed blob.
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                        util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool('experimental', 'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get('oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get('oid')))

        processed = 0
        blobs = 0
        for _one, oid in oids:
            processed += sizes[oid]
            blobs += 1
            self.ui.progress(topic, processed, total=total)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        self.ui.progress(topic, pos=None, total=total)

        if blobs > 0:
            if action == 'upload':
                self.ui.status(_('lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            # TODO: coalesce the download requests, and comment this in
            #elif action == 'download':
            #    self.ui.status(_('lfs: downloaded %d files (%s)\n')
            #                   % (blobs, util.bytecount(processed)))

    def __del__(self):
        """Close any connections cached by the opener's handlers."""
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
415
426
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        self.vfs = lfsvfs(repo.vfs.join('lfs', url.path))

    def writebatch(self, pointers, fromstore):
        # Copy each unique blob (verified) out of the local store.
        for pointer in _deduplicate(pointers):
            oid = pointer.oid()
            content = fromstore.read(oid, verify=True)
            with self.vfs(oid, 'wb', atomictemp=True) as fp:
                fp.write(content)

    def readbatch(self, pointers, tostore):
        # Feed each unique blob to the local store, which verifies it on
        # download.
        for pointer in _deduplicate(pointers):
            oid = pointer.oid()
            with self.vfs(oid, 'rb') as fp:
                tostore.download(oid, fp)
433
444
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        # Nothing to configure; every transfer is a no-op.
        pass

    def writebatch(self, pointers, fromstore):
        """Discard all pointers without uploading anything."""

    def readbatch(self, pointers, tostore):
        """Fetch nothing; requested blobs are intentionally dropped."""
445
456
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        # No remote is configured; any access aborts with a hint.
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # Shared failure path for both batch directions.
        raise error.Abort(_('lfs.url needs to be configured'))
460
471
# Map an lfs.url scheme to the class implementing the corresponding remote
# store.  ``None`` (no url configured at all) prompts the user to set one.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
468
479
def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    seen = util.sortdict()
    for pointer in pointers:
        # A later duplicate replaces an earlier one with the same oid.
        seen[pointer.oid()] = pointer
    return seen.values()
475
486
def _verify(oid, content):
    """Abort if the sha256 hex digest of ``content`` does not match ``oid``."""
    digest = hashlib.sha256(content).hexdigest()
    if digest == oid:
        return
    raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                      hint=_('run hg verify'))
481
492
def _verifyfile(oid, fp):
    """Abort if the streamed content of ``fp`` does not hash to ``oid``.

    Reads in 1MB chunks so large blobs need not fit in memory."""
    hasher = hashlib.sha256()
    while True:
        chunk = fp.read(1024 * 1024)
        if not chunk:
            break
        hasher.update(chunk)
    if hasher.hexdigest() != oid:
        raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                          hint=_('run hg verify'))
493
504
def remote(repo):
    """remotestore factory. return a store in _storemap depending on config"""
    url = util.url(repo.ui.config('lfs', 'url') or '')
    try:
        storecls = _storemap[url.scheme]
    except KeyError:
        raise error.Abort(_('lfs: unknown url scheme: %s') % url.scheme)
    return storecls(repo, url)
501
512
class LfsRemoteError(error.RevlogError):
    """Raised for HTTP, protocol, and server errors when talking to the
    remote (Git-LFS) blobstore."""
    pass
General Comments 0
You need to be logged in to leave comments. Login now