##// END OF EJS Templates
py3: byteify the decoded JSON responses upon receipt in the LFS blobstore...
Matt Harbison -
r41474:7df10ea7 default
parent child Browse files
Show More
@@ -1,654 +1,658
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 #
2 #
3 # Copyright 2017 Facebook, Inc.
3 # Copyright 2017 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import contextlib
10 import contextlib
11 import errno
11 import errno
12 import hashlib
12 import hashlib
13 import json
13 import json
14 import os
14 import os
15 import re
15 import re
16 import socket
16 import socket
17
17
18 from mercurial.i18n import _
18 from mercurial.i18n import _
19
19
20 from mercurial import (
20 from mercurial import (
21 encoding,
21 encoding,
22 error,
22 error,
23 node,
23 node,
24 pathutil,
24 pathutil,
25 pycompat,
25 pycompat,
26 url as urlmod,
26 url as urlmod,
27 util,
27 util,
28 vfs as vfsmod,
28 vfs as vfsmod,
29 worker,
29 worker,
30 )
30 )
31
31
32 from mercurial.utils import (
32 from mercurial.utils import (
33 stringutil,
33 stringutil,
34 )
34 )
35
35
36 from ..largefiles import lfutil
36 from ..largefiles import lfutil
37
37
38 # 64 bytes for SHA256
38 # 64 bytes for SHA256
39 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
39 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
40
40
41 class lfsvfs(vfsmod.vfs):
41 class lfsvfs(vfsmod.vfs):
42 def join(self, path):
42 def join(self, path):
43 """split the path at first two characters, like: XX/XXXXX..."""
43 """split the path at first two characters, like: XX/XXXXX..."""
44 if not _lfsre.match(path):
44 if not _lfsre.match(path):
45 raise error.ProgrammingError(b'unexpected lfs path: %s' % path)
45 raise error.ProgrammingError(b'unexpected lfs path: %s' % path)
46 return super(lfsvfs, self).join(path[0:2], path[2:])
46 return super(lfsvfs, self).join(path[0:2], path[2:])
47
47
48 def walk(self, path=None, onerror=None):
48 def walk(self, path=None, onerror=None):
49 """Yield (dirpath, [], oids) tuple for blobs under path
49 """Yield (dirpath, [], oids) tuple for blobs under path
50
50
51 Oids only exist in the root of this vfs, so dirpath is always ''.
51 Oids only exist in the root of this vfs, so dirpath is always ''.
52 """
52 """
53 root = os.path.normpath(self.base)
53 root = os.path.normpath(self.base)
54 # when dirpath == root, dirpath[prefixlen:] becomes empty
54 # when dirpath == root, dirpath[prefixlen:] becomes empty
55 # because len(dirpath) < prefixlen.
55 # because len(dirpath) < prefixlen.
56 prefixlen = len(pathutil.normasprefix(root))
56 prefixlen = len(pathutil.normasprefix(root))
57 oids = []
57 oids = []
58
58
59 for dirpath, dirs, files in os.walk(self.reljoin(self.base, path
59 for dirpath, dirs, files in os.walk(self.reljoin(self.base, path
60 or b''),
60 or b''),
61 onerror=onerror):
61 onerror=onerror):
62 dirpath = dirpath[prefixlen:]
62 dirpath = dirpath[prefixlen:]
63
63
64 # Silently skip unexpected files and directories
64 # Silently skip unexpected files and directories
65 if len(dirpath) == 2:
65 if len(dirpath) == 2:
66 oids.extend([dirpath + f for f in files
66 oids.extend([dirpath + f for f in files
67 if _lfsre.match(dirpath + f)])
67 if _lfsre.match(dirpath + f)])
68
68
69 yield ('', [], oids)
69 yield ('', [], oids)
70
70
71 class nullvfs(lfsvfs):
71 class nullvfs(lfsvfs):
72 def __init__(self):
72 def __init__(self):
73 pass
73 pass
74
74
75 def exists(self, oid):
75 def exists(self, oid):
76 return False
76 return False
77
77
78 def read(self, oid):
78 def read(self, oid):
79 # store.read() calls into here if the blob doesn't exist in its
79 # store.read() calls into here if the blob doesn't exist in its
80 # self.vfs. Raise the same error as a normal vfs when asked to read a
80 # self.vfs. Raise the same error as a normal vfs when asked to read a
81 # file that doesn't exist. The only difference is the full file path
81 # file that doesn't exist. The only difference is the full file path
82 # isn't available in the error.
82 # isn't available in the error.
83 raise IOError(errno.ENOENT,
83 raise IOError(errno.ENOENT,
84 pycompat.sysstr(b'%s: No such file or directory' % oid))
84 pycompat.sysstr(b'%s: No such file or directory' % oid))
85
85
86 def walk(self, path=None, onerror=None):
86 def walk(self, path=None, onerror=None):
87 return (b'', [], [])
87 return (b'', [], [])
88
88
89 def write(self, oid, data):
89 def write(self, oid, data):
90 pass
90 pass
91
91
92 class filewithprogress(object):
92 class filewithprogress(object):
93 """a file-like object that supports __len__ and read.
93 """a file-like object that supports __len__ and read.
94
94
95 Useful to provide progress information for how many bytes are read.
95 Useful to provide progress information for how many bytes are read.
96 """
96 """
97
97
98 def __init__(self, fp, callback):
98 def __init__(self, fp, callback):
99 self._fp = fp
99 self._fp = fp
100 self._callback = callback # func(readsize)
100 self._callback = callback # func(readsize)
101 fp.seek(0, os.SEEK_END)
101 fp.seek(0, os.SEEK_END)
102 self._len = fp.tell()
102 self._len = fp.tell()
103 fp.seek(0)
103 fp.seek(0)
104
104
105 def __len__(self):
105 def __len__(self):
106 return self._len
106 return self._len
107
107
108 def read(self, size):
108 def read(self, size):
109 if self._fp is None:
109 if self._fp is None:
110 return b''
110 return b''
111 data = self._fp.read(size)
111 data = self._fp.read(size)
112 if data:
112 if data:
113 if self._callback:
113 if self._callback:
114 self._callback(len(data))
114 self._callback(len(data))
115 else:
115 else:
116 self._fp.close()
116 self._fp.close()
117 self._fp = None
117 self._fp = None
118 return data
118 return data
119
119
120 class local(object):
120 class local(object):
121 """Local blobstore for large file contents.
121 """Local blobstore for large file contents.
122
122
123 This blobstore is used both as a cache and as a staging area for large blobs
123 This blobstore is used both as a cache and as a staging area for large blobs
124 to be uploaded to the remote blobstore.
124 to be uploaded to the remote blobstore.
125 """
125 """
126
126
127 def __init__(self, repo):
127 def __init__(self, repo):
128 fullpath = repo.svfs.join(b'lfs/objects')
128 fullpath = repo.svfs.join(b'lfs/objects')
129 self.vfs = lfsvfs(fullpath)
129 self.vfs = lfsvfs(fullpath)
130
130
131 if repo.ui.configbool(b'experimental', b'lfs.disableusercache'):
131 if repo.ui.configbool(b'experimental', b'lfs.disableusercache'):
132 self.cachevfs = nullvfs()
132 self.cachevfs = nullvfs()
133 else:
133 else:
134 usercache = lfutil._usercachedir(repo.ui, b'lfs')
134 usercache = lfutil._usercachedir(repo.ui, b'lfs')
135 self.cachevfs = lfsvfs(usercache)
135 self.cachevfs = lfsvfs(usercache)
136 self.ui = repo.ui
136 self.ui = repo.ui
137
137
138 def open(self, oid):
138 def open(self, oid):
139 """Open a read-only file descriptor to the named blob, in either the
139 """Open a read-only file descriptor to the named blob, in either the
140 usercache or the local store."""
140 usercache or the local store."""
141 # The usercache is the most likely place to hold the file. Commit will
141 # The usercache is the most likely place to hold the file. Commit will
142 # write to both it and the local store, as will anything that downloads
142 # write to both it and the local store, as will anything that downloads
143 # the blobs. However, things like clone without an update won't
143 # the blobs. However, things like clone without an update won't
144 # populate the local store. For an init + push of a local clone,
144 # populate the local store. For an init + push of a local clone,
145 # the usercache is the only place it _could_ be. If not present, the
145 # the usercache is the only place it _could_ be. If not present, the
146 # missing file msg here will indicate the local repo, not the usercache.
146 # missing file msg here will indicate the local repo, not the usercache.
147 if self.cachevfs.exists(oid):
147 if self.cachevfs.exists(oid):
148 return self.cachevfs(oid, b'rb')
148 return self.cachevfs(oid, b'rb')
149
149
150 return self.vfs(oid, b'rb')
150 return self.vfs(oid, b'rb')
151
151
152 def download(self, oid, src):
152 def download(self, oid, src):
153 """Read the blob from the remote source in chunks, verify the content,
153 """Read the blob from the remote source in chunks, verify the content,
154 and write to this local blobstore."""
154 and write to this local blobstore."""
155 sha256 = hashlib.sha256()
155 sha256 = hashlib.sha256()
156
156
157 with self.vfs(oid, b'wb', atomictemp=True) as fp:
157 with self.vfs(oid, b'wb', atomictemp=True) as fp:
158 for chunk in util.filechunkiter(src, size=1048576):
158 for chunk in util.filechunkiter(src, size=1048576):
159 fp.write(chunk)
159 fp.write(chunk)
160 sha256.update(chunk)
160 sha256.update(chunk)
161
161
162 realoid = node.hex(sha256.digest())
162 realoid = node.hex(sha256.digest())
163 if realoid != oid:
163 if realoid != oid:
164 raise LfsCorruptionError(_(b'corrupt remote lfs object: %s')
164 raise LfsCorruptionError(_(b'corrupt remote lfs object: %s')
165 % oid)
165 % oid)
166
166
167 self._linktousercache(oid)
167 self._linktousercache(oid)
168
168
169 def write(self, oid, data):
169 def write(self, oid, data):
170 """Write blob to local blobstore.
170 """Write blob to local blobstore.
171
171
172 This should only be called from the filelog during a commit or similar.
172 This should only be called from the filelog during a commit or similar.
173 As such, there is no need to verify the data. Imports from a remote
173 As such, there is no need to verify the data. Imports from a remote
174 store must use ``download()`` instead."""
174 store must use ``download()`` instead."""
175 with self.vfs(oid, b'wb', atomictemp=True) as fp:
175 with self.vfs(oid, b'wb', atomictemp=True) as fp:
176 fp.write(data)
176 fp.write(data)
177
177
178 self._linktousercache(oid)
178 self._linktousercache(oid)
179
179
180 def linkfromusercache(self, oid):
180 def linkfromusercache(self, oid):
181 """Link blobs found in the user cache into this store.
181 """Link blobs found in the user cache into this store.
182
182
183 The server module needs to do this when it lets the client know not to
183 The server module needs to do this when it lets the client know not to
184 upload the blob, to ensure it is always available in this store.
184 upload the blob, to ensure it is always available in this store.
185 Normally this is done implicitly when the client reads or writes the
185 Normally this is done implicitly when the client reads or writes the
186 blob, but that doesn't happen when the server tells the client that it
186 blob, but that doesn't happen when the server tells the client that it
187 already has the blob.
187 already has the blob.
188 """
188 """
189 if (not isinstance(self.cachevfs, nullvfs)
189 if (not isinstance(self.cachevfs, nullvfs)
190 and not self.vfs.exists(oid)):
190 and not self.vfs.exists(oid)):
191 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
191 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
192 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
192 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
193
193
194 def _linktousercache(self, oid):
194 def _linktousercache(self, oid):
195 # XXX: should we verify the content of the cache, and hardlink back to
195 # XXX: should we verify the content of the cache, and hardlink back to
196 # the local store on success, but truncate, write and link on failure?
196 # the local store on success, but truncate, write and link on failure?
197 if (not self.cachevfs.exists(oid)
197 if (not self.cachevfs.exists(oid)
198 and not isinstance(self.cachevfs, nullvfs)):
198 and not isinstance(self.cachevfs, nullvfs)):
199 self.ui.note(_(b'lfs: adding %s to the usercache\n') % oid)
199 self.ui.note(_(b'lfs: adding %s to the usercache\n') % oid)
200 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
200 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
201
201
202 def read(self, oid, verify=True):
202 def read(self, oid, verify=True):
203 """Read blob from local blobstore."""
203 """Read blob from local blobstore."""
204 if not self.vfs.exists(oid):
204 if not self.vfs.exists(oid):
205 blob = self._read(self.cachevfs, oid, verify)
205 blob = self._read(self.cachevfs, oid, verify)
206
206
207 # Even if revlog will verify the content, it needs to be verified
207 # Even if revlog will verify the content, it needs to be verified
208 # now before making the hardlink to avoid propagating corrupt blobs.
208 # now before making the hardlink to avoid propagating corrupt blobs.
209 # Don't abort if corruption is detected, because `hg verify` will
209 # Don't abort if corruption is detected, because `hg verify` will
210 # give more useful info about the corruption- simply don't add the
210 # give more useful info about the corruption- simply don't add the
211 # hardlink.
211 # hardlink.
212 if verify or node.hex(hashlib.sha256(blob).digest()) == oid:
212 if verify or node.hex(hashlib.sha256(blob).digest()) == oid:
213 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
213 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
214 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
214 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
215 else:
215 else:
216 self.ui.note(_(b'lfs: found %s in the local lfs store\n') % oid)
216 self.ui.note(_(b'lfs: found %s in the local lfs store\n') % oid)
217 blob = self._read(self.vfs, oid, verify)
217 blob = self._read(self.vfs, oid, verify)
218 return blob
218 return blob
219
219
220 def _read(self, vfs, oid, verify):
220 def _read(self, vfs, oid, verify):
221 """Read blob (after verifying) from the given store"""
221 """Read blob (after verifying) from the given store"""
222 blob = vfs.read(oid)
222 blob = vfs.read(oid)
223 if verify:
223 if verify:
224 _verify(oid, blob)
224 _verify(oid, blob)
225 return blob
225 return blob
226
226
227 def verify(self, oid):
227 def verify(self, oid):
228 """Indicate whether or not the hash of the underlying file matches its
228 """Indicate whether or not the hash of the underlying file matches its
229 name."""
229 name."""
230 sha256 = hashlib.sha256()
230 sha256 = hashlib.sha256()
231
231
232 with self.open(oid) as fp:
232 with self.open(oid) as fp:
233 for chunk in util.filechunkiter(fp, size=1048576):
233 for chunk in util.filechunkiter(fp, size=1048576):
234 sha256.update(chunk)
234 sha256.update(chunk)
235
235
236 return oid == node.hex(sha256.digest())
236 return oid == node.hex(sha256.digest())
237
237
238 def has(self, oid):
238 def has(self, oid):
239 """Returns True if the local blobstore contains the requested blob,
239 """Returns True if the local blobstore contains the requested blob,
240 False otherwise."""
240 False otherwise."""
241 return self.cachevfs.exists(oid) or self.vfs.exists(oid)
241 return self.cachevfs.exists(oid) or self.vfs.exists(oid)
242
242
243 def _urlerrorreason(urlerror):
243 def _urlerrorreason(urlerror):
244 '''Create a friendly message for the given URLError to be used in an
244 '''Create a friendly message for the given URLError to be used in an
245 LfsRemoteError message.
245 LfsRemoteError message.
246 '''
246 '''
247 inst = urlerror
247 inst = urlerror
248
248
249 if isinstance(urlerror.reason, Exception):
249 if isinstance(urlerror.reason, Exception):
250 inst = urlerror.reason
250 inst = urlerror.reason
251
251
252 if util.safehasattr(inst, 'reason'):
252 if util.safehasattr(inst, 'reason'):
253 try: # usually it is in the form (errno, strerror)
253 try: # usually it is in the form (errno, strerror)
254 reason = inst.reason.args[1]
254 reason = inst.reason.args[1]
255 except (AttributeError, IndexError):
255 except (AttributeError, IndexError):
256 # it might be anything, for example a string
256 # it might be anything, for example a string
257 reason = inst.reason
257 reason = inst.reason
258 if isinstance(reason, pycompat.unicode):
258 if isinstance(reason, pycompat.unicode):
259 # SSLError of Python 2.7.9 contains a unicode
259 # SSLError of Python 2.7.9 contains a unicode
260 reason = encoding.unitolocal(reason)
260 reason = encoding.unitolocal(reason)
261 return reason
261 return reason
262 elif getattr(inst, "strerror", None):
262 elif getattr(inst, "strerror", None):
263 return encoding.strtolocal(inst.strerror)
263 return encoding.strtolocal(inst.strerror)
264 else:
264 else:
265 return stringutil.forcebytestr(urlerror)
265 return stringutil.forcebytestr(urlerror)
266
266
267 class _gitlfsremote(object):
267 class _gitlfsremote(object):
268
268
269 def __init__(self, repo, url):
269 def __init__(self, repo, url):
270 ui = repo.ui
270 ui = repo.ui
271 self.ui = ui
271 self.ui = ui
272 baseurl, authinfo = url.authinfo()
272 baseurl, authinfo = url.authinfo()
273 self.baseurl = baseurl.rstrip(b'/')
273 self.baseurl = baseurl.rstrip(b'/')
274 useragent = repo.ui.config(b'experimental', b'lfs.user-agent')
274 useragent = repo.ui.config(b'experimental', b'lfs.user-agent')
275 if not useragent:
275 if not useragent:
276 useragent = b'git-lfs/2.3.4 (Mercurial %s)' % util.version()
276 useragent = b'git-lfs/2.3.4 (Mercurial %s)' % util.version()
277 self.urlopener = urlmod.opener(ui, authinfo, useragent)
277 self.urlopener = urlmod.opener(ui, authinfo, useragent)
278 self.retry = ui.configint(b'lfs', b'retry')
278 self.retry = ui.configint(b'lfs', b'retry')
279
279
280 def writebatch(self, pointers, fromstore):
280 def writebatch(self, pointers, fromstore):
281 """Batch upload from local to remote blobstore."""
281 """Batch upload from local to remote blobstore."""
282 self._batch(_deduplicate(pointers), fromstore, b'upload')
282 self._batch(_deduplicate(pointers), fromstore, b'upload')
283
283
284 def readbatch(self, pointers, tostore):
284 def readbatch(self, pointers, tostore):
285 """Batch download from remote to local blobstore."""
285 """Batch download from remote to local blobstore."""
286 self._batch(_deduplicate(pointers), tostore, b'download')
286 self._batch(_deduplicate(pointers), tostore, b'download')
287
287
288 def _batchrequest(self, pointers, action):
288 def _batchrequest(self, pointers, action):
289 """Get metadata about objects pointed by pointers for given action
289 """Get metadata about objects pointed by pointers for given action
290
290
291 Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
291 Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
292 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
292 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
293 """
293 """
294 objects = [{r'oid': pycompat.strurl(p.oid()),
294 objects = [{r'oid': pycompat.strurl(p.oid()),
295 r'size': p.size()} for p in pointers]
295 r'size': p.size()} for p in pointers]
296 requestdata = pycompat.bytesurl(json.dumps({
296 requestdata = pycompat.bytesurl(json.dumps({
297 r'objects': objects,
297 r'objects': objects,
298 r'operation': pycompat.strurl(action),
298 r'operation': pycompat.strurl(action),
299 }))
299 }))
300 url = b'%s/objects/batch' % self.baseurl
300 url = b'%s/objects/batch' % self.baseurl
301 batchreq = util.urlreq.request(pycompat.strurl(url), data=requestdata)
301 batchreq = util.urlreq.request(pycompat.strurl(url), data=requestdata)
302 batchreq.add_header(r'Accept', r'application/vnd.git-lfs+json')
302 batchreq.add_header(r'Accept', r'application/vnd.git-lfs+json')
303 batchreq.add_header(r'Content-Type', r'application/vnd.git-lfs+json')
303 batchreq.add_header(r'Content-Type', r'application/vnd.git-lfs+json')
304 try:
304 try:
305 with contextlib.closing(self.urlopener.open(batchreq)) as rsp:
305 with contextlib.closing(self.urlopener.open(batchreq)) as rsp:
306 rawjson = rsp.read()
306 rawjson = rsp.read()
307 except util.urlerr.httperror as ex:
307 except util.urlerr.httperror as ex:
308 hints = {
308 hints = {
309 400: _(b'check that lfs serving is enabled on %s and "%s" is '
309 400: _(b'check that lfs serving is enabled on %s and "%s" is '
310 'supported') % (self.baseurl, action),
310 b'supported') % (self.baseurl, action),
311 404: _(b'the "lfs.url" config may be used to override %s')
311 404: _(b'the "lfs.url" config may be used to override %s')
312 % self.baseurl,
312 % self.baseurl,
313 }
313 }
314 hint = hints.get(ex.code, _(b'api=%s, action=%s') % (url, action))
314 hint = hints.get(ex.code, _(b'api=%s, action=%s') % (url, action))
315 raise LfsRemoteError(
315 raise LfsRemoteError(
316 _(b'LFS HTTP error: %s') % stringutil.forcebytestr(ex),
316 _(b'LFS HTTP error: %s') % stringutil.forcebytestr(ex),
317 hint=hint)
317 hint=hint)
318 except util.urlerr.urlerror as ex:
318 except util.urlerr.urlerror as ex:
319 hint = (_(b'the "lfs.url" config may be used to override %s')
319 hint = (_(b'the "lfs.url" config may be used to override %s')
320 % self.baseurl)
320 % self.baseurl)
321 raise LfsRemoteError(_(b'LFS error: %s') % _urlerrorreason(ex),
321 raise LfsRemoteError(_(b'LFS error: %s') % _urlerrorreason(ex),
322 hint=hint)
322 hint=hint)
323 try:
323 try:
324 response = json.loads(rawjson)
324 response = json.loads(rawjson)
325 except ValueError:
325 except ValueError:
326 raise LfsRemoteError(_(b'LFS server returns invalid JSON: %s')
326 raise LfsRemoteError(_(b'LFS server returns invalid JSON: %s')
327 % rawjson.encode("utf-8"))
327 % rawjson.encode("utf-8"))
328
328
329 if self.ui.debugflag:
329 if self.ui.debugflag:
330 self.ui.debug(b'Status: %d\n' % rsp.status)
330 self.ui.debug(b'Status: %d\n' % rsp.status)
331 # lfs-test-server and hg serve return headers in different order
331 # lfs-test-server and hg serve return headers in different order
332 headers = pycompat.bytestr(rsp.info())
332 headers = pycompat.bytestr(rsp.info())
333 self.ui.debug(b'%s\n'
333 self.ui.debug(b'%s\n'
334 % b'\n'.join(sorted(headers.splitlines())))
334 % b'\n'.join(sorted(headers.splitlines())))
335
335
336 if r'objects' in response:
336 if r'objects' in response:
337 response[r'objects'] = sorted(response[r'objects'],
337 response[r'objects'] = sorted(response[r'objects'],
338 key=lambda p: p[r'oid'])
338 key=lambda p: p[r'oid'])
339 self.ui.debug(b'%s\n'
339 self.ui.debug(b'%s\n'
340 % pycompat.bytesurl(
340 % pycompat.bytesurl(
341 json.dumps(response, indent=2,
341 json.dumps(response, indent=2,
342 separators=(r'', r': '),
342 separators=(r'', r': '),
343 sort_keys=True)))
343 sort_keys=True)))
344
344
345 return response
345 def encodestr(x):
346 if isinstance(x, pycompat.unicode):
347 return x.encode(u'utf-8')
348 return x
349
350 return pycompat.rapply(encodestr, response)
346
351
347 def _checkforservererror(self, pointers, responses, action):
352 def _checkforservererror(self, pointers, responses, action):
348 """Scans errors from objects
353 """Scans errors from objects
349
354
350 Raises LfsRemoteError if any objects have an error"""
355 Raises LfsRemoteError if any objects have an error"""
351 for response in responses:
356 for response in responses:
352 # The server should return 404 when objects cannot be found. Some
357 # The server should return 404 when objects cannot be found. Some
353 # server implementation (ex. lfs-test-server) does not set "error"
358 # server implementation (ex. lfs-test-server) does not set "error"
354 # but just removes "download" from "actions". Treat that case
359 # but just removes "download" from "actions". Treat that case
355 # as the same as 404 error.
360 # as the same as 404 error.
356 if b'error' not in response:
361 if b'error' not in response:
357 if (action == b'download'
362 if (action == b'download'
358 and action not in response.get(b'actions', [])):
363 and action not in response.get(b'actions', [])):
359 code = 404
364 code = 404
360 else:
365 else:
361 continue
366 continue
362 else:
367 else:
363 # An error dict without a code doesn't make much sense, so
368 # An error dict without a code doesn't make much sense, so
364 # treat as a server error.
369 # treat as a server error.
365 code = response.get(b'error').get(b'code', 500)
370 code = response.get(b'error').get(b'code', 500)
366
371
367 ptrmap = {p.oid(): p for p in pointers}
372 ptrmap = {p.oid(): p for p in pointers}
368 p = ptrmap.get(response[b'oid'], None)
373 p = ptrmap.get(response[b'oid'], None)
369 if p:
374 if p:
370 filename = getattr(p, 'filename', b'unknown')
375 filename = getattr(p, 'filename', b'unknown')
371 errors = {
376 errors = {
372 404: b'The object does not exist',
377 404: b'The object does not exist',
373 410: b'The object was removed by the owner',
378 410: b'The object was removed by the owner',
374 422: b'Validation error',
379 422: b'Validation error',
375 500: b'Internal server error',
380 500: b'Internal server error',
376 }
381 }
377 msg = errors.get(code, b'status code %d' % code)
382 msg = errors.get(code, b'status code %d' % code)
378 raise LfsRemoteError(_(b'LFS server error for "%s": %s')
383 raise LfsRemoteError(_(b'LFS server error for "%s": %s')
379 % (filename, msg))
384 % (filename, msg))
380 else:
385 else:
381 raise LfsRemoteError(
386 raise LfsRemoteError(
382 _(b'LFS server error. Unsolicited response for oid %s')
387 _(b'LFS server error. Unsolicited response for oid %s')
383 % response[b'oid'])
388 % response[b'oid'])
384
389
385 def _extractobjects(self, response, pointers, action):
390 def _extractobjects(self, response, pointers, action):
386 """extract objects from response of the batch API
391 """extract objects from response of the batch API
387
392
388 response: parsed JSON object returned by batch API
393 response: parsed JSON object returned by batch API
389 return response['objects'] filtered by action
394 return response['objects'] filtered by action
390 raise if any object has an error
395 raise if any object has an error
391 """
396 """
392 # Scan errors from objects - fail early
397 # Scan errors from objects - fail early
393 objects = response.get(b'objects', [])
398 objects = response.get(b'objects', [])
394 self._checkforservererror(pointers, objects, action)
399 self._checkforservererror(pointers, objects, action)
395
400
396 # Filter objects with given action. Practically, this skips uploading
401 # Filter objects with given action. Practically, this skips uploading
397 # objects which exist in the server.
402 # objects which exist in the server.
398 filteredobjects = [o for o in objects
403 filteredobjects = [o for o in objects
399 if action in o.get(b'actions', [])]
404 if action in o.get(b'actions', [])]
400
405
401 return filteredobjects
406 return filteredobjects
402
407
403 def _basictransfer(self, obj, action, localstore):
408 def _basictransfer(self, obj, action, localstore):
404 """Download or upload a single object using basic transfer protocol
409 """Download or upload a single object using basic transfer protocol
405
410
406 obj: dict, an object description returned by batch API
411 obj: dict, an object description returned by batch API
407 action: string, one of ['upload', 'download']
412 action: string, one of ['upload', 'download']
408 localstore: blobstore.local
413 localstore: blobstore.local
409
414
410 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
415 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
411 basic-transfers.md
416 basic-transfers.md
412 """
417 """
413 oid = pycompat.bytestr(obj['oid'])
418 oid = obj[b'oid']
419 href = obj[b'actions'][action].get(b'href')
420 headers = obj[b'actions'][action].get(b'header', {}).items()
414
421
415 href = pycompat.bytestr(obj['actions'][action].get('href'))
422 request = util.urlreq.request(pycompat.strurl(href))
416 headers = obj['actions'][action].get('header', {}).items()
417
418 request = util.urlreq.request(href)
419 if action == b'upload':
423 if action == b'upload':
420 # If uploading blobs, read data from local blobstore.
424 # If uploading blobs, read data from local blobstore.
421 if not localstore.verify(oid):
425 if not localstore.verify(oid):
422 raise error.Abort(_(b'detected corrupt lfs object: %s') % oid,
426 raise error.Abort(_(b'detected corrupt lfs object: %s') % oid,
423 hint=_(b'run hg verify'))
427 hint=_(b'run hg verify'))
424 request.data = filewithprogress(localstore.open(oid), None)
428 request.data = filewithprogress(localstore.open(oid), None)
425 request.get_method = lambda: r'PUT'
429 request.get_method = lambda: r'PUT'
426 request.add_header(r'Content-Type', r'application/octet-stream')
430 request.add_header(r'Content-Type', r'application/octet-stream')
427
431
428 for k, v in headers:
432 for k, v in headers:
429 request.add_header(k, v)
433 request.add_header(pycompat.strurl(k), pycompat.strurl(v))
430
434
431 response = b''
435 response = b''
432 try:
436 try:
433 with contextlib.closing(self.urlopener.open(request)) as req:
437 with contextlib.closing(self.urlopener.open(request)) as req:
434 ui = self.ui # Shorten debug lines
438 ui = self.ui # Shorten debug lines
435 if self.ui.debugflag:
439 if self.ui.debugflag:
436 ui.debug(b'Status: %d\n' % req.status)
440 ui.debug(b'Status: %d\n' % req.status)
437 # lfs-test-server and hg serve return headers in different
441 # lfs-test-server and hg serve return headers in different
438 # order
442 # order
439 headers = pycompat.bytestr(req.info())
443 headers = pycompat.bytestr(req.info())
440 ui.debug(b'%s\n'
444 ui.debug(b'%s\n'
441 % b'\n'.join(sorted(headers.splitlines())))
445 % b'\n'.join(sorted(headers.splitlines())))
442
446
443 if action == b'download':
447 if action == b'download':
444 # If downloading blobs, store downloaded data to local
448 # If downloading blobs, store downloaded data to local
445 # blobstore
449 # blobstore
446 localstore.download(oid, req)
450 localstore.download(oid, req)
447 else:
451 else:
448 while True:
452 while True:
449 data = req.read(1048576)
453 data = req.read(1048576)
450 if not data:
454 if not data:
451 break
455 break
452 response += data
456 response += data
453 if response:
457 if response:
454 ui.debug(b'lfs %s response: %s' % (action, response))
458 ui.debug(b'lfs %s response: %s' % (action, response))
455 except util.urlerr.httperror as ex:
459 except util.urlerr.httperror as ex:
456 if self.ui.debugflag:
460 if self.ui.debugflag:
457 self.ui.debug(b'%s: %s\n' % (oid, ex.read())) # XXX: also bytes?
461 self.ui.debug(b'%s: %s\n' % (oid, ex.read())) # XXX: also bytes?
458 raise LfsRemoteError(_(b'LFS HTTP error: %s (oid=%s, action=%s)')
462 raise LfsRemoteError(_(b'LFS HTTP error: %s (oid=%s, action=%s)')
459 % (stringutil.forcebytestr(ex), oid, action))
463 % (stringutil.forcebytestr(ex), oid, action))
460 except util.urlerr.urlerror as ex:
464 except util.urlerr.urlerror as ex:
461 hint = (_(b'attempted connection to %s')
465 hint = (_(b'attempted connection to %s')
462 % pycompat.bytesurl(util.urllibcompat.getfullurl(request)))
466 % pycompat.bytesurl(util.urllibcompat.getfullurl(request)))
463 raise LfsRemoteError(_(b'LFS error: %s') % _urlerrorreason(ex),
467 raise LfsRemoteError(_(b'LFS error: %s') % _urlerrorreason(ex),
464 hint=hint)
468 hint=hint)
465
469
    def _batch(self, pointers, localstore, action):
        """Transfer the blobs for ``pointers`` via the LFS batch API.

        ``action`` must be ``b'upload'`` or ``b'download'``.  Downloads are
        written into ``localstore``; uploads are read from it.  Transfers may
        run in parallel via the worker pool, with per-blob progress reporting
        and a bounded retry on socket errors.
        """
        if action not in [b'upload', b'download']:
            raise error.ProgrammingError(b'invalid Git-LFS action: %s' % action)

        # Ask the server which objects to transfer, and collect their sizes
        # up front so progress can be reported in bytes.
        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get(b'size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get(b'oid')] = obj.get(b'size', 0)
        topic = {b'upload': _(b'lfs uploading'),
                 b'download': _(b'lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_(b'lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))

        def transfer(chunk):
            # Generator yielding (1, oid) for each completed blob.  Each blob
            # is retried up to self.retry times on transient socket errors
            # before the exception is propagated.
            for obj in chunk:
                objsize = obj.get(b'size', 0)
                if self.ui.verbose:
                    if action == b'download':
                        msg = _(b'lfs: downloading %s (%s)\n')
                    elif action == b'upload':
                        msg = _(b'lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get(b'oid'),
                                        util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get(b'oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _(b'lfs: failed: %r (remaining retry %d)\n')
                                % (stringutil.forcebytestr(ex), retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get(b'oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))

        # Drain the transfer generator(s), updating the progress bar as each
        # blob completes.
        with self.ui.makeprogress(topic, total=total) as progress:
            progress.update(0)
            processed = 0
            blobs = 0
            for _one, oid in oids:
                processed += sizes[oid]
                blobs += 1
                progress.update(processed)
                self.ui.note(_(b'lfs: processed: %s\n') % oid)

        if blobs > 0:
            if action == b'upload':
                self.ui.status(_(b'lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            elif action == b'download':
                self.ui.status(_(b'lfs: downloaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
532 def __del__(self):
536 def __del__(self):
533 # copied from mercurial/httppeer.py
537 # copied from mercurial/httppeer.py
534 urlopener = getattr(self, 'urlopener', None)
538 urlopener = getattr(self, 'urlopener', None)
535 if urlopener:
539 if urlopener:
536 for h in urlopener.handlers:
540 for h in urlopener.handlers:
537 h.close()
541 h.close()
538 getattr(h, "close_all", lambda : None)()
542 getattr(h, "close_all", lambda : None)()
539
543
class _dummyremote(object):
    """Remote store keeping blobs under a directory inside ``.hg/lfs``.

    Backs the ``file://`` scheme in ``_storemap``.
    """

    def __init__(self, repo, url):
        storagepath = repo.vfs.join(b'lfs', url.path)
        self.vfs = lfsvfs(storagepath)

    def writebatch(self, pointers, fromstore):
        # Copy each distinct blob out of the local store, verifying on read.
        for pointer in _deduplicate(pointers):
            oid = pointer.oid()
            blob = fromstore.read(oid, verify=True)
            with self.vfs(oid, b'wb', atomictemp=True) as fp:
                fp.write(blob)

    def readbatch(self, pointers, tostore):
        # Stream each distinct blob back into the local store.
        for pointer in _deduplicate(pointers):
            oid = pointer.oid()
            with self.vfs(oid, b'rb') as fp:
                tostore.download(oid, fp)
class _nullremote(object):
    """Null store storing blobs to /dev/null.

    Backs the ``null://`` scheme in ``_storemap``: writes are discarded and
    reads fetch nothing.
    """

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore):
        # Intentionally discard the blobs.
        pass

    def readbatch(self, pointers, tostore):
        # Nothing was ever stored, so there is nothing to fetch.
        pass
class _promptremote(object):
    """Prompt user to set lfs.url when accessed.

    Used when no remote endpoint is configured or inferable (the ``None``
    entry in ``_storemap``); any batch operation aborts with a hint.
    """

    def __init__(self, repo, url):
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        # Abort rather than guess: the user must configure lfs.url explicitly.
        raise error.Abort(_(b'lfs.url needs to be configured'))
# Map a URL scheme (from ``lfs.url`` or the inferred default path) to the
# remote store class that handles it.  The ``None`` key covers the case where
# no endpoint is configured or inferable, and prompts the user instead.
_storemap = {
    b'https': _gitlfsremote,
    b'http': _gitlfsremote,
    b'file': _dummyremote,
    b'null': _nullremote,
    None: _promptremote,
}
def _deduplicate(pointers):
    """Return ``pointers`` with duplicate oids collapsed, order preserved.

    When an oid appears more than once, the last pointer for it wins.
    """
    unique = util.sortdict()
    for pointer in pointers:
        unique[pointer.oid()] = pointer
    return unique.values()
def _verify(oid, content):
    """Abort with LfsCorruptionError unless sha256(content) matches ``oid``."""
    actual = node.hex(hashlib.sha256(content).digest())
    if actual == oid:
        return
    raise LfsCorruptionError(_(b'detected corrupt lfs object: %s') % oid,
                             hint=_(b'run hg verify'))
def remote(repo, remote=None):
    """remotestore factory. return a store in _storemap depending on config

    If ``lfs.url`` is specified, use that remote endpoint.  Otherwise, try to
    infer the endpoint, based on the remote repository using the same path
    adjustments as git.  As an extension, 'http' is supported as well so that
    ``hg serve`` works out of the box.

    https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
    """
    lfsurl = repo.ui.config(b'lfs', b'url')
    # b'' (not u''): util.url operates on bytes, matching the rest of the file.
    url = util.url(lfsurl or b'')
    if lfsurl is None:
        if remote:
            path = remote
        elif util.safehasattr(repo, '_subtoppath'):
            # The pull command sets this during the optional update phase, which
            # tells exactly where the pull originated, whether 'paths.default'
            # or explicit.
            path = repo._subtoppath
        else:
            # TODO: investigate 'paths.remote:lfsurl' style path customization,
            # and fall back to inferring from 'paths.remote' if unspecified.
            path = repo.ui.config(b'paths', b'default') or b''

        defaulturl = util.url(path)

        # TODO: support local paths as well.
        # TODO: consider the ssh -> https transformation that git applies
        if defaulturl.scheme in (b'http', b'https'):
            # Ensure exactly one trailing '/' before appending the git-style
            # suffix.  (Was ``path[:-1] != b'/'``, a slice typo that compared
            # everything *but* the last byte, double-appending '/' for paths
            # that already ended in one.)
            if defaulturl.path and defaulturl.path[-1:] != b'/':
                defaulturl.path += b'/'
            defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'

            url = util.url(bytes(defaulturl))
            repo.ui.note(_(b'lfs: assuming remote store: %s\n') % url)

    scheme = url.scheme
    if scheme not in _storemap:
        raise error.Abort(_(b'lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)
class LfsRemoteError(error.StorageError):
    """Raised for failures talking to the remote LFS endpoint.

    Covers HTTP errors, connection failures, and malformed protocol
    responses.
    """
    pass
class LfsCorruptionError(error.Abort):
    """Raised when a corrupt blob is detected, aborting an operation.

    It exists to allow specialized handling on the server side."""
General Comments 0
You need to be logged in to leave comments. Login now