##// END OF EJS Templates
lfs: ensure that the return of urlopener.open() is closed...
Matt Harbison -
r40701:fb379b78 default
parent child Browse files
Show More
@@ -1,640 +1,643 b''
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 #
2 #
3 # Copyright 2017 Facebook, Inc.
3 # Copyright 2017 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
from __future__ import absolute_import

import contextlib
import errno
import hashlib
import json
import os
import re
import socket

from mercurial.i18n import _

from mercurial import (
    encoding,
    error,
    pathutil,
    pycompat,
    url as urlmod,
    util,
    vfs as vfsmod,
    worker,
)

from mercurial.utils import (
    stringutil,
)

from ..largefiles import lfutil
35
36
# An oid is a SHA256 hex digest: exactly 64 lowercase hexadecimal characters
# (the pattern is anchored at both ends, so nothing else may surround it).
_lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
38
39
class lfsvfs(vfsmod.vfs):
    """vfs whose entries are oids, stored as ``XX/XXXXX...`` on disk."""

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX...

        Raises ``error.ProgrammingError`` when ``path`` is not accepted by
        ``_lfsre``.
        """
        if not _lfsre.match(path):
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[0:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        Exactly one tuple is yielded, after the whole tree has been scanned.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        for dirpath, dirs, files in os.walk(self.reljoin(self.base, path or ''),
                                            onerror=onerror):
            dirpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories: only files in a
            # two-character directory whose joined name looks like an oid
            # are reported.
            if len(dirpath) == 2:
                oids.extend([dirpath + f for f in files
                             if _lfsre.match(dirpath + f)])

        yield ('', [], oids)
67
68
class nullvfs(lfsvfs):
    """An ``lfsvfs`` stand-in that never holds any blob.

    ``exists()`` is always False, ``write()`` discards its data and
    ``read()`` always fails with ENOENT.
    """

    def __init__(self):
        # Deliberately does not call the base class constructor.
        pass

    def exists(self, oid):
        return False

    def read(self, oid):
        # store.read() calls into here if the blob doesn't exist in its
        # self.vfs.  Raise the same error as a normal vfs when asked to read a
        # file that doesn't exist.  The only difference is the full file path
        # isn't available in the error.
        raise IOError(errno.ENOENT, '%s: No such file or directory' % oid)

    def walk(self, path=None, onerror=None):
        """Yield a single empty (dirpath, [], oids) tuple.

        This mirrors ``lfsvfs.walk()``, which is a generator yielding one
        3-tuple.  Returning the bare tuple instead would make
        ``for dirpath, dirs, files in vfs.walk()`` iterate over the tuple's
        members and fail to unpack ''.
        """
        yield ('', [], [])

    def write(self, oid, data):
        pass
87
88
class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        """Wrap ``fp`` (which must be seekable) and rewind it to the start.

        ``callback``, when not None/falsy, is called as ``callback(readsize)``
        after every non-empty read.
        """
        self._fp = fp
        self._callback = callback  # func(readsize)
        # Measure the total length by seeking to the end, then rewind.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        """Read up to ``size`` bytes, reporting the amount to the callback.

        The underlying file is closed as soon as a read returns no data;
        any later call returns b'' without touching it.
        """
        if self._fp is None:
            return b''
        data = self._fp.read(size)
        if data:
            if self._callback:
                self._callback(len(data))
        else:
            # End of file: release the descriptor right away.
            self._fp.close()
            self._fp = None
        return data
115
116
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        """Set up the repo-local store and the (optional) user cache.

        The local store lives under ``lfs/objects`` in ``repo.svfs``.  The
        user cache is replaced by a ``nullvfs`` when the
        ``experimental.lfs.disableusercache`` config is set.
        """
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)

        if repo.ui.configbool('experimental', 'lfs.disableusercache'):
            self.cachevfs = nullvfs()
        else:
            usercache = lfutil._usercachedir(repo.ui, 'lfs')
            self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file.  Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs.  However, things like clone without an update won't
        # populate the local store.  For an init + push of a local clone,
        # the usercache is the only place it _could_ be.  If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore.

        Raises ``LfsCorruptionError`` when the SHA256 of the received data
        does not match ``oid``.
        """
        sha256 = hashlib.sha256()

        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

            # Checked while the atomictemp file is still open.
            # NOTE(review): presumably leaving the ``with`` block through an
            # exception discards the temp file instead of renaming it into
            # place -- confirm against the vfs atomictemp implementation.
            realoid = sha256.hexdigest()
            if realoid != oid:
                raise LfsCorruptionError(_('corrupt remote lfs object: %s')
                                         % oid)

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data.  Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        self._linktousercache(oid)

    def linkfromusercache(self, oid):
        """Link blobs found in the user cache into this store.

        The server module needs to do this when it lets the client know not to
        upload the blob, to ensure it is always available in this store.
        Normally this is done implicitly when the client reads or writes the
        blob, but that doesn't happen when the server tells the client that it
        already has the blob.
        """
        if (not isinstance(self.cachevfs, nullvfs)
            and not self.vfs.exists(oid)):
            self.ui.note(_('lfs: found %s in the usercache\n') % oid)
            lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))

    def _linktousercache(self, oid):
        """Link a blob of the local store into the user cache, if missing."""
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if (not self.cachevfs.exists(oid)
            and not isinstance(self.cachevfs, nullvfs)):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore.

        The local store is preferred.  When the blob is only in the user
        cache it is read from there and then linked into the local store.
        """
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            # (With verify=True, _read() has already checked the content.)
            if verify or hashlib.sha256(blob).hexdigest() == oid:
                self.ui.note(_('lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        # Hash in 1 MiB chunks so the blob is never loaded whole.
        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == sha256.hexdigest()

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
238
239
def _urlerrorreason(urlerror):
    '''Create a friendly message for the given URLError to be used in an
    LfsRemoteError message.

    The message is taken, in order of preference, from the ``reason`` of the
    error (or of the exception wrapped in ``urlerror.reason``), from its
    ``strerror``, or from the string form of ``urlerror`` itself.
    '''
    inst = urlerror

    # The URLError may merely wrap the exception that actually occurred.
    if isinstance(urlerror.reason, Exception):
        inst = urlerror.reason

    if util.safehasattr(inst, 'reason'):
        try:  # usually it is in the form (errno, strerror)
            reason = inst.reason.args[1]
        except (AttributeError, IndexError):
            # it might be anything, for example a string
            reason = inst.reason
        if isinstance(reason, pycompat.unicode):
            # SSLError of Python 2.7.9 contains a unicode
            reason = encoding.unitolocal(reason)
        return reason
    elif getattr(inst, "strerror", None):
        return encoding.strtolocal(inst.strerror)
    else:
        return stringutil.forcebytestr(urlerror)
262
263
263 class _gitlfsremote(object):
264 class _gitlfsremote(object):
264
265
def __init__(self, repo, url):
    """Prepare the HTTP opener for a remote Git-LFS store.

    (Method of ``_gitlfsremote``.)

    repo: the repository; only ``repo.ui`` is used
    url: object whose ``authinfo()`` returns ``(baseurl, authinfo)``

    The user agent comes from the ``experimental.lfs.user-agent`` config
    and defaults to ``git-lfs/2.3.4 (Mercurial <version>)``.  The retry
    count is read from the ``lfs.retry`` config.
    """
    ui = repo.ui
    self.ui = ui
    baseurl, authinfo = url.authinfo()
    # Stored without trailing slashes; request paths are appended to it.
    self.baseurl = baseurl.rstrip('/')
    useragent = repo.ui.config('experimental', 'lfs.user-agent')
    if not useragent:
        useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
    self.urlopener = urlmod.opener(ui, authinfo, useragent)
    self.retry = ui.configint('lfs', 'retry')
275
276
def writebatch(self, pointers, fromstore):
    """Batch upload from local to remote blobstore.

    (Method of ``_gitlfsremote``.)  ``pointers`` are passed through
    ``_deduplicate()`` before the 'upload' batch is run against
    ``fromstore``.
    """
    self._batch(_deduplicate(pointers), fromstore, 'upload')
279
280
def readbatch(self, pointers, tostore):
    """Batch download from remote to local blobstore.

    (Method of ``_gitlfsremote``.)  ``pointers`` are passed through
    ``_deduplicate()`` before the 'download' batch is run against
    ``tostore``.
    """
    self._batch(_deduplicate(pointers), tostore, 'download')
283
284
def _batchrequest(self, pointers, action):
    """Get metadata about objects pointed by pointers for given action

    (Method of ``_gitlfsremote``.)

    Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
    See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md

    Raises ``LfsRemoteError`` on an HTTP error, on a URL error, or when
    the server's reply is not valid JSON.
    """
    objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
    requestdata = json.dumps({
        'objects': objects,
        'operation': action,
    })
    url = '%s/objects/batch' % self.baseurl
    batchreq = util.urlreq.request(url, data=requestdata)
    batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
    batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
    try:
        # closing() guarantees the response is closed even if read() fails.
        with contextlib.closing(self.urlopener.open(batchreq)) as rsp:
            rawjson = rsp.read()
    except util.urlerr.httperror as ex:
        hints = {
            400: _('check that lfs serving is enabled on %s and "%s" is '
                   'supported') % (self.baseurl, action),
            404: _('the "lfs.url" config may be used to override %s')
                 % self.baseurl,
        }
        hint = hints.get(ex.code, _('api=%s, action=%s') % (url, action))
        raise LfsRemoteError(_('LFS HTTP error: %s') % ex, hint=hint)
    except util.urlerr.urlerror as ex:
        hint = (_('the "lfs.url" config may be used to override %s')
                % self.baseurl)
        raise LfsRemoteError(_('LFS error: %s') % _urlerrorreason(ex),
                             hint=hint)
    try:
        response = json.loads(rawjson)
    except ValueError:
        raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                             % rawjson)

    if self.ui.debugflag:
        # NOTE(review): rsp is already closed here; only its status and
        # headers are consulted, which is assumed to stay valid after
        # close() -- confirm for the response type urlopener returns.
        self.ui.debug('Status: %d\n' % rsp.status)
        # lfs-test-server and hg serve return headers in different order
        self.ui.debug('%s\n'
                      % '\n'.join(sorted(str(rsp.info()).splitlines())))

        # Sort the objects by oid before dumping the response.
        if 'objects' in response:
            response['objects'] = sorted(response['objects'],
                                         key=lambda p: p['oid'])
        self.ui.debug('%s\n'
                      % json.dumps(response, indent=2,
                                   separators=('', ': '), sort_keys=True))

    return response
336
337
def _checkforservererror(self, pointers, responses, action):
    """Scans errors from objects

    (Method of ``_gitlfsremote``.)

    pointers: the pointers that were requested (need ``oid()``)
    responses: the per-object dicts of the batch response
    action: 'upload' or 'download'

    Raises LfsRemoteError if any objects have an error"""
    for response in responses:
        # The server should return 404 when objects cannot be found. Some
        # server implementation (ex. lfs-test-server) does not set "error"
        # but just removes "download" from "actions". Treat that case
        # as the same as 404 error.
        if 'error' not in response:
            if (action == 'download'
                and action not in response.get('actions', [])):
                code = 404
            else:
                continue
        else:
            # An error dict without a code doesn't make much sense, so
            # treat as a server error.
            code = response.get('error').get('code', 500)

        # Only reached for a failing object; the first one raises, so the
        # map is built at most once.
        ptrmap = {p.oid(): p for p in pointers}
        p = ptrmap.get(response['oid'], None)
        if p:
            filename = getattr(p, 'filename', 'unknown')
            errors = {
                404: 'The object does not exist',
                410: 'The object was removed by the owner',
                422: 'Validation error',
                500: 'Internal server error',
            }
            msg = errors.get(code, 'status code %d' % code)
            raise LfsRemoteError(_('LFS server error for "%s": %s')
                                 % (filename, msg))
        else:
            raise LfsRemoteError(
                _('LFS server error. Unsolicited response for oid %s')
                % response['oid'])
374
375
def _extractobjects(self, response, pointers, action):
    """extract objects from response of the batch API

    (Method of ``_gitlfsremote``.)

    response: parsed JSON object returned by batch API
    return response['objects'] filtered by action
    raise if any object has an error
    """
    # Scan errors from objects - fail early
    objects = response.get('objects', [])
    self._checkforservererror(pointers, objects, action)

    # Filter objects with given action. Practically, this skips uploading
    # objects which exist in the server.
    filteredobjects = [o for o in objects if action in o.get('actions', [])]

    return filteredobjects
391
392
def _basictransfer(self, obj, action, localstore):
    """Download or upload a single object using basic transfer protocol

    (Method of ``_gitlfsremote``.)

    obj: dict, an object description returned by batch API
    action: string, one of ['upload', 'download']
    localstore: blobstore.local

    Raises ``error.Abort`` when the local blob to upload is corrupt, and
    ``LfsRemoteError`` on an HTTP or URL error.

    See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
    basic-transfers.md
    """
    oid = pycompat.bytestr(obj['oid'])

    href = pycompat.bytestr(obj['actions'][action].get('href'))
    headers = obj['actions'][action].get('header', {}).items()

    request = util.urlreq.request(href)
    if action == 'upload':
        # If uploading blobs, read data from local blobstore.
        if not localstore.verify(oid):
            raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                              hint=_('run hg verify'))
        request.data = filewithprogress(localstore.open(oid), None)
        request.get_method = lambda: 'PUT'
        request.add_header('Content-Type', 'application/octet-stream')

    # Headers supplied by the batch response are added last.
    for k, v in headers:
        request.add_header(k, v)

    try:
        # closing() guarantees the response is closed on every exit path.
        with contextlib.closing(self.urlopener.open(request)) as req:
            ui = self.ui  # Shorten debug lines
            if self.ui.debugflag:
                ui.debug('Status: %d\n' % req.status)
                # lfs-test-server and hg serve return headers in different
                # order
                ui.debug('%s\n'
                         % '\n'.join(sorted(str(req.info()).splitlines())))

            if action == 'download':
                # If downloading blobs, store downloaded data to local
                # blobstore
                localstore.download(oid, req)
            else:
                # Drain the reply in 1 MiB reads; collect the pieces and
                # join once rather than growing a bytes object in a loop.
                chunks = []
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    chunks.append(data)
                response = b''.join(chunks)
                if response:
                    ui.debug('lfs %s response: %s' % (action, response))
    except util.urlerr.httperror as ex:
        if self.ui.debugflag:
            self.ui.debug('%s: %s\n' % (oid, ex.read()))
        raise LfsRemoteError(_('LFS HTTP error: %s (oid=%s, action=%s)')
                             % (ex, oid, action))
    except util.urlerr.urlerror as ex:
        hint = (_('attempted connection to %s')
                % util.urllibcompat.getfullurl(request))
        raise LfsRemoteError(_('LFS error: %s') % _urlerrorreason(ex),
                             hint=hint)
451
454
def _batch(self, pointers, localstore, action):
    """Upload or download the blobs referenced by ``pointers``.

    ``action`` must be 'upload' or 'download'; any other value raises
    ``error.ProgrammingError``.  The objects to move are obtained from
    ``self._extractobjects()`` applied to the ``self._batchrequest()``
    response.  Each object is handed to ``self._basictransfer()`` together
    with ``localstore``; a ``socket.error`` is retried up to ``self.retry``
    times before being re-raised.  Progress and a final summary are
    reported through ``self.ui``.
    """
    if action not in ['upload', 'download']:
        raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

    # 'action' was validated above, so "not uploading" means downloading.
    uploading = action == 'upload'

    response = self._batchrequest(pointers, action)
    objects = self._extractobjects(response, pointers, action)
    total = sum(x.get('size', 0) for x in objects)
    # oid -> advertised size, used to advance the progress bar per blob
    sizes = dict((o.get('oid'), o.get('size', 0)) for o in objects)
    if uploading:
        topic = _('lfs uploading')
    else:
        topic = _('lfs downloading')
    if len(objects) > 1:
        self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                     % (len(objects), util.bytecount(total)))

    def transfer(chunk):
        """Yield ``(1, oid)`` after each object in ``chunk`` is moved."""
        for obj in chunk:
            objsize = obj.get('size', 0)
            if self.ui.verbose:
                if uploading:
                    msg = _('lfs: uploading %s (%s)\n')
                else:
                    msg = _('lfs: downloading %s (%s)\n')
                self.ui.note(msg % (obj.get('oid'),
                                    util.bytecount(objsize)))
            attemptsleft = self.retry
            while True:
                try:
                    self._basictransfer(obj, action, localstore)
                    yield 1, obj.get('oid')
                    break
                except socket.error as ex:
                    # Out of retries: let the socket error propagate.
                    if attemptsleft <= 0:
                        raise
                    self.ui.note(
                        _('lfs: failed: %r (remaining retry %d)\n')
                        % (ex, attemptsleft))
                    attemptsleft -= 1

    # Until https multiplexing gets sorted out
    useworker = self.ui.configbool('experimental', 'lfs.worker-enable')
    ordered = sorted(objects, key=lambda o: o.get('oid'))
    if useworker:
        oids = worker.worker(self.ui, 0.1, transfer, (), ordered)
    else:
        oids = transfer(ordered)

    with self.ui.makeprogress(topic, total=total) as progress:
        progress.update(0)
        processed = 0
        blobs = 0
        for blobs, (_one, oid) in enumerate(oids, 1):
            processed += sizes[oid]
            progress.update(processed)
            self.ui.note(_('lfs: processed: %s\n') % oid)

    if blobs > 0:
        if uploading:
            self.ui.status(_('lfs: uploaded %d files (%s)\n')
                           % (blobs, util.bytecount(processed)))
        else:
            self.ui.status(_('lfs: downloaded %d files (%s)\n')
                           % (blobs, util.bytecount(processed)))
517
520
518 def __del__(self):
def __del__(self):
    """Close every handler of ``self.urlopener`` when the store goes away.

    Does nothing when the instance has no (or a falsy) ``urlopener``
    attribute.  Each handler is closed with ``close()`` and, when it
    provides one, ``close_all()``.
    """
    # copied from mercurial/httppeer.py
    opener = getattr(self, 'urlopener', None)
    if not opener:
        return
    noop = lambda: None
    for handler in opener.handlers:
        handler.close()
        # Not every handler has close_all(); fall back to a no-op.
        getattr(handler, "close_all", noop)()
525
528
526 class _dummyremote(object):
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        """Root the store at ``url.path`` joined under the repo's 'lfs' dir."""
        self.vfs = lfsvfs(repo.vfs.join('lfs', url.path))

    def writebatch(self, pointers, fromstore):
        """Copy each distinct blob in ``pointers`` from ``fromstore``.

        Blobs are read with ``verify=True`` and written through an
        ``atomictemp`` file handle.
        """
        for pointer in _deduplicate(pointers):
            blob = fromstore.read(pointer.oid(), verify=True)
            with self.vfs(pointer.oid(), 'wb', atomictemp=True) as dest:
                dest.write(blob)

    def readbatch(self, pointers, tostore):
        """Feed each distinct blob in ``pointers`` to ``tostore.download``."""
        for pointer in _deduplicate(pointers):
            with self.vfs(pointer.oid(), 'rb') as src:
                tostore.download(pointer.oid(), src)
