##// END OF EJS Templates
lfs: separate a debug message from the subsequent abort message
Matt Harbison -
r35771:069df0b9 default
parent child Browse files
Show More
@@ -1,463 +1,463 b''
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 #
2 #
3 # Copyright 2017 Facebook, Inc.
3 # Copyright 2017 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import json
11 import json
12 import os
12 import os
13 import re
13 import re
14 import socket
14 import socket
15
15
16 from mercurial.i18n import _
16 from mercurial.i18n import _
17
17
18 from mercurial import (
18 from mercurial import (
19 error,
19 error,
20 pathutil,
20 pathutil,
21 url as urlmod,
21 url as urlmod,
22 util,
22 util,
23 vfs as vfsmod,
23 vfs as vfsmod,
24 worker,
24 worker,
25 )
25 )
26
26
27 from ..largefiles import lfutil
27 from ..largefiles import lfutil
28
28
29 # 64 bytes for SHA256
29 # 64 bytes for SHA256
30 _lfsre = re.compile(r'\A[a-f0-9]{64}\Z')
30 _lfsre = re.compile(r'\A[a-f0-9]{64}\Z')
31
31
32 class lfsvfs(vfsmod.vfs):
32 class lfsvfs(vfsmod.vfs):
33 def join(self, path):
33 def join(self, path):
34 """split the path at first two characters, like: XX/XXXXX..."""
34 """split the path at first two characters, like: XX/XXXXX..."""
35 if not _lfsre.match(path):
35 if not _lfsre.match(path):
36 raise error.ProgrammingError('unexpected lfs path: %s' % path)
36 raise error.ProgrammingError('unexpected lfs path: %s' % path)
37 return super(lfsvfs, self).join(path[0:2], path[2:])
37 return super(lfsvfs, self).join(path[0:2], path[2:])
38
38
39 def walk(self, path=None, onerror=None):
39 def walk(self, path=None, onerror=None):
40 """Yield (dirpath, [], oids) tuple for blobs under path
40 """Yield (dirpath, [], oids) tuple for blobs under path
41
41
42 Oids only exist in the root of this vfs, so dirpath is always ''.
42 Oids only exist in the root of this vfs, so dirpath is always ''.
43 """
43 """
44 root = os.path.normpath(self.base)
44 root = os.path.normpath(self.base)
45 # when dirpath == root, dirpath[prefixlen:] becomes empty
45 # when dirpath == root, dirpath[prefixlen:] becomes empty
46 # because len(dirpath) < prefixlen.
46 # because len(dirpath) < prefixlen.
47 prefixlen = len(pathutil.normasprefix(root))
47 prefixlen = len(pathutil.normasprefix(root))
48 oids = []
48 oids = []
49
49
50 for dirpath, dirs, files in os.walk(self.reljoin(self.base, path or ''),
50 for dirpath, dirs, files in os.walk(self.reljoin(self.base, path or ''),
51 onerror=onerror):
51 onerror=onerror):
52 dirpath = dirpath[prefixlen:]
52 dirpath = dirpath[prefixlen:]
53
53
54 # Silently skip unexpected files and directories
54 # Silently skip unexpected files and directories
55 if len(dirpath) == 2:
55 if len(dirpath) == 2:
56 oids.extend([dirpath + f for f in files
56 oids.extend([dirpath + f for f in files
57 if _lfsre.match(dirpath + f)])
57 if _lfsre.match(dirpath + f)])
58
58
59 yield ('', [], oids)
59 yield ('', [], oids)
60
60
61 class filewithprogress(object):
61 class filewithprogress(object):
62 """a file-like object that supports __len__ and read.
62 """a file-like object that supports __len__ and read.
63
63
64 Useful to provide progress information for how many bytes are read.
64 Useful to provide progress information for how many bytes are read.
65 """
65 """
66
66
67 def __init__(self, fp, callback):
67 def __init__(self, fp, callback):
68 self._fp = fp
68 self._fp = fp
69 self._callback = callback # func(readsize)
69 self._callback = callback # func(readsize)
70 fp.seek(0, os.SEEK_END)
70 fp.seek(0, os.SEEK_END)
71 self._len = fp.tell()
71 self._len = fp.tell()
72 fp.seek(0)
72 fp.seek(0)
73
73
74 def __len__(self):
74 def __len__(self):
75 return self._len
75 return self._len
76
76
77 def read(self, size):
77 def read(self, size):
78 if self._fp is None:
78 if self._fp is None:
79 return b''
79 return b''
80 data = self._fp.read(size)
80 data = self._fp.read(size)
81 if data:
81 if data:
82 if self._callback:
82 if self._callback:
83 self._callback(len(data))
83 self._callback(len(data))
84 else:
84 else:
85 self._fp.close()
85 self._fp.close()
86 self._fp = None
86 self._fp = None
87 return data
87 return data
88
88
89 class local(object):
89 class local(object):
90 """Local blobstore for large file contents.
90 """Local blobstore for large file contents.
91
91
92 This blobstore is used both as a cache and as a staging area for large blobs
92 This blobstore is used both as a cache and as a staging area for large blobs
93 to be uploaded to the remote blobstore.
93 to be uploaded to the remote blobstore.
94 """
94 """
95
95
96 def __init__(self, repo):
96 def __init__(self, repo):
97 fullpath = repo.svfs.join('lfs/objects')
97 fullpath = repo.svfs.join('lfs/objects')
98 self.vfs = lfsvfs(fullpath)
98 self.vfs = lfsvfs(fullpath)
99 usercache = lfutil._usercachedir(repo.ui, 'lfs')
99 usercache = lfutil._usercachedir(repo.ui, 'lfs')
100 self.cachevfs = lfsvfs(usercache)
100 self.cachevfs = lfsvfs(usercache)
101 self.ui = repo.ui
101 self.ui = repo.ui
102
102
103 def open(self, oid):
103 def open(self, oid):
104 """Open a read-only file descriptor to the named blob, in either the
104 """Open a read-only file descriptor to the named blob, in either the
105 usercache or the local store."""
105 usercache or the local store."""
106 # The usercache is the most likely place to hold the file. Commit will
106 # The usercache is the most likely place to hold the file. Commit will
107 # write to both it and the local store, as will anything that downloads
107 # write to both it and the local store, as will anything that downloads
108 # the blobs. However, things like clone without an update won't
108 # the blobs. However, things like clone without an update won't
109 # populate the local store. For an init + push of a local clone,
109 # populate the local store. For an init + push of a local clone,
110 # the usercache is the only place it _could_ be. If not present, the
110 # the usercache is the only place it _could_ be. If not present, the
111 # missing file msg here will indicate the local repo, not the usercache.
111 # missing file msg here will indicate the local repo, not the usercache.
112 if self.cachevfs.exists(oid):
112 if self.cachevfs.exists(oid):
113 return self.cachevfs(oid, 'rb')
113 return self.cachevfs(oid, 'rb')
114
114
115 return self.vfs(oid, 'rb')
115 return self.vfs(oid, 'rb')
116
116
117 def download(self, oid, src):
117 def download(self, oid, src):
118 """Read the blob from the remote source in chunks, verify the content,
118 """Read the blob from the remote source in chunks, verify the content,
119 and write to this local blobstore."""
119 and write to this local blobstore."""
120 sha256 = hashlib.sha256()
120 sha256 = hashlib.sha256()
121
121
122 with self.vfs(oid, 'wb', atomictemp=True) as fp:
122 with self.vfs(oid, 'wb', atomictemp=True) as fp:
123 for chunk in util.filechunkiter(src, size=1048576):
123 for chunk in util.filechunkiter(src, size=1048576):
124 fp.write(chunk)
124 fp.write(chunk)
125 sha256.update(chunk)
125 sha256.update(chunk)
126
126
127 realoid = sha256.hexdigest()
127 realoid = sha256.hexdigest()
128 if realoid != oid:
128 if realoid != oid:
129 raise error.Abort(_('corrupt remote lfs object: %s') % oid)
129 raise error.Abort(_('corrupt remote lfs object: %s') % oid)
130
130
131 # XXX: should we verify the content of the cache, and hardlink back to
131 # XXX: should we verify the content of the cache, and hardlink back to
132 # the local store on success, but truncate, write and link on failure?
132 # the local store on success, but truncate, write and link on failure?
133 if not self.cachevfs.exists(oid):
133 if not self.cachevfs.exists(oid):
134 self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
134 self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
135 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
135 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
136
136
137 def write(self, oid, data):
137 def write(self, oid, data):
138 """Write blob to local blobstore.
138 """Write blob to local blobstore.
139
139
140 This should only be called from the filelog during a commit or similar.
140 This should only be called from the filelog during a commit or similar.
141 As such, there is no need to verify the data. Imports from a remote
141 As such, there is no need to verify the data. Imports from a remote
142 store must use ``download()`` instead."""
142 store must use ``download()`` instead."""
143 with self.vfs(oid, 'wb', atomictemp=True) as fp:
143 with self.vfs(oid, 'wb', atomictemp=True) as fp:
144 fp.write(data)
144 fp.write(data)
145
145
146 # XXX: should we verify the content of the cache, and hardlink back to
146 # XXX: should we verify the content of the cache, and hardlink back to
147 # the local store on success, but truncate, write and link on failure?
147 # the local store on success, but truncate, write and link on failure?
148 if not self.cachevfs.exists(oid):
148 if not self.cachevfs.exists(oid):
149 self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
149 self.ui.note(_('lfs: adding %s to the usercache\n') % oid)
150 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
150 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
151
151
152 def read(self, oid, verify=True):
152 def read(self, oid, verify=True):
153 """Read blob from local blobstore."""
153 """Read blob from local blobstore."""
154 if not self.vfs.exists(oid):
154 if not self.vfs.exists(oid):
155 blob = self._read(self.cachevfs, oid, verify)
155 blob = self._read(self.cachevfs, oid, verify)
156
156
157 # Even if revlog will verify the content, it needs to be verified
157 # Even if revlog will verify the content, it needs to be verified
158 # now before making the hardlink to avoid propagating corrupt blobs.
158 # now before making the hardlink to avoid propagating corrupt blobs.
159 # Don't abort if corruption is detected, because `hg verify` will
159 # Don't abort if corruption is detected, because `hg verify` will
160 # give more useful info about the corruption- simply don't add the
160 # give more useful info about the corruption- simply don't add the
161 # hardlink.
161 # hardlink.
162 if verify or hashlib.sha256(blob).hexdigest() == oid:
162 if verify or hashlib.sha256(blob).hexdigest() == oid:
163 self.ui.note(_('lfs: found %s in the usercache\n') % oid)
163 self.ui.note(_('lfs: found %s in the usercache\n') % oid)
164 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
164 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
165 else:
165 else:
166 self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
166 self.ui.note(_('lfs: found %s in the local lfs store\n') % oid)
167 blob = self._read(self.vfs, oid, verify)
167 blob = self._read(self.vfs, oid, verify)
168 return blob
168 return blob
169
169
170 def _read(self, vfs, oid, verify):
170 def _read(self, vfs, oid, verify):
171 """Read blob (after verifying) from the given store"""
171 """Read blob (after verifying) from the given store"""
172 blob = vfs.read(oid)
172 blob = vfs.read(oid)
173 if verify:
173 if verify:
174 _verify(oid, blob)
174 _verify(oid, blob)
175 return blob
175 return blob
176
176
177 def has(self, oid):
177 def has(self, oid):
178 """Returns True if the local blobstore contains the requested blob,
178 """Returns True if the local blobstore contains the requested blob,
179 False otherwise."""
179 False otherwise."""
180 return self.cachevfs.exists(oid) or self.vfs.exists(oid)
180 return self.cachevfs.exists(oid) or self.vfs.exists(oid)
181
181
182 class _gitlfsremote(object):
182 class _gitlfsremote(object):
183
183
184 def __init__(self, repo, url):
184 def __init__(self, repo, url):
185 ui = repo.ui
185 ui = repo.ui
186 self.ui = ui
186 self.ui = ui
187 baseurl, authinfo = url.authinfo()
187 baseurl, authinfo = url.authinfo()
188 self.baseurl = baseurl.rstrip('/')
188 self.baseurl = baseurl.rstrip('/')
189 useragent = repo.ui.config('experimental', 'lfs.user-agent')
189 useragent = repo.ui.config('experimental', 'lfs.user-agent')
190 if not useragent:
190 if not useragent:
191 useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
191 useragent = 'git-lfs/2.3.4 (Mercurial %s)' % util.version()
192 self.urlopener = urlmod.opener(ui, authinfo, useragent)
192 self.urlopener = urlmod.opener(ui, authinfo, useragent)
193 self.retry = ui.configint('lfs', 'retry')
193 self.retry = ui.configint('lfs', 'retry')
194
194
195 def writebatch(self, pointers, fromstore):
195 def writebatch(self, pointers, fromstore):
196 """Batch upload from local to remote blobstore."""
196 """Batch upload from local to remote blobstore."""
197 self._batch(pointers, fromstore, 'upload')
197 self._batch(pointers, fromstore, 'upload')
198
198
199 def readbatch(self, pointers, tostore):
199 def readbatch(self, pointers, tostore):
200 """Batch download from remote to local blostore."""
200 """Batch download from remote to local blostore."""
201 self._batch(pointers, tostore, 'download')
201 self._batch(pointers, tostore, 'download')
202
202
203 def _batchrequest(self, pointers, action):
203 def _batchrequest(self, pointers, action):
204 """Get metadata about objects pointed by pointers for given action
204 """Get metadata about objects pointed by pointers for given action
205
205
206 Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
206 Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
207 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
207 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
208 """
208 """
209 objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
209 objects = [{'oid': p.oid(), 'size': p.size()} for p in pointers]
210 requestdata = json.dumps({
210 requestdata = json.dumps({
211 'objects': objects,
211 'objects': objects,
212 'operation': action,
212 'operation': action,
213 })
213 })
214 batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
214 batchreq = util.urlreq.request('%s/objects/batch' % self.baseurl,
215 data=requestdata)
215 data=requestdata)
216 batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
216 batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
217 batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
217 batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
218 try:
218 try:
219 rawjson = self.urlopener.open(batchreq).read()
219 rawjson = self.urlopener.open(batchreq).read()
220 except util.urlerr.httperror as ex:
220 except util.urlerr.httperror as ex:
221 raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
221 raise LfsRemoteError(_('LFS HTTP error: %s (action=%s)')
222 % (ex, action))
222 % (ex, action))
223 try:
223 try:
224 response = json.loads(rawjson)
224 response = json.loads(rawjson)
225 except ValueError:
225 except ValueError:
226 raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
226 raise LfsRemoteError(_('LFS server returns invalid JSON: %s')
227 % rawjson)
227 % rawjson)
228 return response
228 return response
229
229
230 def _checkforservererror(self, pointers, responses, action):
230 def _checkforservererror(self, pointers, responses, action):
231 """Scans errors from objects
231 """Scans errors from objects
232
232
233 Raises LfsRemoteError if any objects have an error"""
233 Raises LfsRemoteError if any objects have an error"""
234 for response in responses:
234 for response in responses:
235 # The server should return 404 when objects cannot be found. Some
235 # The server should return 404 when objects cannot be found. Some
236 # server implementation (ex. lfs-test-server) does not set "error"
236 # server implementation (ex. lfs-test-server) does not set "error"
237 # but just removes "download" from "actions". Treat that case
237 # but just removes "download" from "actions". Treat that case
238 # as the same as 404 error.
238 # as the same as 404 error.
239 notfound = (response.get('error', {}).get('code') == 404
239 notfound = (response.get('error', {}).get('code') == 404
240 or (action == 'download'
240 or (action == 'download'
241 and action not in response.get('actions', [])))
241 and action not in response.get('actions', [])))
242 if notfound:
242 if notfound:
243 ptrmap = {p.oid(): p for p in pointers}
243 ptrmap = {p.oid(): p for p in pointers}
244 p = ptrmap.get(response['oid'], None)
244 p = ptrmap.get(response['oid'], None)
245 if p:
245 if p:
246 filename = getattr(p, 'filename', 'unknown')
246 filename = getattr(p, 'filename', 'unknown')
247 raise LfsRemoteError(
247 raise LfsRemoteError(
248 _(('LFS server error. Remote object '
248 _(('LFS server error. Remote object '
249 'for "%s" not found: %r')) % (filename, response))
249 'for "%s" not found: %r')) % (filename, response))
250 else:
250 else:
251 raise LfsRemoteError(
251 raise LfsRemoteError(
252 _('LFS server error. Unsolicited response for oid %s')
252 _('LFS server error. Unsolicited response for oid %s')
253 % response['oid'])
253 % response['oid'])
254 if 'error' in response:
254 if 'error' in response:
255 raise LfsRemoteError(_('LFS server error: %r') % response)
255 raise LfsRemoteError(_('LFS server error: %r') % response)
256
256
257 def _extractobjects(self, response, pointers, action):
257 def _extractobjects(self, response, pointers, action):
258 """extract objects from response of the batch API
258 """extract objects from response of the batch API
259
259
260 response: parsed JSON object returned by batch API
260 response: parsed JSON object returned by batch API
261 return response['objects'] filtered by action
261 return response['objects'] filtered by action
262 raise if any object has an error
262 raise if any object has an error
263 """
263 """
264 # Scan errors from objects - fail early
264 # Scan errors from objects - fail early
265 objects = response.get('objects', [])
265 objects = response.get('objects', [])
266 self._checkforservererror(pointers, objects, action)
266 self._checkforservererror(pointers, objects, action)
267
267
268 # Filter objects with given action. Practically, this skips uploading
268 # Filter objects with given action. Practically, this skips uploading
269 # objects which exist in the server.
269 # objects which exist in the server.
270 filteredobjects = [o for o in objects if action in o.get('actions', [])]
270 filteredobjects = [o for o in objects if action in o.get('actions', [])]
271
271
272 return filteredobjects
272 return filteredobjects
273
273
274 def _basictransfer(self, obj, action, localstore):
274 def _basictransfer(self, obj, action, localstore):
275 """Download or upload a single object using basic transfer protocol
275 """Download or upload a single object using basic transfer protocol
276
276
277 obj: dict, an object description returned by batch API
277 obj: dict, an object description returned by batch API
278 action: string, one of ['upload', 'download']
278 action: string, one of ['upload', 'download']
279 localstore: blobstore.local
279 localstore: blobstore.local
280
280
281 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
281 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
282 basic-transfers.md
282 basic-transfers.md
283 """
283 """
284 oid = str(obj['oid'])
284 oid = str(obj['oid'])
285
285
286 href = str(obj['actions'][action].get('href'))
286 href = str(obj['actions'][action].get('href'))
287 headers = obj['actions'][action].get('header', {}).items()
287 headers = obj['actions'][action].get('header', {}).items()
288
288
289 request = util.urlreq.request(href)
289 request = util.urlreq.request(href)
290 if action == 'upload':
290 if action == 'upload':
291 # If uploading blobs, read data from local blobstore.
291 # If uploading blobs, read data from local blobstore.
292 with localstore.open(oid) as fp:
292 with localstore.open(oid) as fp:
293 _verifyfile(oid, fp)
293 _verifyfile(oid, fp)
294 request.data = filewithprogress(localstore.open(oid), None)
294 request.data = filewithprogress(localstore.open(oid), None)
295 request.get_method = lambda: 'PUT'
295 request.get_method = lambda: 'PUT'
296
296
297 for k, v in headers:
297 for k, v in headers:
298 request.add_header(k, v)
298 request.add_header(k, v)
299
299
300 response = b''
300 response = b''
301 try:
301 try:
302 req = self.urlopener.open(request)
302 req = self.urlopener.open(request)
303 if action == 'download':
303 if action == 'download':
304 # If downloading blobs, store downloaded data to local blobstore
304 # If downloading blobs, store downloaded data to local blobstore
305 localstore.download(oid, req)
305 localstore.download(oid, req)
306 else:
306 else:
307 while True:
307 while True:
308 data = req.read(1048576)
308 data = req.read(1048576)
309 if not data:
309 if not data:
310 break
310 break
311 response += data
311 response += data
312 if response:
312 if response:
313 self.ui.debug('lfs %s response: %s' % (action, response))
313 self.ui.debug('lfs %s response: %s' % (action, response))
314 except util.urlerr.httperror as ex:
314 except util.urlerr.httperror as ex:
315 if self.ui.debugflag:
315 if self.ui.debugflag:
316 self.ui.debug('%s: %s' % (oid, ex.read()))
316 self.ui.debug('%s: %s\n' % (oid, ex.read()))
317 raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
317 raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
318 % (ex, oid, action))
318 % (ex, oid, action))
319
319
320 def _batch(self, pointers, localstore, action):
320 def _batch(self, pointers, localstore, action):
321 if action not in ['upload', 'download']:
321 if action not in ['upload', 'download']:
322 raise error.ProgrammingError('invalid Git-LFS action: %s' % action)
322 raise error.ProgrammingError('invalid Git-LFS action: %s' % action)
323
323
324 response = self._batchrequest(pointers, action)
324 response = self._batchrequest(pointers, action)
325 objects = self._extractobjects(response, pointers, action)
325 objects = self._extractobjects(response, pointers, action)
326 total = sum(x.get('size', 0) for x in objects)
326 total = sum(x.get('size', 0) for x in objects)
327 sizes = {}
327 sizes = {}
328 for obj in objects:
328 for obj in objects:
329 sizes[obj.get('oid')] = obj.get('size', 0)
329 sizes[obj.get('oid')] = obj.get('size', 0)
330 topic = {'upload': _('lfs uploading'),
330 topic = {'upload': _('lfs uploading'),
331 'download': _('lfs downloading')}[action]
331 'download': _('lfs downloading')}[action]
332 if len(objects) > 1:
332 if len(objects) > 1:
333 self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
333 self.ui.note(_('lfs: need to transfer %d objects (%s)\n')
334 % (len(objects), util.bytecount(total)))
334 % (len(objects), util.bytecount(total)))
335 self.ui.progress(topic, 0, total=total)
335 self.ui.progress(topic, 0, total=total)
336 def transfer(chunk):
336 def transfer(chunk):
337 for obj in chunk:
337 for obj in chunk:
338 objsize = obj.get('size', 0)
338 objsize = obj.get('size', 0)
339 if self.ui.verbose:
339 if self.ui.verbose:
340 if action == 'download':
340 if action == 'download':
341 msg = _('lfs: downloading %s (%s)\n')
341 msg = _('lfs: downloading %s (%s)\n')
342 elif action == 'upload':
342 elif action == 'upload':
343 msg = _('lfs: uploading %s (%s)\n')
343 msg = _('lfs: uploading %s (%s)\n')
344 self.ui.note(msg % (obj.get('oid'),
344 self.ui.note(msg % (obj.get('oid'),
345 util.bytecount(objsize)))
345 util.bytecount(objsize)))
346 retry = self.retry
346 retry = self.retry
347 while True:
347 while True:
348 try:
348 try:
349 self._basictransfer(obj, action, localstore)
349 self._basictransfer(obj, action, localstore)
350 yield 1, obj.get('oid')
350 yield 1, obj.get('oid')
351 break
351 break
352 except socket.error as ex:
352 except socket.error as ex:
353 if retry > 0:
353 if retry > 0:
354 self.ui.note(
354 self.ui.note(
355 _('lfs: failed: %r (remaining retry %d)\n')
355 _('lfs: failed: %r (remaining retry %d)\n')
356 % (ex, retry))
356 % (ex, retry))
357 retry -= 1
357 retry -= 1
358 continue
358 continue
359 raise
359 raise
360
360
361 # Until https multiplexing gets sorted out
361 # Until https multiplexing gets sorted out
362 if self.ui.configbool('experimental', 'lfs.worker-enable'):
362 if self.ui.configbool('experimental', 'lfs.worker-enable'):
363 oids = worker.worker(self.ui, 0.1, transfer, (),
363 oids = worker.worker(self.ui, 0.1, transfer, (),
364 sorted(objects, key=lambda o: o.get('oid')))
364 sorted(objects, key=lambda o: o.get('oid')))
365 else:
365 else:
366 oids = transfer(sorted(objects, key=lambda o: o.get('oid')))
366 oids = transfer(sorted(objects, key=lambda o: o.get('oid')))
367
367
368 processed = 0
368 processed = 0
369 for _one, oid in oids:
369 for _one, oid in oids:
370 processed += sizes[oid]
370 processed += sizes[oid]
371 self.ui.progress(topic, processed, total=total)
371 self.ui.progress(topic, processed, total=total)
372 self.ui.note(_('lfs: processed: %s\n') % oid)
372 self.ui.note(_('lfs: processed: %s\n') % oid)
373 self.ui.progress(topic, pos=None, total=total)
373 self.ui.progress(topic, pos=None, total=total)
374
374
375 def __del__(self):
375 def __del__(self):
376 # copied from mercurial/httppeer.py
376 # copied from mercurial/httppeer.py
377 urlopener = getattr(self, 'urlopener', None)
377 urlopener = getattr(self, 'urlopener', None)
378 if urlopener:
378 if urlopener:
379 for h in urlopener.handlers:
379 for h in urlopener.handlers:
380 h.close()
380 h.close()
381 getattr(h, "close_all", lambda : None)()
381 getattr(h, "close_all", lambda : None)()
382
382
383 class _dummyremote(object):
383 class _dummyremote(object):
384 """Dummy store storing blobs to temp directory."""
384 """Dummy store storing blobs to temp directory."""
385
385
386 def __init__(self, repo, url):
386 def __init__(self, repo, url):
387 fullpath = repo.vfs.join('lfs', url.path)
387 fullpath = repo.vfs.join('lfs', url.path)
388 self.vfs = lfsvfs(fullpath)
388 self.vfs = lfsvfs(fullpath)
389
389
390 def writebatch(self, pointers, fromstore):
390 def writebatch(self, pointers, fromstore):
391 for p in pointers:
391 for p in pointers:
392 content = fromstore.read(p.oid(), verify=True)
392 content = fromstore.read(p.oid(), verify=True)
393 with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
393 with self.vfs(p.oid(), 'wb', atomictemp=True) as fp:
394 fp.write(content)
394 fp.write(content)
395
395
396 def readbatch(self, pointers, tostore):
396 def readbatch(self, pointers, tostore):
397 for p in pointers:
397 for p in pointers:
398 with self.vfs(p.oid(), 'rb') as fp:
398 with self.vfs(p.oid(), 'rb') as fp:
399 tostore.download(p.oid(), fp)
399 tostore.download(p.oid(), fp)
400
400
401 class _nullremote(object):
401 class _nullremote(object):
402 """Null store storing blobs to /dev/null."""
402 """Null store storing blobs to /dev/null."""
403
403
404 def __init__(self, repo, url):
404 def __init__(self, repo, url):
405 pass
405 pass
406
406
407 def writebatch(self, pointers, fromstore):
407 def writebatch(self, pointers, fromstore):
408 pass
408 pass
409
409
410 def readbatch(self, pointers, tostore):
410 def readbatch(self, pointers, tostore):
411 pass
411 pass
412
412
413 class _promptremote(object):
413 class _promptremote(object):
414 """Prompt user to set lfs.url when accessed."""
414 """Prompt user to set lfs.url when accessed."""
415
415
416 def __init__(self, repo, url):
416 def __init__(self, repo, url):
417 pass
417 pass
418
418
419 def writebatch(self, pointers, fromstore, ui=None):
419 def writebatch(self, pointers, fromstore, ui=None):
420 self._prompt()
420 self._prompt()
421
421
422 def readbatch(self, pointers, tostore, ui=None):
422 def readbatch(self, pointers, tostore, ui=None):
423 self._prompt()
423 self._prompt()
424
424
425 def _prompt(self):
425 def _prompt(self):
426 raise error.Abort(_('lfs.url needs to be configured'))
426 raise error.Abort(_('lfs.url needs to be configured'))
427
427
428 _storemap = {
428 _storemap = {
429 'https': _gitlfsremote,
429 'https': _gitlfsremote,
430 'http': _gitlfsremote,
430 'http': _gitlfsremote,
431 'file': _dummyremote,
431 'file': _dummyremote,
432 'null': _nullremote,
432 'null': _nullremote,
433 None: _promptremote,
433 None: _promptremote,
434 }
434 }
435
435
436 def _verify(oid, content):
436 def _verify(oid, content):
437 realoid = hashlib.sha256(content).hexdigest()
437 realoid = hashlib.sha256(content).hexdigest()
438 if realoid != oid:
438 if realoid != oid:
439 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
439 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
440 hint=_('run hg verify'))
440 hint=_('run hg verify'))
441
441
442 def _verifyfile(oid, fp):
442 def _verifyfile(oid, fp):
443 sha256 = hashlib.sha256()
443 sha256 = hashlib.sha256()
444 while True:
444 while True:
445 data = fp.read(1024 * 1024)
445 data = fp.read(1024 * 1024)
446 if not data:
446 if not data:
447 break
447 break
448 sha256.update(data)
448 sha256.update(data)
449 realoid = sha256.hexdigest()
449 realoid = sha256.hexdigest()
450 if realoid != oid:
450 if realoid != oid:
451 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
451 raise error.Abort(_('detected corrupt lfs object: %s') % oid,
452 hint=_('run hg verify'))
452 hint=_('run hg verify'))
453
453
454 def remote(repo):
454 def remote(repo):
455 """remotestore factory. return a store in _storemap depending on config"""
455 """remotestore factory. return a store in _storemap depending on config"""
456 url = util.url(repo.ui.config('lfs', 'url') or '')
456 url = util.url(repo.ui.config('lfs', 'url') or '')
457 scheme = url.scheme
457 scheme = url.scheme
458 if scheme not in _storemap:
458 if scheme not in _storemap:
459 raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
459 raise error.Abort(_('lfs: unknown url scheme: %s') % scheme)
460 return _storemap[scheme](repo, url)
460 return _storemap[scheme](repo, url)
461
461
462 class LfsRemoteError(error.RevlogError):
462 class LfsRemoteError(error.RevlogError):
463 pass
463 pass
General Comments 0
You need to be logged in to leave comments. Login now