##// END OF EJS Templates
lfs: use progress helper...
Martin von Zweigbergk -
r38424:76a08cec default
parent child Browse files
Show More
@@ -1,580 +1,581 b''
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 #
2 #
3 # Copyright 2017 Facebook, Inc.
3 # Copyright 2017 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import hashlib
11 import hashlib
12 import json
12 import json
13 import os
13 import os
14 import re
14 import re
15 import socket
15 import socket
16
16
17 from mercurial.i18n import _
17 from mercurial.i18n import _
18
18
19 from mercurial import (
19 from mercurial import (
20 error,
20 error,
21 pathutil,
21 pathutil,
22 pycompat,
22 pycompat,
23 url as urlmod,
23 url as urlmod,
24 util,
24 util,
25 vfs as vfsmod,
25 vfs as vfsmod,
26 worker,
26 worker,
27 )
27 )
28
28
29 from ..largefiles import lfutil
29 from ..largefiles import lfutil
30
30
31 # 64 bytes for SHA256
31 # 64 bytes for SHA256
32 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
32 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
33
33
class lfsvfs(vfsmod.vfs):
    """A vfs that stores blobs sharded by the first two oid characters."""

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if not _lfsre.match(path):
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[0:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        for dirpath, dirs, files in os.walk(self.reljoin(self.base, path or ''),
                                            onerror=onerror):
            dirpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories
            if len(dirpath) == 2:
                oids.extend([dirpath + f for f in files
                             if _lfsre.match(dirpath + f)])

        yield ('', [], oids)
62
62
class nullvfs(lfsvfs):
    """A black-hole store: never holds a blob, silently discards writes.

    Used in place of the usercache when it is disabled via config.
    """

    def __init__(self):
        # Deliberately skip lfsvfs/vfs initialization: this store has no base
        # directory at all.
        pass

    def exists(self, oid):
        return False

    def read(self, oid):
        # store.read() calls into here if the blob doesn't exist in its
        # self.vfs.  Raise the same error as a normal vfs when asked to read a
        # file that doesn't exist.  The only difference is the full file path
        # isn't available in the error.
        raise IOError(errno.ENOENT, '%s: No such file or directory' % oid)

    def walk(self, path=None, onerror=None):
        return ('', [], [])

    def write(self, oid, data):
        pass
82
82
class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback  # func(readsize)
        # Measure the total size once up front, then rewind for reading.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        # After EOF the underlying file is closed and dropped; subsequent
        # reads return b'' like a normal exhausted stream.
        if self._fp is None:
            return b''
        data = self._fp.read(size)
        if data:
            if self._callback:
                self._callback(len(data))
        else:
            self._fp.close()
            self._fp = None
        return data
110
110
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        # Repo-private store under .hg/store/lfs/objects, plus a shared
        # per-user cache (unless disabled by config).
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)

        if repo.ui.configbool('experimental', 'lfs.disableusercache'):
            self.cachevfs = nullvfs()
        else:
            usercache = lfutil._usercachedir(repo.ui, 'lfs')
            self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file.  Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs.  However, things like clone without an update won't
        # populate the local store.  For an init + push of a local clone,
        # the usercache is the only place it _could_ be.  If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        sha256 = hashlib.sha256()

        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

        realoid = sha256.hexdigest()
        if realoid != oid:
            raise LfsCorruptionError(_('corrupt remote lfs object: %s')
                                     % oid)

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data.  Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        self._linktousercache(oid)

    def _linktousercache(self, oid):
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if (not self.cachevfs.exists(oid)
            and not isinstance(self.cachevfs, nullvfs)):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore."""
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            if verify or hashlib.sha256(blob).hexdigest() == oid:
                self.ui.note(_('lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == sha256.hexdigest()

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
219
219
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS batch + basic transfer APIs."""

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip('/')
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # Number of times a transfer is retried on socket errors.
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blostore."""
        self._batch(_deduplicate(pointers), tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rsp = self.urlopener.open(batchreq)
            rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)

        if self.ui.debugflag:
            self.ui.debug('Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            self.ui.debug('%s\n'
                          % '\n'.join(sorted(str(rsp.info()).splitlines())))

            if 'objects' in response:
                response['objects'] = sorted(response['objects'],
                                             key=lambda p: p['oid'])
            self.ui.debug('%s\n'
                          % json.dumps(response, indent=2,
                                       separators=('', ': '), sort_keys=True))

        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server)  does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            if 'error' not in response:
                if (action == 'download'
                    and action not in response.get('actions', [])):
                    code = 404
                else:
                    continue
            else:
                # An error dict without a code doesn't make much sense, so
                # treat as a server error.
                code = response.get('error').get('code', 500)

            ptrmap = {p.oid(): p for p in pointers}
            p = ptrmap.get(response['oid'], None)
            if p:
                filename = getattr(p, 'filename', 'unknown')
                errors = {
                    404: 'The object does not exist',
                    410: 'The object was removed by the owner',
                    422: 'Validation error',
                    500: 'Internal server error',
                }
                msg = errors.get(code, 'status code %d' % code)
                raise LfsRemoteError(_('LFS server error for "%s": %s')
                                     % (filename, msg))
            else:
                raise LfsRemoteError(
                    _('LFS server error. Unsolicited response for oid %s')
                    % response['oid'])

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = pycompat.bytestr(obj['oid'])

        href = pycompat.bytestr(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            if not localstore.verify(oid):
                raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                                  hint=_('run hg verify'))
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'
            request.add_header('Content-Type', 'application/octet-stream')

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)

            if self.ui.debugflag:
                self.ui.debug('Status: %d\n' % req.status)
                # lfs-test-server and hg serve return headers in different order
                self.ui.debug('%s\n'
                              % '\n'.join(sorted(str(req.info()).splitlines())))

            if action == 'download':
                # If downloading blobs, store downloaded data to local blobstore
                localstore.download(oid, req)
            else:
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
                if response:
                    self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug('%s: %s\n' % (oid, ex.read()))
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        # Use the progress helper object rather than calling ui.progress()
        # directly for each update.
        progress = self.ui.makeprogress(topic, total=total)
        progress.update(0)
        def transfer(chunk):
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                        util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool('experimental', 'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get('oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get('oid')))

        processed = 0
        blobs = 0
        for _one, oid in oids:
            processed += sizes[oid]
            blobs += 1
            progress.update(processed)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        progress.complete()

        if blobs > 0:
            if action == 'upload':
                self.ui.status(_('lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            elif action == 'download':
                self.ui.status(_('lfs: downloaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
465
466
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        fullpath = repo.vfs.join('lfs', url.path)
        self.vfs = lfsvfs(fullpath)

    def writebatch(self, pointers, fromstore):
        # Copy each unique blob out of the local store, verifying content.
        for p in _deduplicate(pointers):
            content = fromstore.read(p.oid(), verify=True)
            with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
                fp.write(content)

    def readbatch(self, pointers, tostore):
        # download() re-verifies the content while writing it locally.
        for p in _deduplicate(pointers):
            with self.vfs(p.oid(), 'rb') as fp:
                tostore.download(p.oid(), fp)
483
484
class _nullremote(object):
    """Null store storing blobs to /dev/null.

    Both uploads and downloads are silently discarded (the ``null://``
    scheme in ``_storemap``).
    """

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore):
        pass

    def readbatch(self, pointers, tostore):
        pass
class _promptremote(object):
    """Prompt user to set lfs.url when accessed.

    Selected when no remote endpoint could be determined (the ``None``
    scheme in ``_storemap``); any transfer attempt aborts with a hint.
    """

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        raise error.Abort(_('lfs.url needs to be configured'))
# Map a URL scheme to the class implementing that flavor of blob store.
# The ``None`` entry is chosen when no endpoint can be inferred at all,
# and simply prompts the user to configure ``lfs.url``.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    # Keyed by oid: a later pointer with the same oid replaces an earlier
    # one.  util.sortdict keeps iteration deterministic.
    unique = util.sortdict()
    for pointer in pointers:
        unique[pointer.oid()] = pointer
    return unique.values()
def _verify(oid, content):
    """Raise LfsCorruptionError unless sha256(content) matches oid."""
    actual = hashlib.sha256(content).hexdigest()
    if actual != oid:
        raise LfsCorruptionError(_('detected corrupt lfs object: %s') % oid,
                                 hint=_('run hg verify'))
def remote(repo, remote=None):
    """remotestore factory. return a store in _storemap depending on config

    If ``lfs.url`` is specified, use that remote endpoint.  Otherwise, try to
    infer the endpoint, based on the remote repository using the same path
    adjustments as git.  As an extension, 'http' is supported as well so that
    ``hg serve`` works out of the box.

    https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
    """
    lfsurl = repo.ui.config('lfs', 'url')
    url = util.url(lfsurl or '')
    if lfsurl is None:
        if remote:
            path = remote
        elif util.safehasattr(repo, '_subtoppath'):
            # The pull command sets this during the optional update phase, which
            # tells exactly where the pull originated, whether 'paths.default'
            # or explicit.
            path = repo._subtoppath
        else:
            # TODO: investigate 'paths.remote:lfsurl' style path customization,
            # and fall back to inferring from 'paths.remote' if unspecified.
            path = repo.ui.config('paths', 'default') or ''

        defaulturl = util.url(path)

        # TODO: support local paths as well.
        # TODO: consider the ssh -> https transformation that git applies
        if defaulturl.scheme in (b'http', b'https'):
            # Ensure exactly one '/' before appending the git-lfs suffix.
            # BUG FIX: this used to test ``defaulturl.path[:-1] != b'/'``,
            # which compares everything *except* the last byte and therefore
            # appended a '/' even when the path already ended with one
            # (e.g. b'/repo/' became b'/repo//.git/info/lfs').  The intended
            # check is whether the path *ends* with '/'.
            if defaulturl.path and defaulturl.path[-1:] != b'/':
                defaulturl.path += b'/'
            defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'

            url = util.url(bytes(defaulturl))
            repo.ui.note(_('lfs: assuming remote store: %s\n') % url)

    scheme = url.scheme
    if scheme not in _storemap:
        raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)
class LfsRemoteError(error.RevlogError):
    """Raised for errors while interacting with a remote blob store."""
class LfsCorruptionError(error.Abort):
    """Raised when a corrupt blob is detected, aborting an operation.

    It exists to allow specialized handling on the server side.
    """
General Comments 0
You need to be logged in to leave comments. Login now