543
546
544 class _nullremote(object):
class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        """Accept ``repo`` and ``url`` but keep no state."""

    def writebatch(self, pointers, fromstore):
        """Do nothing: the requested uploads are silently dropped."""

    def readbatch(self, pointers, tostore):
        """Do nothing: no blob is ever delivered to ``tostore``."""
555
558
556 class _promptremote(object):
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        """Accept ``repo`` and ``url`` but keep no state."""

    def writebatch(self, pointers, fromstore, ui=None):
        """Always abort: there is no configured endpoint to upload to."""
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        """Always abort: there is no configured endpoint to download from."""
        self._prompt()

    def _prompt(self):
        """Raise ``error.Abort`` asking for ``lfs.url`` to be configured."""
        raise error.Abort(_('lfs.url needs to be configured'))
570
573
571 _storemap = {
# Maps the scheme of the LFS endpoint URL to the class implementing the
# remote store for it.  ``remote()`` looks the scheme up here and aborts on
# anything that is not a key of this table.
_storemap = {
    # Real servers speaking the Git-LFS protocol
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    # Blobs kept in a directory under the repository
    'file': _dummyremote,
    # Blobs are discarded
    'null': _nullremote,
    # No scheme at all: tell the user to configure lfs.url
    None: _promptremote,
}
578
581
579 def _deduplicate(pointers):
def _deduplicate(pointers):
    """Return the pointers with duplicate oids collapsed to one entry.

    When several pointers share an oid, the last one seen is the one kept.
    The result is the ``values()`` of a ``util.sortdict`` keyed by oid.
    """
    byoid = util.sortdict()
    for pointer in pointers:
        byoid[pointer.oid()] = pointer
    return byoid.values()
585
588
586 def _verify(oid, content):
def _verify(oid, content):
    """Check that ``content`` hashes to ``oid``.

    Returns None when the SHA-256 hex digest of ``content`` equals ``oid``;
    raises ``LfsCorruptionError`` otherwise.
    """
    if hashlib.sha256(content).hexdigest() == oid:
        return
    raise LfsCorruptionError(_('detected corrupt lfs object: %s') % oid,
                             hint=_('run hg verify'))
591
594
592 def remote(repo, remote=None):
def remote(repo, remote=None):
    """remotestore factory. return a store in _storemap depending on config

    If ``lfs.url`` is specified, use that remote endpoint.  Otherwise, try to
    infer the endpoint, based on the remote repository using the same path
    adjustments as git.  As an extension, 'http' is supported as well so that
    ``hg serve`` works out of the box.

    When inferring, the source path is taken from, in order: the ``remote``
    argument, ``repo._subtoppath`` if present, then the ``paths.default``
    config.  Only http(s) paths are rewritten, by appending
    ``.git/info/lfs`` (with exactly one ``/`` separator after a non-empty
    path).

    Raises ``error.Abort`` when the resulting URL scheme has no entry in
    ``_storemap``.

    https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
    """
    lfsurl = repo.ui.config('lfs', 'url')
    url = util.url(lfsurl or '')
    if lfsurl is None:
        if remote:
            path = remote
        elif util.safehasattr(repo, '_subtoppath'):
            # The pull command sets this during the optional update phase,
            # which tells exactly where the pull originated, whether
            # 'paths.default' or explicit.
            path = repo._subtoppath
        else:
            # TODO: investigate 'paths.remote:lfsurl' style path
            #       customization, and fall back to inferring from
            #       'paths.remote' if unspecified.
            path = repo.ui.config('paths', 'default') or ''

        defaulturl = util.url(path)

        # TODO: support local paths as well.
        # TODO: consider the ssh -> https transformation that git applies
        if defaulturl.scheme in (b'http', b'https'):
            # Only add a separator when the path does not already end in
            # one; otherwise the endpoint would contain a doubled slash.
            if defaulturl.path and not defaulturl.path.endswith(b'/'):
                defaulturl.path += b'/'
            defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'

            url = util.url(bytes(defaulturl))
            repo.ui.note(_('lfs: assuming remote store: %s\n') % url)

    scheme = url.scheme
    if scheme not in _storemap:
        raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)
633
636
634 class LfsRemoteError(error.StorageError):
class LfsRemoteError(error.StorageError):
    """Raised when talking to the remote LFS endpoint fails."""
636
639
637 class LfsCorruptionError(error.Abort):
class LfsCorruptionError(error.Abort):
    """Abort raised when a blob's content does not match its oid.

    A dedicated type lets the server side handle corruption specially."""
General Comments 0
You need to be logged in to leave comments. Login now