##// END OF EJS Templates
lfs: add the 'Content-Type' header called out in the file transfer spec...
Matt Harbison
r37260:3e293808 default
parent child Browse files
Show More
@@ -1,515 +1,516
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 #
2 #
3 # Copyright 2017 Facebook, Inc.
3 # Copyright 2017 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import json
11 import json
12 import os
12 import os
13 import re
13 import re
14 import socket
14 import socket
15
15
16 from mercurial.i18n import _
16 from mercurial.i18n import _
17
17
18 from mercurial import (
18 from mercurial import (
19 error,
19 error,
20 pathutil,
20 pathutil,
21 pycompat,
21 pycompat,
22 url as urlmod,
22 url as urlmod,
23 util,
23 util,
24 vfs as vfsmod,
24 vfs as vfsmod,
25 worker,
25 worker,
26 )
26 )
27
27
28 from ..largefiles import lfutil
28 from ..largefiles import lfutil
29
29
30 # 64 bytes for SHA256
30 # 64 bytes for SHA256
31 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
31 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
32
32
class lfsvfs(vfsmod.vfs):
    """vfs storing blobs named by SHA-256 oid, sharded into two-level
    directories (the first two hex characters become a subdirectory)."""

    def join(self, path):
        """split the path at first two characters, like: XX/XXXXX..."""
        if not _lfsre.match(path):
            raise error.ProgrammingError('unexpected lfs path: %s' % path)
        return super(lfsvfs, self).join(path[0:2], path[2:])

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, [], oids) tuple for blobs under path

        Oids only exist in the root of this vfs, so dirpath is always ''.
        """
        root = os.path.normpath(self.base)
        # When dirpath == root, dirpath[prefixlen:] becomes empty because
        # len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        oids = []

        startdir = self.reljoin(self.base, path or '')
        for dirpath, dirs, files in os.walk(startdir, onerror=onerror):
            dirpath = dirpath[prefixlen:]

            # Silently skip unexpected files and directories: only the
            # two-character shard directories can hold valid oids.
            if len(dirpath) == 2:
                oids.extend(dirpath + f for f in files
                            if _lfsre.match(dirpath + f))

        yield ('', [], oids)
61
61
class filewithprogress(object):
    """a file-like object that supports __len__ and read.

    Useful to provide progress information for how many bytes are read.
    """

    def __init__(self, fp, callback):
        # callback, when set, is invoked as callback(readsize) per read()
        self._fp = fp
        self._callback = callback
        # Determine the total size by seeking to the end, then rewind.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        return self._len

    def read(self, size):
        fp = self._fp
        if fp is None:
            # Already exhausted and closed; keep returning EOF.
            return b''
        data = fp.read(size)
        if not data:
            # EOF: close the underlying file and drop the reference so
            # later reads are harmless.
            fp.close()
            self._fp = None
            return data
        if self._callback:
            self._callback(len(data))
        return data
89
89
class local(object):
    """Local blobstore for large file contents.

    This blobstore is used both as a cache and as a staging area for large
    blobs to be uploaded to the remote blobstore.
    """

    def __init__(self, repo):
        fullpath = repo.svfs.join('lfs/objects')
        self.vfs = lfsvfs(fullpath)
        usercache = lfutil._usercachedir(repo.ui, 'lfs')
        self.cachevfs = lfsvfs(usercache)
        self.ui = repo.ui

    def open(self, oid):
        """Open a read-only file descriptor to the named blob, in either the
        usercache or the local store."""
        # The usercache is the most likely place to hold the file.  Commit
        # will write to both it and the local store, as will anything that
        # downloads the blobs.  However, things like clone without an update
        # won't populate the local store.  For an init + push of a local
        # clone, the usercache is the only place it _could_ be.  If not
        # present, the missing file msg here will indicate the local repo,
        # not the usercache.
        if self.cachevfs.exists(oid):
            return self.cachevfs(oid, 'rb')

        return self.vfs(oid, 'rb')

    def download(self, oid, src):
        """Read the blob from the remote source in chunks, verify the content,
        and write to this local blobstore."""
        sha256 = hashlib.sha256()

        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            for chunk in util.filechunkiter(src, size=1048576):
                fp.write(chunk)
                sha256.update(chunk)

        if sha256.hexdigest() != oid:
            raise error.Abort(_('corrupt remote lfs object: %s') % oid)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def write(self, oid, data):
        """Write blob to local blobstore.

        This should only be called from the filelog during a commit or similar.
        As such, there is no need to verify the data.  Imports from a remote
        store must use ``download()`` instead."""
        with self.vfs(oid, 'wb', atomictemp=True) as fp:
            fp.write(data)

        # XXX: should we verify the content of the cache, and hardlink back to
        # the local store on success, but truncate, write and link on failure?
        if not self.cachevfs.exists(oid):
            self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
            lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))

    def read(self, oid, verify=True):
        """Read blob from local blobstore."""
        if self.vfs.exists(oid):
            self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
            return self._read(self.vfs, oid, verify)

        blob = self._read(self.cachevfs, oid, verify)

        # Even if revlog will verify the content, it needs to be verified
        # now before making the hardlink to avoid propagating corrupt blobs.
        # Don't abort if corruption is detected, because `hg verify` will
        # give more useful info about the corruption- simply don't add the
        # hardlink.
        if verify or hashlib.sha256(blob).hexdigest() == oid:
            self.ui.note(_('lfs: found %s in the usercache\n') % oid)
            lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
        return blob

    def _read(self, vfs, oid, verify):
        """Read blob (after verifying) from the given store"""
        blob = vfs.read(oid)
        if verify:
            _verify(oid, blob)
        return blob

    def verify(self, oid):
        """Indicate whether or not the hash of the underlying file matches its
        name."""
        sha256 = hashlib.sha256()

        with self.open(oid) as fp:
            for chunk in util.filechunkiter(fp, size=1048576):
                sha256.update(chunk)

        return oid == sha256.hexdigest()

    def has(self, oid):
        """Returns True if the local blobstore contains the requested blob,
        False otherwise."""
        return self.cachevfs.exists(oid) or self.vfs.exists(oid)
193
193
class _gitlfsremote(object):
    """Remote blobstore speaking the Git-LFS batch and basic transfer
    protocols over HTTP(S)."""

    def __init__(self, repo, url):
        ui = repo.ui
        self.ui = ui
        baseurl, authinfo = url.authinfo()
        self.baseurl = baseurl.rstrip('/')
        useragent = repo.ui.config('experimental', 'lfs.user-agent')
        if not useragent:
            useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
        self.urlopener = urlmod.opener(ui, authinfo, useragent)
        # number of times a transfer is retried on socket errors
        self.retry = ui.configint('lfs', 'retry')

    def writebatch(self, pointers, fromstore):
        """Batch upload from local to remote blobstore."""
        self._batch(_deduplicate(pointers), fromstore, 'upload')

    def readbatch(self, pointers, tostore):
        """Batch download from remote to local blobstore."""
        self._batch(_deduplicate(pointers), tostore, 'download')

    def _batchrequest(self, pointers, action):
        """Get metadata about objects pointed by pointers for given action

        Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
        """
        objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
        requestdata = json.dumps({
            'objects': objects,
            'operation': action,
        })
        batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
                                       data=requestdata)
        batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
        batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
        try:
            rsp = self.urlopener.open(batchreq)
            rawjson = rsp.read()
        except util.urlerr.httperror as ex:
            raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
                                 % (ex, action))
        try:
            response = json.loads(rawjson)
        except ValueError:
            raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
                                 % rawjson)

        if self.ui.debugflag:
            self.ui.debug('Status: %d\n' % rsp.status)
            # lfs-test-server and hg serve return headers in different order
            self.ui.debug('%s\n'
                          % '\n'.join(sorted(str(rsp.info()).splitlines())))

            # Normalize the object order in debug output for stable test runs
            if 'objects' in response:
                response['objects'] = sorted(response['objects'],
                                             key=lambda p: p['oid'])
            self.ui.debug('%s\n'
                          % json.dumps(response, indent=2,
                                       separators=('', ': '), sort_keys=True))

        return response

    def _checkforservererror(self, pointers, responses, action):
        """Scans errors from objects

        Raises LfsRemoteError if any objects have an error"""
        # Both of these maps are loop-invariant; build them once instead of
        # rebuilding them for every response carrying an error.
        ptrmap = {p.oid(): p for p in pointers}
        errors = {
            404: 'The object does not exist',
            410: 'The object was removed by the owner',
            422: 'Validation error',
            500: 'Internal server error',
        }

        for response in responses:
            # The server should return 404 when objects cannot be found. Some
            # server implementation (ex. lfs-test-server)  does not set "error"
            # but just removes "download" from "actions". Treat that case
            # as the same as 404 error.
            if 'error' not in response:
                if (action == 'download'
                    and action not in response.get('actions', [])):
                    code = 404
                else:
                    continue
            else:
                # An error dict without a code doesn't make much sense, so
                # treat as a server error.
                code = response.get('error').get('code', 500)

            p = ptrmap.get(response['oid'], None)
            if p:
                filename = getattr(p, 'filename', 'unknown')
                msg = errors.get(code, 'status code %d' % code)
                raise LfsRemoteError(_('LFS server error for "%s": %s')
                                     % (filename, msg))
            else:
                raise LfsRemoteError(
                    _('LFS server error. Unsolicited response for oid %s')
                    % response['oid'])

    def _extractobjects(self, response, pointers, action):
        """extract objects from response of the batch API

        response: parsed JSON object returned by batch API
        return response['objects'] filtered by action
        raise if any object has an error
        """
        # Scan errors from objects - fail early
        objects = response.get('objects', [])
        self._checkforservererror(pointers, objects, action)

        # Filter objects with given action. Practically, this skips uploading
        # objects which exist in the server.
        filteredobjects = [o for o in objects if action in o.get('actions', [])]

        return filteredobjects

    def _basictransfer(self, obj, action, localstore):
        """Download or upload a single object using basic transfer protocol

        obj: dict, an object description returned by batch API
        action: string, one of ['upload', 'download']
        localstore: blobstore.local

        See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
        basic-transfers.md
        """
        oid = pycompat.bytestr(obj['oid'])

        href = pycompat.bytestr(obj['actions'][action].get('href'))
        headers = obj['actions'][action].get('header', {}).items()

        request = util.urlreq.request(href)
        if action == 'upload':
            # If uploading blobs, read data from local blobstore.
            if not localstore.verify(oid):
                raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                                  hint=_('run hg verify'))
            request.data = filewithprogress(localstore.open(oid), None)
            request.get_method = lambda: 'PUT'
            # The basic transfer spec calls for this Content-Type on uploads.
            request.add_header('Content-Type', 'application/octet-stream')

        for k, v in headers:
            request.add_header(k, v)

        response = b''
        try:
            req = self.urlopener.open(request)

            if self.ui.debugflag:
                self.ui.debug('Status: %d\n' % req.status)
                # lfs-test-server and hg serve return headers in different
                # order
                self.ui.debug('%s\n'
                              % '\n'.join(sorted(str(req.info()).splitlines())))

            if action == 'download':
                # If downloading blobs, store downloaded data to local
                # blobstore
                localstore.download(oid, req)
            else:
                # Drain the server's reply to the upload for debug output
                while True:
                    data = req.read(1048576)
                    if not data:
                        break
                    response += data
                if response:
                    self.ui.debug('lfs %s response: %s' % (action, response))
        except util.urlerr.httperror as ex:
            if self.ui.debugflag:
                self.ui.debug('%s: %s\n' % (oid, ex.read()))
            raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                 % (ex, oid, action))

    def _batch(self, pointers, localstore, action):
        """Transfer all blobs named by ``pointers`` in the given direction,
        with progress reporting and per-object retry on socket errors."""
        if action not in ['upload', 'download']:
            raise error.ProgrammingError('invalid Git-LFS action: %s' % action)

        response = self._batchrequest(pointers, action)
        objects = self._extractobjects(response, pointers, action)
        total = sum(x.get('size', 0) for x in objects)
        sizes = {}
        for obj in objects:
            sizes[obj.get('oid')] = obj.get('size', 0)
        topic = {'upload': _('lfs uploading'),
                 'download': _('lfs downloading')}[action]
        if len(objects) > 1:
            self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
                         % (len(objects), util.bytecount(total)))
        self.ui.progress(topic, 0, total=total)
        def transfer(chunk):
            for obj in chunk:
                objsize = obj.get('size', 0)
                if self.ui.verbose:
                    if action == 'download':
                        msg = _('lfs: downloading %s (%s)\n')
                    elif action == 'upload':
                        msg = _('lfs: uploading %s (%s)\n')
                    self.ui.note(msg % (obj.get('oid'),
                                 util.bytecount(objsize)))
                retry = self.retry
                while True:
                    try:
                        self._basictransfer(obj, action, localstore)
                        yield 1, obj.get('oid')
                        break
                    except socket.error as ex:
                        if retry > 0:
                            self.ui.note(
                                _('lfs: failed: %r (remaining retry %d)\n')
                                % (ex, retry))
                            retry -= 1
                            continue
                        raise

        # Until https multiplexing gets sorted out
        if self.ui.configbool('experimental', 'lfs.worker-enable'):
            oids = worker.worker(self.ui, 0.1, transfer, (),
                                 sorted(objects, key=lambda o: o.get('oid')))
        else:
            oids = transfer(sorted(objects, key=lambda o: o.get('oid')))

        processed = 0
        blobs = 0
        for _one, oid in oids:
            processed += sizes[oid]
            blobs += 1
            self.ui.progress(topic, processed, total=total)
            self.ui.note(_('lfs: processed: %s\n') % oid)
        self.ui.progress(topic, pos=None, total=total)

        if blobs > 0:
            if action == 'upload':
                self.ui.status(_('lfs: uploaded %d files (%s)\n')
                               % (blobs, util.bytecount(processed)))
            # TODO: coalesce the download requests, and comment this in
            #elif action == 'download':
            #    self.ui.status(_('lfs: downloaded %d files (%s)\n')
            #                   % (blobs, util.bytecount(processed)))

    def __del__(self):
        # copied from mercurial/httppeer.py
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                getattr(h, "close_all", lambda : None)()
439
440
class _dummyremote(object):
    """Dummy store storing blobs to temp directory."""

    def __init__(self, repo, url):
        fullpath = repo.vfs.join('lfs', url.path)
        self.vfs = lfsvfs(fullpath)

    def writebatch(self, pointers, fromstore):
        # Copy each unique blob out of the local store into this one.
        for p in _deduplicate(pointers):
            content = fromstore.read(p.oid(), verify=True)
            with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
                fp.write(content)

    def readbatch(self, pointers, tostore):
        # Stream each unique blob from this store into the local one,
        # which verifies the content while downloading.
        for p in _deduplicate(pointers):
            with self.vfs(p.oid(), 'rb') as fp:
                tostore.download(p.oid(), fp)
457
458
458 class _nullremote(object):
459 class _nullremote(object):
459 """Null store storing blobs to /dev/null."""
460 """Null store storing blobs to /dev/null."""
460
461
461 def __init__(self, repo, url):
462 def __init__(self, repo, url):
462 pass
463 pass
463
464
464 def writebatch(self, pointers, fromstore):
465 def writebatch(self, pointers, fromstore):
465 pass
466 pass
466
467
467 def readbatch(self, pointers, tostore):
468 def readbatch(self, pointers, tostore):
468 pass
469 pass
469
470
class _promptremote(object):
    """Prompt user to set lfs.url when accessed."""

    def __init__(self, repo, url):
        # No state needed; every access aborts with a configuration hint.
        pass

    def writebatch(self, pointers, fromstore, ui=None):
        self._prompt()

    def readbatch(self, pointers, tostore, ui=None):
        self._prompt()

    def _prompt(self):
        raise error.Abort(_('lfs.url needs to be configured'))
484
485
# Map of url scheme -> remote store class used by remote().  The None key
# handles a missing/empty lfs.url config by prompting the user.
_storemap = {
    'https': _gitlfsremote,
    'http': _gitlfsremote,
    'file': _dummyremote,
    'null': _nullremote,
    None: _promptremote,
}
492
493
def _deduplicate(pointers):
    """Remove any duplicate oids that exist in the list"""
    # Keyed by oid so a later pointer with the same oid replaces the earlier
    # one; util.sortdict keeps a deterministic ordering of the result.
    reduced = util.sortdict()
    for p in pointers:
        reduced[p.oid()] = p
    return reduced.values()
499
500
def _verify(oid, content):
    """Abort if the SHA-256 digest of ``content`` does not match ``oid``."""
    realoid = hashlib.sha256(content).hexdigest()
    if realoid != oid:
        raise error.Abort(_('detected corrupt lfs object: %s') % oid,
                          hint=_('run hg verify'))
505
506
def remote(repo):
    """remotestore factory. return a store in _storemap depending on config"""
    url = util.url(repo.ui.config('lfs', 'url') or '')
    scheme = url.scheme
    # Unknown schemes abort rather than silently falling back to a prompt.
    if scheme not in _storemap:
        raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
    return _storemap[scheme](repo, url)
513
514
class LfsRemoteError(error.RevlogError):
    """Error talking to the remote LFS server (batch API or transfer)."""
General Comments 0
You need to be logged in to leave comments. Login now