py3: raw stringify various JSON and HTTP headers in the LFS blobstore module...
Matt Harbison
r41472:40efcf78 default
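Background for the change: on Python 2, str is bytes and an r'' prefix is a no-op; on Python 3, str is unicode, json.dumps() both requires and returns native str, and urllib's header APIs expect native str as well. The diff therefore converts oids and the action to native strings before building the batch request, and converts the serialized JSON back to bytes for Mercurial's byte-oriented internals. A minimal sketch of that boundary handling follows; the strurl()/bytesurl() names mirror helpers in mercurial.pycompat, but the bodies here are simplified stand-ins, not the real implementations:

import json
import sys

if sys.version_info[0] >= 3:
    def strurl(url):
        # bytes -> native str, as json.dumps() and urllib expect on py3
        return url.decode('ascii')

    def bytesurl(url):
        # native str -> bytes, for byte-oriented internals
        return url.encode('ascii')
else:
    # On py2 both are no-ops: native str is already bytes.
    strurl = bytesurl = lambda url: url

oid = b'a' * 64  # illustrative sha256 hex oid
requestdata = bytesurl(json.dumps({
    r'objects': [{r'oid': strurl(oid), r'size': 42}],
    r'operation': r'download',
}))
assert isinstance(requestdata, bytes)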
@@ -1,651 +1,654 b''
# blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
#
# Copyright 2017 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import contextlib
import errno
import hashlib
import json
import os
import re
import socket

from mercurial.i18n import _

from mercurial import (
    encoding,
    error,
    node,
    pathutil,
    pycompat,
    url as urlmod,
    util,
    vfs as vfsmod,
    worker,
)

from mercurial.utils import (
    stringutil,
)

from ..largefiles import lfutil

# 64 bytes for SHA256
_lfsre = re.compile(br'\A[a-f0-9]{64}\Z')

class lfsvfs(vfsmod.vfs):
    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if not _lfsre.match(path):
            raise error.ProgrammingError(b'unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[0:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        for dirpath, dirs, files in os.walk(self.reljoin(self.base, path
                                                         or b''),
                                            onerror=onerror):
            dirpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories
            if len(dirpath) == 2:
                oids.extend([dirpath + f for f in files
                             if _lfsre.match(dirpath + f)])

        yield ('', [], oids)

class nullvfs(lfsvfs):
    def __init__(self):
        pass

    def exists(self, oid):
        return False

    def read(self, oid):
        # store.read() calls into here if the blob doesn't exist in its
        # self.vfs. Raise the same error as a normal vfs when asked to read a
        # file that doesn't exist. The only difference is the full file path
        # isn't available in the error.
        raise IOError(errno.ENOENT,
                      pycompat.sysstr(b'%s: No such file or directory' % oid))

    def walk(self, path=None, onerror=None):
        return (b'', [], [])

    def write(self, oid, data):
        pass

class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        self._fp = fp
        self._callback = callback # func(readsize)
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        if self._fp is None:
            return b''
        data = self._fp.read(size)
        if data:
            if self._callback:
                self._callback(len(data))
        else:
            self._fp.close()
            self._fp = None
        return data

class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large blobs
    to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        fullpath = repo.svfs.join(b'lfs/objects')
        self.vfs = lfsvfs(fullpath)

        if repo.ui.configbool(b'experimental', b'lfs.disableusercache'):
            self.cachevfs = nullvfs()
        else:
            usercache = lfutil._usercachedir(repo.ui, b'lfs')
            self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file. Commit will
        # write to both it and the local store, as will anything that downloads
        # the blobs. However, things like clone without an update won't
        # populate the local store. For an init + push of a local clone,
        # the usercache is the only place it _could_ be. If not present, the
        # missing file msg here will indicate the local repo, not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, b'rb')

        return self.vfs(oid, b'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        sha256 = hashlib.sha256()

        with self.vfs(oid, b'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

            realoid = node.hex(sha256.digest())
            if realoid != oid:
                raise LfsCorruptionError(_(b'corrupt remote lfs object: %s')
                                         % oid)

        self._linktousercache(oid)

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data. Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, b'wb', atomictemp=True) as fp:
            fp.write(data)

        self._linktousercache(oid)

    def linkfromusercache(self, oid):
        """Link blobs found in the user cache into this store.

        The server module needs to do this when it lets the client know not to
        upload the blob, to ensure it is always available in this store.
        Normally this is done implicitly when the client reads or writes the
        blob, but that doesn't happen when the server tells the client that it
        already has the blob.
        """
        if (not isinstance(self.cachevfs, nullvfs)
            and not self.vfs.exists(oid)):
            self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
            lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))

    def _linktousercache(self, oid):
        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if (not self.cachevfs.exists(oid)
            and not isinstance(self.cachevfs, nullvfs)):
            self.ui.note(_(b'lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore."""
        if not self.vfs.exists(oid):
            blob = self._read(self.cachevfs, oid, verify)

            # Even if revlog will verify the content, it needs to be verified
            # now before making the hardlink to avoid propagating corrupt blobs.
            # Don't abort if corruption is detected, because `hg verify` will
            # give more useful info about the corruption- simply don't add the
            # hardlink.
            if verify or node.hex(hashlib.sha256(blob).digest()) == oid:
                self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
                lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        else:
            self.ui.note(_(b'lfs: found %s in the local lfs store\n') % oid)
            blob = self._read(self.vfs, oid, verify)
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == node.hex(sha256.digest())

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)

def _urlerrorreason(urlerror):
    '''Create a friendly message for the given URLError to be used in an
    LfsRemoteError message.
    '''
    inst = urlerror

    if isinstance(urlerror.reason, Exception):
        inst = urlerror.reason

    if util.safehasattr(inst, 'reason'):
        try: # usually it is in the form (errno, strerror)
            reason = inst.reason.args[1]
        except (AttributeError, IndexError):
            # it might be anything, for example a string
            reason = inst.reason
        if isinstance(reason, pycompat.unicode):
            # SSLError of Python 2.7.9 contains a unicode
            reason = encoding.unitolocal(reason)
        return reason
    elif getattr(inst, "strerror", None):
        return encoding.strtolocal(inst.strerror)
    else:
        return stringutil.forcebytestr(urlerror)

class _gitlfsremote(object):

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip(b'/')
        useragent = repo.ui.config(b'experimental', b'lfs.user-agent')
        if not useragent:
            useragent = b'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        self.retry = ui.configint(b'lfs', b'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, b'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blobstore."""
        self._batch(_deduplicate(pointers), tostore, b'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
-        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
-        requestdata = json.dumps({
-            'objects': objects,
-            'operation': action,
-        })
+        objects = [{r'oid': pycompat.strurl(p.oid()),
+                    r'size': p.size()} for p in pointers]
+        requestdata = pycompat.bytesurl(json.dumps({
+            r'objects': objects,
+            r'operation': pycompat.strurl(action),
+        }))
        url = b'%s/objects/batch' % self.baseurl
        batchreq = util.urlreq.request(pycompat.strurl(url), data=requestdata)
-        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
-        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
+        batchreq.add_header(r'Accept', r'application/vnd.git-lfs+json')
+        batchreq.add_header(r'Content-Type', r'application/vnd.git-lfs+json')
        try:
            with contextlib.closing(self.urlopener.open(batchreq)) as rsp:
                rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            hints = {
                400: _(b'check that lfs serving is enabled on %s and "%s" is '
                       'supported') % (self.baseurl, action),
                404: _(b'the "lfs.url" config may be used to override %s')
                     % self.baseurl,
            }
            hint = hints.get(ex.code, _(b'api=%s, action=%s') % (url, action))
            raise LfsRemoteError(
                _(b'LFS HTTP error: %s') % stringutil.forcebytestr(ex),
                hint=hint)
        except util.urlerr.urlerror as ex:
            hint = (_(b'the "lfs.url" config may be used to override %s')
                    % self.baseurl)
            raise LfsRemoteError(_(b'LFS error: %s') % _urlerrorreason(ex),
                                 hint=hint)
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_(b'LFS server returns invalid JSON: %s')
                                 % rawjson.encode("utf-8"))

        if self.ui.debugflag:
            self.ui.debug(b'Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            headers = pycompat.bytestr(rsp.info())
            self.ui.debug(b'%s\n'
                          % b'\n'.join(sorted(headers.splitlines())))
-            if 'objects' in response:
-                response['objects'] = sorted(response['objects'],
-                                             key=lambda p: p['oid'])
-            self.ui.debug('%s\n'
-                          % json.dumps(response, indent=2,
-                                       separators=('', ': '), sort_keys=True))
+            if r'objects' in response:
+                response[r'objects'] = sorted(response[r'objects'],
+                                              key=lambda p: p[r'oid'])
+            self.ui.debug(b'%s\n'
+                          % pycompat.bytesurl(
+                              json.dumps(response, indent=2,
+                                         separators=(r'', r': '),
+                                         sort_keys=True)))

        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server) does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            if b'error' not in response:
                if (action == b'download'
                    and action not in response.get(b'actions', [])):
                    code = 404
                else:
                    continue
            else:
                # An error dict without a code doesn't make much sense, so
                # treat as a server error.
                code = response.get(b'error').get(b'code', 500)

            ptrmap = {p.oid(): p for p in pointers}
            p = ptrmap.get(response[b'oid'], None)
            if p:
                filename = getattr(p, 'filename', b'unknown')
                errors = {
                    404: b'The object does not exist',
                    410: b'The object was removed by the owner',
                    422: b'Validation error',
                    500: b'Internal server error',
                }
                msg = errors.get(code, b'status code %d' % code)
                raise LfsRemoteError(_(b'LFS server error for "%s": %s')
                                     % (filename, msg))
            else:
                raise LfsRemoteError(
                    _(b'LFS server error. Unsolicited response for oid %s')
                    % response[b'oid'])

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get(b'objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects
                           if action in o.get(b'actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
basic-transfers.md
        """
        oid = pycompat.bytestr(obj['oid'])

        href = pycompat.bytestr(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == b'upload':
            # If uploading blobs, read data from local blobstore.
            if not localstore.verify(oid):
                raise error.Abort(_(b'detected corrupt lfs object: %s') % oid,
                                  hint=_(b'run hg verify'))
            request.data = filewithprogress(localstore.open(oid), None)
-            request.get_method = lambda: 'PUT'
-            request.add_header('Content-Type', 'application/octet-stream')
+            request.get_method = lambda: r'PUT'
+            request.add_header(r'Content-Type', r'application/octet-stream')

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            with contextlib.closing(self.urlopener.open(request)) as req:
                ui = self.ui # Shorten debug lines
                if self.ui.debugflag:
                    ui.debug(b'Status: %d\n' % req.status)
                    # lfs-test-server and hg serve return headers in different
                    # order
                    headers = pycompat.bytestr(req.info())
                    ui.debug(b'%s\n'
                             % b'\n'.join(sorted(headers.splitlines())))

                if action == b'download':
                    # If downloading blobs, store downloaded data to local
                    # blobstore
                    localstore.download(oid, req)
                else:
                    while True:
                        data = req.read(1048576)
                        if not data:
                            break
                        response += data
                    if response:
                        ui.debug(b'lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug(b'%s: %s\n' % (oid, ex.read())) # XXX: also bytes?
            raise LfsRemoteError(_(b'LFS HTTP error: %s (oid=%s, action=%s)')
                                 % (stringutil.forcebytestr(ex), oid, action))
        except util.urlerr.urlerror as ex:
            hint = (_(b'attempted connection to %s')
                    % pycompat.bytesurl(util.urllibcompat.getfullurl(request)))
            raise LfsRemoteError(_(b'LFS error: %s') % _urlerrorreason(ex),
                                 hint=hint)

    def _batch(self, pointers, localstore, action):
        if action not in [b'upload', b'download']:
            raise error.ProgrammingError(b'invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get(b'size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get(b'oid')] = obj.get(b'size', 0)
        topic = {b'upload': _(b'lfs uploading'),
                 b'download': _(b'lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_(b'lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))

        def transfer(chunk):
            for obj in chunk:
                objsize = obj.get(b'size', 0)
                if self.ui.verbose:
                    if action == b'download':
                        msg = _(b'lfs: downloading %s (%s)\n')
                    elif action == b'upload':
                        msg = _(b'lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get(b'oid'),
                                        util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get(b'oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _(b'lfs: failed: %r (remaining retry %d)\n')
                                % (stringutil.forcebytestr(ex), retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get(b'oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))

        with self.ui.makeprogress(topic, total=total) as progress:
            progress.update(0)
            processed = 0
            blobs = 0
            for _one, oid in oids:
                processed += sizes[oid]
                blobs += 1
                progress.update(processed)
                self.ui.note(_(b'lfs: processed: %s\n') % oid)

        if blobs > 0:
            if action == b'upload':
                self.ui.status(_(b'lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            elif action == b'download':
                self.ui.status(_(b'lfs: downloaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()

class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        fullpath = repo.vfs.join(b'lfs', url.path)
        self.vfs = lfsvfs(fullpath)

    def writebatch(self, pointers, fromstore):
        for p in _deduplicate(pointers):
            content = fromstore.read(p.oid(), verify=True)
            with self.vfs(p.oid(), b'wb', atomictemp=True) as fp:
                fp.write(content)

    def readbatch(self, pointers, tostore):
        for p in _deduplicate(pointers):
            with self.vfs(p.oid(), b'rb') as fp:
                tostore.download(p.oid(), fp)

class _nullremote(object):
    """Null store storing blobs to /dev/null."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore):
        pass

    def readbatch(self, pointers, tostore):
        pass

class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        raise error.Abort(_(b'lfs.url needs to be configured'))

_storemap = {
    b'https': _gitlfsremote,
    b'http': _gitlfsremote,
    b'file': _dummyremote,
    b'null': _nullremote,
    None: _promptremote,
}

def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    reduced = util.sortdict()
    for p in pointers:
        reduced[p.oid()] = p
    return reduced.values()

def _verify(oid, content):
    realoid = node.hex(hashlib.sha256(content).digest())
    if realoid != oid:
        raise LfsCorruptionError(_(b'detected corrupt lfs object: %s') % oid,
                                 hint=_(b'run hg verify'))

def remote(repo, remote=None):
    """remotestore factory. return a store in _storemap depending on config

    If ``lfs.url`` is specified, use that remote endpoint. Otherwise, try to
    infer the endpoint, based on the remote repository using the same path
    adjustments as git. As an extension, 'http' is supported as well so that
    ``hg serve`` works out of the box.

    https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
    """
    lfsurl = repo.ui.config(b'lfs', b'url')
    url = util.url(lfsurl or '')
    if lfsurl is None:
        if remote:
            path = remote
        elif util.safehasattr(repo, '_subtoppath'):
            # The pull command sets this during the optional update phase, which
            # tells exactly where the pull originated, whether 'paths.default'
            # or explicit.
            path = repo._subtoppath
        else:
            # TODO: investigate 'paths.remote:lfsurl' style path customization,
            # and fall back to inferring from 'paths.remote' if unspecified.
            path = repo.ui.config(b'paths', b'default') or b''

        defaulturl = util.url(path)

        # TODO: support local paths as well.
        # TODO: consider the ssh -> https transformation that git applies
        if defaulturl.scheme in (b'http', b'https'):
            if defaulturl.path and defaulturl.path[:-1] != b'/':
                defaulturl.path += b'/'
            defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'

            url = util.url(bytes(defaulturl))
            repo.ui.note(_(b'lfs: assuming remote store: %s\n') % url)

    scheme = url.scheme
    if scheme not in _storemap:
        raise error.Abort(_(b'lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)

class LfsRemoteError(error.StorageError):
    pass

class LfsCorruptionError(error.Abort):
    """Raised when a corrupt blob is detected, aborting an operation

    It exists to allow specialized handling on the server side."""
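
For reference, the batch exchange that _batchrequest() and _extractobjects() drive looks roughly like the following (request and response shapes per the batch.md spec linked above; the oid, size, href, and header values are illustrative placeholders, not real data):

# Request body POSTed to <baseurl>/objects/batch with
# Accept/Content-Type: application/vnd.git-lfs+json
request_body = {
    'operation': 'download',
    'objects': [{'oid': '31ce0eb0' * 8, 'size': 12}],  # illustrative oid
}

# A typical per-object entry in the response. _checkforservererror()
# raises LfsRemoteError for entries carrying an 'error' dict (or, for
# downloads, entries missing the requested action), and _extractobjects()
# keeps only entries whose 'actions' include the requested operation.
response_body = {
    'objects': [{
        'oid': '31ce0eb0' * 8,
        'size': 12,
        'actions': {
            'download': {
                'href': 'https://lfs.example.com/objects/...',  # placeholder
                'header': {'Authorization': 'Basic ...'},       # placeholder
            },
        },
    }],
}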