lfs: avoid closing connections when the worker doesn't fork...
Matt Harbison
r50439:3556f039 stable
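The change below teaches the worker machinery to accept a prefork callback, so the LFS client only tears down its pooled keepalive connections when worker processes are actually forked; previously, with lfs.worker-enable set, the teardown ran unconditionally on non-Windows platforms, even when worker.worker() ended up doing the work in-process. As a minimal, self-contained sketch of that hook pattern (a toy illustration only, not Mercurial's worker implementation; POSIX-only because it relies on os.fork()):

    import os
    import sys


    def run_chunked(func, items, prefork=None, use_fork=True):
        """Toy illustration of a prefork hook: it runs once in the parent,
        immediately before forking, and is skipped when no fork happens."""
        if not use_fork or len(items) < 2:
            for item in items:        # serial path: prefork never runs
                func(item)
            return

        if prefork:
            prefork()                 # e.g. close pooled keepalive connections

        mid = len(items) // 2
        pids = []
        for chunk in (items[:mid], items[mid:]):
            pid = os.fork()
            if pid == 0:              # child: process its chunk and exit
                for item in chunk:
                    func(item)
                sys.stdout.flush()    # os._exit() skips normal buffer flushing
                os._exit(0)
            pids.append(pid)          # parent: keep forking, then reap below
        for pid in pids:
            os.waitpid(pid, 0)


    if __name__ == '__main__':
        run_chunked(
            print,
            ['a', 'b', 'c', 'd'],
            prefork=lambda: print('closing pooled connections before fork'),
        )
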
@@ -1,782 +1,785 b''
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
1 # blobstore.py - local and remote (speaking Git-LFS protocol) blob storages
2 #
2 #
3 # Copyright 2017 Facebook, Inc.
3 # Copyright 2017 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import contextlib
9 import contextlib
10 import errno
10 import errno
11 import hashlib
11 import hashlib
12 import json
12 import json
13 import os
13 import os
14 import re
14 import re
15 import socket
15 import socket
16
16
17 from mercurial.i18n import _
17 from mercurial.i18n import _
18 from mercurial.pycompat import getattr
18 from mercurial.pycompat import getattr
19 from mercurial.node import hex
19 from mercurial.node import hex
20
20
21 from mercurial import (
21 from mercurial import (
22 encoding,
22 encoding,
23 error,
23 error,
24 httpconnection as httpconnectionmod,
24 httpconnection as httpconnectionmod,
25 pathutil,
25 pathutil,
26 pycompat,
26 pycompat,
27 url as urlmod,
27 url as urlmod,
28 util,
28 util,
29 vfs as vfsmod,
29 vfs as vfsmod,
30 worker,
30 worker,
31 )
31 )
32
32
33 from mercurial.utils import (
33 from mercurial.utils import (
34 stringutil,
34 stringutil,
35 urlutil,
35 urlutil,
36 )
36 )
37
37
38 from ..largefiles import lfutil
38 from ..largefiles import lfutil
39
39
40 # 64 bytes for SHA256
40 # 64 bytes for SHA256
41 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
41 _lfsre = re.compile(br'\A[a-f0-9]{64}\Z')
42
42
43
43
44 class lfsvfs(vfsmod.vfs):
44 class lfsvfs(vfsmod.vfs):
45 def join(self, path):
45 def join(self, path):
46 """split the path at first two characters, like: XX/XXXXX..."""
46 """split the path at first two characters, like: XX/XXXXX..."""
47 if not _lfsre.match(path):
47 if not _lfsre.match(path):
48 raise error.ProgrammingError(b'unexpected lfs path: %s' % path)
48 raise error.ProgrammingError(b'unexpected lfs path: %s' % path)
49 return super(lfsvfs, self).join(path[0:2], path[2:])
49 return super(lfsvfs, self).join(path[0:2], path[2:])
50
50
51 def walk(self, path=None, onerror=None):
51 def walk(self, path=None, onerror=None):
52 """Yield (dirpath, [], oids) tuple for blobs under path
52 """Yield (dirpath, [], oids) tuple for blobs under path
53
53
54 Oids only exist in the root of this vfs, so dirpath is always ''.
54 Oids only exist in the root of this vfs, so dirpath is always ''.
55 """
55 """
56 root = os.path.normpath(self.base)
56 root = os.path.normpath(self.base)
57 # when dirpath == root, dirpath[prefixlen:] becomes empty
57 # when dirpath == root, dirpath[prefixlen:] becomes empty
58 # because len(dirpath) < prefixlen.
58 # because len(dirpath) < prefixlen.
59 prefixlen = len(pathutil.normasprefix(root))
59 prefixlen = len(pathutil.normasprefix(root))
60 oids = []
60 oids = []
61
61
62 for dirpath, dirs, files in os.walk(
62 for dirpath, dirs, files in os.walk(
63 self.reljoin(self.base, path or b''), onerror=onerror
63 self.reljoin(self.base, path or b''), onerror=onerror
64 ):
64 ):
65 dirpath = dirpath[prefixlen:]
65 dirpath = dirpath[prefixlen:]
66
66
67 # Silently skip unexpected files and directories
67 # Silently skip unexpected files and directories
68 if len(dirpath) == 2:
68 if len(dirpath) == 2:
69 oids.extend(
69 oids.extend(
70 [dirpath + f for f in files if _lfsre.match(dirpath + f)]
70 [dirpath + f for f in files if _lfsre.match(dirpath + f)]
71 )
71 )
72
72
73 yield (b'', [], oids)
73 yield (b'', [], oids)
74
74
75
75
76 class nullvfs(lfsvfs):
76 class nullvfs(lfsvfs):
77 def __init__(self):
77 def __init__(self):
78 pass
78 pass
79
79
80 def exists(self, oid):
80 def exists(self, oid):
81 return False
81 return False
82
82
83 def read(self, oid):
83 def read(self, oid):
84 # store.read() calls into here if the blob doesn't exist in its
84 # store.read() calls into here if the blob doesn't exist in its
85 # self.vfs. Raise the same error as a normal vfs when asked to read a
85 # self.vfs. Raise the same error as a normal vfs when asked to read a
86 # file that doesn't exist. The only difference is the full file path
86 # file that doesn't exist. The only difference is the full file path
87 # isn't available in the error.
87 # isn't available in the error.
88 raise IOError(
88 raise IOError(
89 errno.ENOENT,
89 errno.ENOENT,
90 pycompat.sysstr(b'%s: No such file or directory' % oid),
90 pycompat.sysstr(b'%s: No such file or directory' % oid),
91 )
91 )
92
92
93 def walk(self, path=None, onerror=None):
93 def walk(self, path=None, onerror=None):
94 return (b'', [], [])
94 return (b'', [], [])
95
95
96 def write(self, oid, data):
96 def write(self, oid, data):
97 pass
97 pass
98
98
99
99
100 class lfsuploadfile(httpconnectionmod.httpsendfile):
100 class lfsuploadfile(httpconnectionmod.httpsendfile):
101 """a file-like object that supports keepalive."""
101 """a file-like object that supports keepalive."""
102
102
103 def __init__(self, ui, filename):
103 def __init__(self, ui, filename):
104 super(lfsuploadfile, self).__init__(ui, filename, b'rb')
104 super(lfsuploadfile, self).__init__(ui, filename, b'rb')
105 self.read = self._data.read
105 self.read = self._data.read
106
106
107 def _makeprogress(self):
107 def _makeprogress(self):
108 return None # progress is handled by the worker client
108 return None # progress is handled by the worker client
109
109
110
110
111 class local:
111 class local:
112 """Local blobstore for large file contents.
112 """Local blobstore for large file contents.
113
113
114 This blobstore is used both as a cache and as a staging area for large blobs
114 This blobstore is used both as a cache and as a staging area for large blobs
115 to be uploaded to the remote blobstore.
115 to be uploaded to the remote blobstore.
116 """
116 """
117
117
118 def __init__(self, repo):
118 def __init__(self, repo):
119 fullpath = repo.svfs.join(b'lfs/objects')
119 fullpath = repo.svfs.join(b'lfs/objects')
120 self.vfs = lfsvfs(fullpath)
120 self.vfs = lfsvfs(fullpath)
121
121
122 if repo.ui.configbool(b'experimental', b'lfs.disableusercache'):
122 if repo.ui.configbool(b'experimental', b'lfs.disableusercache'):
123 self.cachevfs = nullvfs()
123 self.cachevfs = nullvfs()
124 else:
124 else:
125 usercache = lfutil._usercachedir(repo.ui, b'lfs')
125 usercache = lfutil._usercachedir(repo.ui, b'lfs')
126 self.cachevfs = lfsvfs(usercache)
126 self.cachevfs = lfsvfs(usercache)
127 self.ui = repo.ui
127 self.ui = repo.ui
128
128
129 def open(self, oid):
129 def open(self, oid):
130 """Open a read-only file descriptor to the named blob, in either the
130 """Open a read-only file descriptor to the named blob, in either the
131 usercache or the local store."""
131 usercache or the local store."""
132 return open(self.path(oid), 'rb')
132 return open(self.path(oid), 'rb')
133
133
134 def path(self, oid):
134 def path(self, oid):
135 """Build the path for the given blob ``oid``.
135 """Build the path for the given blob ``oid``.
136
136
137 If the blob exists locally, the path may point to either the usercache
137 If the blob exists locally, the path may point to either the usercache
138 or the local store. If it doesn't, it will point to the local store.
138 or the local store. If it doesn't, it will point to the local store.
139 This is meant for situations where existing code that isn't LFS aware
139 This is meant for situations where existing code that isn't LFS aware
140 needs to open a blob. Generally, prefer the ``open`` method on this
140 needs to open a blob. Generally, prefer the ``open`` method on this
141 class.
141 class.
142 """
142 """
143 # The usercache is the most likely place to hold the file. Commit will
143 # The usercache is the most likely place to hold the file. Commit will
144 # write to both it and the local store, as will anything that downloads
144 # write to both it and the local store, as will anything that downloads
145 # the blobs. However, things like clone without an update won't
145 # the blobs. However, things like clone without an update won't
146 # populate the local store. For an init + push of a local clone,
146 # populate the local store. For an init + push of a local clone,
147 # the usercache is the only place it _could_ be. If not present, the
147 # the usercache is the only place it _could_ be. If not present, the
148 # missing file msg here will indicate the local repo, not the usercache.
148 # missing file msg here will indicate the local repo, not the usercache.
149 if self.cachevfs.exists(oid):
149 if self.cachevfs.exists(oid):
150 return self.cachevfs.join(oid)
150 return self.cachevfs.join(oid)
151
151
152 return self.vfs.join(oid)
152 return self.vfs.join(oid)
153
153
154 def download(self, oid, src, content_length):
154 def download(self, oid, src, content_length):
155 """Read the blob from the remote source in chunks, verify the content,
155 """Read the blob from the remote source in chunks, verify the content,
156 and write to this local blobstore."""
156 and write to this local blobstore."""
157 sha256 = hashlib.sha256()
157 sha256 = hashlib.sha256()
158 size = 0
158 size = 0
159
159
160 with self.vfs(oid, b'wb', atomictemp=True) as fp:
160 with self.vfs(oid, b'wb', atomictemp=True) as fp:
161 for chunk in util.filechunkiter(src, size=1048576):
161 for chunk in util.filechunkiter(src, size=1048576):
162 fp.write(chunk)
162 fp.write(chunk)
163 sha256.update(chunk)
163 sha256.update(chunk)
164 size += len(chunk)
164 size += len(chunk)
165
165
166 # If the server advertised a length longer than what we actually
166 # If the server advertised a length longer than what we actually
167 # received, then we should expect that the server crashed while
167 # received, then we should expect that the server crashed while
168 # producing the response (but the server has no way of telling us
168 # producing the response (but the server has no way of telling us
169 # that), and we really don't need to try to write the response to
169 # that), and we really don't need to try to write the response to
170 # the localstore, because it's not going to match the expected.
170 # the localstore, because it's not going to match the expected.
171 if content_length is not None and int(content_length) != size:
171 if content_length is not None and int(content_length) != size:
172 msg = (
172 msg = (
173 b"Response length (%d) does not match Content-Length "
173 b"Response length (%d) does not match Content-Length "
174 b"header (%d): likely server-side crash"
174 b"header (%d): likely server-side crash"
175 )
175 )
176 raise LfsRemoteError(_(msg) % (size, int(content_length)))
176 raise LfsRemoteError(_(msg) % (size, int(content_length)))
177
177
178 realoid = hex(sha256.digest())
178 realoid = hex(sha256.digest())
179 if realoid != oid:
179 if realoid != oid:
180 raise LfsCorruptionError(
180 raise LfsCorruptionError(
181 _(b'corrupt remote lfs object: %s') % oid
181 _(b'corrupt remote lfs object: %s') % oid
182 )
182 )
183
183
184 self._linktousercache(oid)
184 self._linktousercache(oid)
185
185
186 def write(self, oid, data):
186 def write(self, oid, data):
187 """Write blob to local blobstore.
187 """Write blob to local blobstore.
188
188
189 This should only be called from the filelog during a commit or similar.
189 This should only be called from the filelog during a commit or similar.
190 As such, there is no need to verify the data. Imports from a remote
190 As such, there is no need to verify the data. Imports from a remote
191 store must use ``download()`` instead."""
191 store must use ``download()`` instead."""
192 with self.vfs(oid, b'wb', atomictemp=True) as fp:
192 with self.vfs(oid, b'wb', atomictemp=True) as fp:
193 fp.write(data)
193 fp.write(data)
194
194
195 self._linktousercache(oid)
195 self._linktousercache(oid)
196
196
197 def linkfromusercache(self, oid):
197 def linkfromusercache(self, oid):
198 """Link blobs found in the user cache into this store.
198 """Link blobs found in the user cache into this store.
199
199
200 The server module needs to do this when it lets the client know not to
200 The server module needs to do this when it lets the client know not to
201 upload the blob, to ensure it is always available in this store.
201 upload the blob, to ensure it is always available in this store.
202 Normally this is done implicitly when the client reads or writes the
202 Normally this is done implicitly when the client reads or writes the
203 blob, but that doesn't happen when the server tells the client that it
203 blob, but that doesn't happen when the server tells the client that it
204 already has the blob.
204 already has the blob.
205 """
205 """
206 if not isinstance(self.cachevfs, nullvfs) and not self.vfs.exists(oid):
206 if not isinstance(self.cachevfs, nullvfs) and not self.vfs.exists(oid):
207 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
207 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
208 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
208 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
209
209
210 def _linktousercache(self, oid):
210 def _linktousercache(self, oid):
211 # XXX: should we verify the content of the cache, and hardlink back to
211 # XXX: should we verify the content of the cache, and hardlink back to
212 # the local store on success, but truncate, write and link on failure?
212 # the local store on success, but truncate, write and link on failure?
213 if not self.cachevfs.exists(oid) and not isinstance(
213 if not self.cachevfs.exists(oid) and not isinstance(
214 self.cachevfs, nullvfs
214 self.cachevfs, nullvfs
215 ):
215 ):
216 self.ui.note(_(b'lfs: adding %s to the usercache\n') % oid)
216 self.ui.note(_(b'lfs: adding %s to the usercache\n') % oid)
217 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
217 lfutil.link(self.vfs.join(oid), self.cachevfs.join(oid))
218
218
219 def read(self, oid, verify=True):
219 def read(self, oid, verify=True):
220 """Read blob from local blobstore."""
220 """Read blob from local blobstore."""
221 if not self.vfs.exists(oid):
221 if not self.vfs.exists(oid):
222 blob = self._read(self.cachevfs, oid, verify)
222 blob = self._read(self.cachevfs, oid, verify)
223
223
224 # Even if revlog will verify the content, it needs to be verified
224 # Even if revlog will verify the content, it needs to be verified
225 # now before making the hardlink to avoid propagating corrupt blobs.
225 # now before making the hardlink to avoid propagating corrupt blobs.
226 # Don't abort if corruption is detected, because `hg verify` will
226 # Don't abort if corruption is detected, because `hg verify` will
227 # give more useful info about the corruption- simply don't add the
227 # give more useful info about the corruption- simply don't add the
228 # hardlink.
228 # hardlink.
229 if verify or hex(hashlib.sha256(blob).digest()) == oid:
229 if verify or hex(hashlib.sha256(blob).digest()) == oid:
230 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
230 self.ui.note(_(b'lfs: found %s in the usercache\n') % oid)
231 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
231 lfutil.link(self.cachevfs.join(oid), self.vfs.join(oid))
232 else:
232 else:
233 self.ui.note(_(b'lfs: found %s in the local lfs store\n') % oid)
233 self.ui.note(_(b'lfs: found %s in the local lfs store\n') % oid)
234 blob = self._read(self.vfs, oid, verify)
234 blob = self._read(self.vfs, oid, verify)
235 return blob
235 return blob
236
236
237 def _read(self, vfs, oid, verify):
237 def _read(self, vfs, oid, verify):
238 """Read blob (after verifying) from the given store"""
238 """Read blob (after verifying) from the given store"""
239 blob = vfs.read(oid)
239 blob = vfs.read(oid)
240 if verify:
240 if verify:
241 _verify(oid, blob)
241 _verify(oid, blob)
242 return blob
242 return blob
243
243
244 def verify(self, oid):
244 def verify(self, oid):
245 """Indicate whether or not the hash of the underlying file matches its
245 """Indicate whether or not the hash of the underlying file matches its
246 name."""
246 name."""
247 sha256 = hashlib.sha256()
247 sha256 = hashlib.sha256()
248
248
249 with self.open(oid) as fp:
249 with self.open(oid) as fp:
250 for chunk in util.filechunkiter(fp, size=1048576):
250 for chunk in util.filechunkiter(fp, size=1048576):
251 sha256.update(chunk)
251 sha256.update(chunk)
252
252
253 return oid == hex(sha256.digest())
253 return oid == hex(sha256.digest())
254
254
255 def has(self, oid):
255 def has(self, oid):
256 """Returns True if the local blobstore contains the requested blob,
256 """Returns True if the local blobstore contains the requested blob,
257 False otherwise."""
257 False otherwise."""
258 return self.cachevfs.exists(oid) or self.vfs.exists(oid)
258 return self.cachevfs.exists(oid) or self.vfs.exists(oid)
259
259
260
260
261 def _urlerrorreason(urlerror):
261 def _urlerrorreason(urlerror):
262 """Create a friendly message for the given URLError to be used in an
262 """Create a friendly message for the given URLError to be used in an
263 LfsRemoteError message.
263 LfsRemoteError message.
264 """
264 """
265 inst = urlerror
265 inst = urlerror
266
266
267 if isinstance(urlerror.reason, Exception):
267 if isinstance(urlerror.reason, Exception):
268 inst = urlerror.reason
268 inst = urlerror.reason
269
269
270 if util.safehasattr(inst, b'reason'):
270 if util.safehasattr(inst, b'reason'):
271 try: # usually it is in the form (errno, strerror)
271 try: # usually it is in the form (errno, strerror)
272 reason = inst.reason.args[1]
272 reason = inst.reason.args[1]
273 except (AttributeError, IndexError):
273 except (AttributeError, IndexError):
274 # it might be anything, for example a string
274 # it might be anything, for example a string
275 reason = inst.reason
275 reason = inst.reason
276 if isinstance(reason, str):
276 if isinstance(reason, str):
277 # SSLError of Python 2.7.9 contains a unicode
277 # SSLError of Python 2.7.9 contains a unicode
278 reason = encoding.unitolocal(reason)
278 reason = encoding.unitolocal(reason)
279 return reason
279 return reason
280 elif getattr(inst, "strerror", None):
280 elif getattr(inst, "strerror", None):
281 return encoding.strtolocal(inst.strerror)
281 return encoding.strtolocal(inst.strerror)
282 else:
282 else:
283 return stringutil.forcebytestr(urlerror)
283 return stringutil.forcebytestr(urlerror)
284
284
285
285
286 class lfsauthhandler(util.urlreq.basehandler):
286 class lfsauthhandler(util.urlreq.basehandler):
287 handler_order = 480 # Before HTTPDigestAuthHandler (== 490)
287 handler_order = 480 # Before HTTPDigestAuthHandler (== 490)
288
288
289 def http_error_401(self, req, fp, code, msg, headers):
289 def http_error_401(self, req, fp, code, msg, headers):
290 """Enforces that any authentication performed is HTTP Basic
290 """Enforces that any authentication performed is HTTP Basic
291 Authentication. No authentication is also acceptable.
291 Authentication. No authentication is also acceptable.
292 """
292 """
293 authreq = headers.get('www-authenticate', None)
293 authreq = headers.get('www-authenticate', None)
294 if authreq:
294 if authreq:
295 scheme = authreq.split()[0]
295 scheme = authreq.split()[0]
296
296
297 if scheme.lower() != 'basic':
297 if scheme.lower() != 'basic':
298 msg = _(b'the server must support Basic Authentication')
298 msg = _(b'the server must support Basic Authentication')
299 raise util.urlerr.httperror(
299 raise util.urlerr.httperror(
300 req.get_full_url(),
300 req.get_full_url(),
301 code,
301 code,
302 encoding.strfromlocal(msg),
302 encoding.strfromlocal(msg),
303 headers,
303 headers,
304 fp,
304 fp,
305 )
305 )
306 return None
306 return None
307
307
308
308
309 class _gitlfsremote:
309 class _gitlfsremote:
310 def __init__(self, repo, url):
310 def __init__(self, repo, url):
311 ui = repo.ui
311 ui = repo.ui
312 self.ui = ui
312 self.ui = ui
313 baseurl, authinfo = url.authinfo()
313 baseurl, authinfo = url.authinfo()
314 self.baseurl = baseurl.rstrip(b'/')
314 self.baseurl = baseurl.rstrip(b'/')
315 useragent = repo.ui.config(b'experimental', b'lfs.user-agent')
315 useragent = repo.ui.config(b'experimental', b'lfs.user-agent')
316 if not useragent:
316 if not useragent:
317 useragent = b'git-lfs/2.3.4 (Mercurial %s)' % util.version()
317 useragent = b'git-lfs/2.3.4 (Mercurial %s)' % util.version()
318 self.urlopener = urlmod.opener(ui, authinfo, useragent)
318 self.urlopener = urlmod.opener(ui, authinfo, useragent)
319 self.urlopener.add_handler(lfsauthhandler())
319 self.urlopener.add_handler(lfsauthhandler())
320 self.retry = ui.configint(b'lfs', b'retry')
320 self.retry = ui.configint(b'lfs', b'retry')
321
321
322 def writebatch(self, pointers, fromstore):
322 def writebatch(self, pointers, fromstore):
323 """Batch upload from local to remote blobstore."""
323 """Batch upload from local to remote blobstore."""
324 self._batch(_deduplicate(pointers), fromstore, b'upload')
324 self._batch(_deduplicate(pointers), fromstore, b'upload')
325
325
326 def readbatch(self, pointers, tostore):
326 def readbatch(self, pointers, tostore):
327 """Batch download from remote to local blobstore."""
327 """Batch download from remote to local blobstore."""
328 self._batch(_deduplicate(pointers), tostore, b'download')
328 self._batch(_deduplicate(pointers), tostore, b'download')
329
329
330 def _batchrequest(self, pointers, action):
330 def _batchrequest(self, pointers, action):
331 """Get metadata about objects pointed by pointers for given action
331 """Get metadata about objects pointed by pointers for given action
332
332
333 Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
333 Return decoded JSON object like {'objects': [{'oid': '', 'size': 1}]}
334 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
334 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/batch.md
335 """
335 """
336 objects = [
336 objects = [
337 {'oid': pycompat.strurl(p.oid()), 'size': p.size()}
337 {'oid': pycompat.strurl(p.oid()), 'size': p.size()}
338 for p in pointers
338 for p in pointers
339 ]
339 ]
340 requestdata = pycompat.bytesurl(
340 requestdata = pycompat.bytesurl(
341 json.dumps(
341 json.dumps(
342 {
342 {
343 'objects': objects,
343 'objects': objects,
344 'operation': pycompat.strurl(action),
344 'operation': pycompat.strurl(action),
345 }
345 }
346 )
346 )
347 )
347 )
348 url = b'%s/objects/batch' % self.baseurl
348 url = b'%s/objects/batch' % self.baseurl
349 batchreq = util.urlreq.request(pycompat.strurl(url), data=requestdata)
349 batchreq = util.urlreq.request(pycompat.strurl(url), data=requestdata)
350 batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
350 batchreq.add_header('Accept', 'application/vnd.git-lfs+json')
351 batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
351 batchreq.add_header('Content-Type', 'application/vnd.git-lfs+json')
352 try:
352 try:
353 with contextlib.closing(self.urlopener.open(batchreq)) as rsp:
353 with contextlib.closing(self.urlopener.open(batchreq)) as rsp:
354 rawjson = rsp.read()
354 rawjson = rsp.read()
355 except util.urlerr.httperror as ex:
355 except util.urlerr.httperror as ex:
356 hints = {
356 hints = {
357 400: _(
357 400: _(
358 b'check that lfs serving is enabled on %s and "%s" is '
358 b'check that lfs serving is enabled on %s and "%s" is '
359 b'supported'
359 b'supported'
360 )
360 )
361 % (self.baseurl, action),
361 % (self.baseurl, action),
362 404: _(b'the "lfs.url" config may be used to override %s')
362 404: _(b'the "lfs.url" config may be used to override %s')
363 % self.baseurl,
363 % self.baseurl,
364 }
364 }
365 hint = hints.get(ex.code, _(b'api=%s, action=%s') % (url, action))
365 hint = hints.get(ex.code, _(b'api=%s, action=%s') % (url, action))
366 raise LfsRemoteError(
366 raise LfsRemoteError(
367 _(b'LFS HTTP error: %s') % stringutil.forcebytestr(ex),
367 _(b'LFS HTTP error: %s') % stringutil.forcebytestr(ex),
368 hint=hint,
368 hint=hint,
369 )
369 )
370 except util.urlerr.urlerror as ex:
370 except util.urlerr.urlerror as ex:
371 hint = (
371 hint = (
372 _(b'the "lfs.url" config may be used to override %s')
372 _(b'the "lfs.url" config may be used to override %s')
373 % self.baseurl
373 % self.baseurl
374 )
374 )
375 raise LfsRemoteError(
375 raise LfsRemoteError(
376 _(b'LFS error: %s') % _urlerrorreason(ex), hint=hint
376 _(b'LFS error: %s') % _urlerrorreason(ex), hint=hint
377 )
377 )
378 try:
378 try:
379 response = pycompat.json_loads(rawjson)
379 response = pycompat.json_loads(rawjson)
380 except ValueError:
380 except ValueError:
381 raise LfsRemoteError(
381 raise LfsRemoteError(
382 _(b'LFS server returns invalid JSON: %s')
382 _(b'LFS server returns invalid JSON: %s')
383 % rawjson.encode("utf-8")
383 % rawjson.encode("utf-8")
384 )
384 )
385
385
386 if self.ui.debugflag:
386 if self.ui.debugflag:
387 self.ui.debug(b'Status: %d\n' % rsp.status)
387 self.ui.debug(b'Status: %d\n' % rsp.status)
388 # lfs-test-server and hg serve return headers in different order
388 # lfs-test-server and hg serve return headers in different order
389 headers = pycompat.bytestr(rsp.info()).strip()
389 headers = pycompat.bytestr(rsp.info()).strip()
390 self.ui.debug(b'%s\n' % b'\n'.join(sorted(headers.splitlines())))
390 self.ui.debug(b'%s\n' % b'\n'.join(sorted(headers.splitlines())))
391
391
392 if 'objects' in response:
392 if 'objects' in response:
393 response['objects'] = sorted(
393 response['objects'] = sorted(
394 response['objects'], key=lambda p: p['oid']
394 response['objects'], key=lambda p: p['oid']
395 )
395 )
396 self.ui.debug(
396 self.ui.debug(
397 b'%s\n'
397 b'%s\n'
398 % pycompat.bytesurl(
398 % pycompat.bytesurl(
399 json.dumps(
399 json.dumps(
400 response,
400 response,
401 indent=2,
401 indent=2,
402 separators=('', ': '),
402 separators=('', ': '),
403 sort_keys=True,
403 sort_keys=True,
404 )
404 )
405 )
405 )
406 )
406 )
407
407
408 def encodestr(x):
408 def encodestr(x):
409 if isinstance(x, str):
409 if isinstance(x, str):
410 return x.encode('utf-8')
410 return x.encode('utf-8')
411 return x
411 return x
412
412
413 return pycompat.rapply(encodestr, response)
413 return pycompat.rapply(encodestr, response)
414
414
415 def _checkforservererror(self, pointers, responses, action):
415 def _checkforservererror(self, pointers, responses, action):
416 """Scans errors from objects
416 """Scans errors from objects
417
417
418 Raises LfsRemoteError if any objects have an error"""
418 Raises LfsRemoteError if any objects have an error"""
419 for response in responses:
419 for response in responses:
420 # The server should return 404 when objects cannot be found. Some
420 # The server should return 404 when objects cannot be found. Some
421 # server implementation (ex. lfs-test-server) does not set "error"
421 # server implementation (ex. lfs-test-server) does not set "error"
422 # but just removes "download" from "actions". Treat that case
422 # but just removes "download" from "actions". Treat that case
423 # as the same as 404 error.
423 # as the same as 404 error.
424 if b'error' not in response:
424 if b'error' not in response:
425 if action == b'download' and action not in response.get(
425 if action == b'download' and action not in response.get(
426 b'actions', []
426 b'actions', []
427 ):
427 ):
428 code = 404
428 code = 404
429 else:
429 else:
430 continue
430 continue
431 else:
431 else:
432 # An error dict without a code doesn't make much sense, so
432 # An error dict without a code doesn't make much sense, so
433 # treat as a server error.
433 # treat as a server error.
434 code = response.get(b'error').get(b'code', 500)
434 code = response.get(b'error').get(b'code', 500)
435
435
436 ptrmap = {p.oid(): p for p in pointers}
436 ptrmap = {p.oid(): p for p in pointers}
437 p = ptrmap.get(response[b'oid'], None)
437 p = ptrmap.get(response[b'oid'], None)
438 if p:
438 if p:
439 filename = getattr(p, 'filename', b'unknown')
439 filename = getattr(p, 'filename', b'unknown')
440 errors = {
440 errors = {
441 404: b'The object does not exist',
441 404: b'The object does not exist',
442 410: b'The object was removed by the owner',
442 410: b'The object was removed by the owner',
443 422: b'Validation error',
443 422: b'Validation error',
444 500: b'Internal server error',
444 500: b'Internal server error',
445 }
445 }
446 msg = errors.get(code, b'status code %d' % code)
446 msg = errors.get(code, b'status code %d' % code)
447 raise LfsRemoteError(
447 raise LfsRemoteError(
448 _(b'LFS server error for "%s": %s') % (filename, msg)
448 _(b'LFS server error for "%s": %s') % (filename, msg)
449 )
449 )
450 else:
450 else:
451 raise LfsRemoteError(
451 raise LfsRemoteError(
452 _(b'LFS server error. Unsolicited response for oid %s')
452 _(b'LFS server error. Unsolicited response for oid %s')
453 % response[b'oid']
453 % response[b'oid']
454 )
454 )
455
455
456 def _extractobjects(self, response, pointers, action):
456 def _extractobjects(self, response, pointers, action):
457 """extract objects from response of the batch API
457 """extract objects from response of the batch API
458
458
459 response: parsed JSON object returned by batch API
459 response: parsed JSON object returned by batch API
460 return response['objects'] filtered by action
460 return response['objects'] filtered by action
461 raise if any object has an error
461 raise if any object has an error
462 """
462 """
463 # Scan errors from objects - fail early
463 # Scan errors from objects - fail early
464 objects = response.get(b'objects', [])
464 objects = response.get(b'objects', [])
465 self._checkforservererror(pointers, objects, action)
465 self._checkforservererror(pointers, objects, action)
466
466
467 # Filter objects with given action. Practically, this skips uploading
467 # Filter objects with given action. Practically, this skips uploading
468 # objects which exist in the server.
468 # objects which exist in the server.
469 filteredobjects = [
469 filteredobjects = [
470 o for o in objects if action in o.get(b'actions', [])
470 o for o in objects if action in o.get(b'actions', [])
471 ]
471 ]
472
472
473 return filteredobjects
473 return filteredobjects
474
474
475 def _basictransfer(self, obj, action, localstore):
475 def _basictransfer(self, obj, action, localstore):
476 """Download or upload a single object using basic transfer protocol
476 """Download or upload a single object using basic transfer protocol
477
477
478 obj: dict, an object description returned by batch API
478 obj: dict, an object description returned by batch API
479 action: string, one of ['upload', 'download']
479 action: string, one of ['upload', 'download']
480 localstore: blobstore.local
480 localstore: blobstore.local
481
481
482 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
482 See https://github.com/git-lfs/git-lfs/blob/master/docs/api/\
483 basic-transfers.md
483 basic-transfers.md
484 """
484 """
485 oid = obj[b'oid']
485 oid = obj[b'oid']
486 href = obj[b'actions'][action].get(b'href')
486 href = obj[b'actions'][action].get(b'href')
487 headers = obj[b'actions'][action].get(b'header', {}).items()
487 headers = obj[b'actions'][action].get(b'header', {}).items()
488
488
489 request = util.urlreq.request(pycompat.strurl(href))
489 request = util.urlreq.request(pycompat.strurl(href))
490 if action == b'upload':
490 if action == b'upload':
491 # If uploading blobs, read data from local blobstore.
491 # If uploading blobs, read data from local blobstore.
492 if not localstore.verify(oid):
492 if not localstore.verify(oid):
493 raise error.Abort(
493 raise error.Abort(
494 _(b'detected corrupt lfs object: %s') % oid,
494 _(b'detected corrupt lfs object: %s') % oid,
495 hint=_(b'run hg verify'),
495 hint=_(b'run hg verify'),
496 )
496 )
497
497
498 for k, v in headers:
498 for k, v in headers:
499 request.add_header(pycompat.strurl(k), pycompat.strurl(v))
499 request.add_header(pycompat.strurl(k), pycompat.strurl(v))
500
500
501 try:
501 try:
502 if action == b'upload':
502 if action == b'upload':
503 request.data = lfsuploadfile(self.ui, localstore.path(oid))
503 request.data = lfsuploadfile(self.ui, localstore.path(oid))
504 request.get_method = lambda: 'PUT'
504 request.get_method = lambda: 'PUT'
505 request.add_header('Content-Type', 'application/octet-stream')
505 request.add_header('Content-Type', 'application/octet-stream')
506 request.add_header('Content-Length', request.data.length)
506 request.add_header('Content-Length', request.data.length)
507
507
508 with contextlib.closing(self.urlopener.open(request)) as res:
508 with contextlib.closing(self.urlopener.open(request)) as res:
509 contentlength = res.info().get(b"content-length")
509 contentlength = res.info().get(b"content-length")
510 ui = self.ui # Shorten debug lines
510 ui = self.ui # Shorten debug lines
511 if self.ui.debugflag:
511 if self.ui.debugflag:
512 ui.debug(b'Status: %d\n' % res.status)
512 ui.debug(b'Status: %d\n' % res.status)
513 # lfs-test-server and hg serve return headers in different
513 # lfs-test-server and hg serve return headers in different
514 # order
514 # order
515 headers = pycompat.bytestr(res.info()).strip()
515 headers = pycompat.bytestr(res.info()).strip()
516 ui.debug(b'%s\n' % b'\n'.join(sorted(headers.splitlines())))
516 ui.debug(b'%s\n' % b'\n'.join(sorted(headers.splitlines())))
517
517
518 if action == b'download':
518 if action == b'download':
519 # If downloading blobs, store downloaded data to local
519 # If downloading blobs, store downloaded data to local
520 # blobstore
520 # blobstore
521 localstore.download(oid, res, contentlength)
521 localstore.download(oid, res, contentlength)
522 else:
522 else:
523 blocks = []
523 blocks = []
524 while True:
524 while True:
525 data = res.read(1048576)
525 data = res.read(1048576)
526 if not data:
526 if not data:
527 break
527 break
528 blocks.append(data)
528 blocks.append(data)
529
529
530 response = b"".join(blocks)
530 response = b"".join(blocks)
531 if response:
531 if response:
532 ui.debug(b'lfs %s response: %s' % (action, response))
532 ui.debug(b'lfs %s response: %s' % (action, response))
533 except util.urlerr.httperror as ex:
533 except util.urlerr.httperror as ex:
534 if self.ui.debugflag:
534 if self.ui.debugflag:
535 self.ui.debug(
535 self.ui.debug(
536 b'%s: %s\n' % (oid, ex.read())
536 b'%s: %s\n' % (oid, ex.read())
537 ) # XXX: also bytes?
537 ) # XXX: also bytes?
538 raise LfsRemoteError(
538 raise LfsRemoteError(
539 _(b'LFS HTTP error: %s (oid=%s, action=%s)')
539 _(b'LFS HTTP error: %s (oid=%s, action=%s)')
540 % (stringutil.forcebytestr(ex), oid, action)
540 % (stringutil.forcebytestr(ex), oid, action)
541 )
541 )
542 except util.urlerr.urlerror as ex:
542 except util.urlerr.urlerror as ex:
543 hint = _(b'attempted connection to %s') % pycompat.bytesurl(
543 hint = _(b'attempted connection to %s') % pycompat.bytesurl(
544 util.urllibcompat.getfullurl(request)
544 util.urllibcompat.getfullurl(request)
545 )
545 )
546 raise LfsRemoteError(
546 raise LfsRemoteError(
547 _(b'LFS error: %s') % _urlerrorreason(ex), hint=hint
547 _(b'LFS error: %s') % _urlerrorreason(ex), hint=hint
548 )
548 )
549 finally:
549 finally:
550 if request.data:
550 if request.data:
551 request.data.close()
551 request.data.close()
552
552
553 def _batch(self, pointers, localstore, action):
553 def _batch(self, pointers, localstore, action):
554 if action not in [b'upload', b'download']:
554 if action not in [b'upload', b'download']:
555 raise error.ProgrammingError(b'invalid Git-LFS action: %s' % action)
555 raise error.ProgrammingError(b'invalid Git-LFS action: %s' % action)
556
556
557 response = self._batchrequest(pointers, action)
557 response = self._batchrequest(pointers, action)
558 objects = self._extractobjects(response, pointers, action)
558 objects = self._extractobjects(response, pointers, action)
559 total = sum(x.get(b'size', 0) for x in objects)
559 total = sum(x.get(b'size', 0) for x in objects)
560 sizes = {}
560 sizes = {}
561 for obj in objects:
561 for obj in objects:
562 sizes[obj.get(b'oid')] = obj.get(b'size', 0)
562 sizes[obj.get(b'oid')] = obj.get(b'size', 0)
563 topic = {
563 topic = {
564 b'upload': _(b'lfs uploading'),
564 b'upload': _(b'lfs uploading'),
565 b'download': _(b'lfs downloading'),
565 b'download': _(b'lfs downloading'),
566 }[action]
566 }[action]
567 if len(objects) > 1:
567 if len(objects) > 1:
568 self.ui.note(
568 self.ui.note(
569 _(b'lfs: need to transfer %d objects (%s)\n')
569 _(b'lfs: need to transfer %d objects (%s)\n')
570 % (len(objects), util.bytecount(total))
570 % (len(objects), util.bytecount(total))
571 )
571 )
572
572
573 def transfer(chunk):
573 def transfer(chunk):
574 for obj in chunk:
574 for obj in chunk:
575 objsize = obj.get(b'size', 0)
575 objsize = obj.get(b'size', 0)
576 if self.ui.verbose:
576 if self.ui.verbose:
577 if action == b'download':
577 if action == b'download':
578 msg = _(b'lfs: downloading %s (%s)\n')
578 msg = _(b'lfs: downloading %s (%s)\n')
579 elif action == b'upload':
579 elif action == b'upload':
580 msg = _(b'lfs: uploading %s (%s)\n')
580 msg = _(b'lfs: uploading %s (%s)\n')
581 self.ui.note(
581 self.ui.note(
582 msg % (obj.get(b'oid'), util.bytecount(objsize))
582 msg % (obj.get(b'oid'), util.bytecount(objsize))
583 )
583 )
584 retry = self.retry
584 retry = self.retry
585 while True:
585 while True:
586 try:
586 try:
587 self._basictransfer(obj, action, localstore)
587 self._basictransfer(obj, action, localstore)
588 yield 1, obj.get(b'oid')
588 yield 1, obj.get(b'oid')
589 break
589 break
590 except socket.error as ex:
590 except socket.error as ex:
591 if retry > 0:
591 if retry > 0:
592 self.ui.note(
592 self.ui.note(
593 _(b'lfs: failed: %r (remaining retry %d)\n')
593 _(b'lfs: failed: %r (remaining retry %d)\n')
594 % (stringutil.forcebytestr(ex), retry)
594 % (stringutil.forcebytestr(ex), retry)
595 )
595 )
596 retry -= 1
596 retry -= 1
597 continue
597 continue
598 raise
598 raise
599
599
600 # Until https multiplexing gets sorted out
600 # Until https multiplexing gets sorted out. It's not clear if
601 # ConnectionManager.set_ready() is externally synchronized for thread
602 # safety with Windows workers.
601 if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
603 if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
602 # The POSIX workers are forks of this process, so before spinning
604 # The POSIX workers are forks of this process, so before spinning
603 # them up, close all pooled connections. Otherwise, there's no way
605 # them up, close all pooled connections. Otherwise, there's no way
604 # to coordinate between them about who is using what, and the
606 # to coordinate between them about who is using what, and the
605 # transfers will get corrupted.
607 # transfers will get corrupted.
606 #
608 #
607 # TODO: add a function to keepalive.ConnectionManager to mark all
609 # TODO: add a function to keepalive.ConnectionManager to mark all
608 # ready connections as in use, and roll that back after the fork?
610 # ready connections as in use, and roll that back after the fork?
609 # That would allow the existing pool of connections in this process
611 # That would allow the existing pool of connections in this process
610 # to be preserved.
612 # to be preserved.
611 if not pycompat.iswindows:
613 def prefork():
612 for h in self.urlopener.handlers:
614 for h in self.urlopener.handlers:
613 getattr(h, "close_all", lambda: None)()
615 getattr(h, "close_all", lambda: None)()
614
616
615 oids = worker.worker(
617 oids = worker.worker(
616 self.ui,
618 self.ui,
617 0.1,
619 0.1,
618 transfer,
620 transfer,
619 (),
621 (),
620 sorted(objects, key=lambda o: o.get(b'oid')),
622 sorted(objects, key=lambda o: o.get(b'oid')),
623 prefork=prefork,
621 )
624 )
622 else:
625 else:
623 oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
626 oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
624
627
625 with self.ui.makeprogress(
628 with self.ui.makeprogress(
626 topic, unit=_(b"bytes"), total=total
629 topic, unit=_(b"bytes"), total=total
627 ) as progress:
630 ) as progress:
628 progress.update(0)
631 progress.update(0)
629 processed = 0
632 processed = 0
630 blobs = 0
633 blobs = 0
631 for _one, oid in oids:
634 for _one, oid in oids:
632 processed += sizes[oid]
635 processed += sizes[oid]
633 blobs += 1
636 blobs += 1
634 progress.update(processed)
637 progress.update(processed)
635 self.ui.note(_(b'lfs: processed: %s\n') % oid)
638 self.ui.note(_(b'lfs: processed: %s\n') % oid)
636
639
637 if blobs > 0:
640 if blobs > 0:
638 if action == b'upload':
641 if action == b'upload':
639 self.ui.status(
642 self.ui.status(
640 _(b'lfs: uploaded %d files (%s)\n')
643 _(b'lfs: uploaded %d files (%s)\n')
641 % (blobs, util.bytecount(processed))
644 % (blobs, util.bytecount(processed))
642 )
645 )
643 elif action == b'download':
646 elif action == b'download':
644 self.ui.status(
647 self.ui.status(
645 _(b'lfs: downloaded %d files (%s)\n')
648 _(b'lfs: downloaded %d files (%s)\n')
646 % (blobs, util.bytecount(processed))
649 % (blobs, util.bytecount(processed))
647 )
650 )
648
651
649 def __del__(self):
652 def __del__(self):
650 # copied from mercurial/httppeer.py
653 # copied from mercurial/httppeer.py
651 urlopener = getattr(self, 'urlopener', None)
654 urlopener = getattr(self, 'urlopener', None)
652 if urlopener:
655 if urlopener:
653 for h in urlopener.handlers:
656 for h in urlopener.handlers:
654 h.close()
657 h.close()
655 getattr(h, "close_all", lambda: None)()
658 getattr(h, "close_all", lambda: None)()
656
659
657
660
658 class _dummyremote:
661 class _dummyremote:
659 """Dummy store storing blobs to temp directory."""
662 """Dummy store storing blobs to temp directory."""
660
663
661 def __init__(self, repo, url):
664 def __init__(self, repo, url):
662 fullpath = repo.vfs.join(b'lfs', url.path)
665 fullpath = repo.vfs.join(b'lfs', url.path)
663 self.vfs = lfsvfs(fullpath)
666 self.vfs = lfsvfs(fullpath)
664
667
665 def writebatch(self, pointers, fromstore):
668 def writebatch(self, pointers, fromstore):
666 for p in _deduplicate(pointers):
669 for p in _deduplicate(pointers):
667 content = fromstore.read(p.oid(), verify=True)
670 content = fromstore.read(p.oid(), verify=True)
668 with self.vfs(p.oid(), b'wb', atomictemp=True) as fp:
671 with self.vfs(p.oid(), b'wb', atomictemp=True) as fp:
669 fp.write(content)
672 fp.write(content)
670
673
671 def readbatch(self, pointers, tostore):
674 def readbatch(self, pointers, tostore):
672 for p in _deduplicate(pointers):
675 for p in _deduplicate(pointers):
673 with self.vfs(p.oid(), b'rb') as fp:
676 with self.vfs(p.oid(), b'rb') as fp:
674 tostore.download(p.oid(), fp, None)
677 tostore.download(p.oid(), fp, None)
675
678
676
679
677 class _nullremote:
680 class _nullremote:
678 """Null store storing blobs to /dev/null."""
681 """Null store storing blobs to /dev/null."""
679
682
680 def __init__(self, repo, url):
683 def __init__(self, repo, url):
681 pass
684 pass
682
685
683 def writebatch(self, pointers, fromstore):
686 def writebatch(self, pointers, fromstore):
684 pass
687 pass
685
688
686 def readbatch(self, pointers, tostore):
689 def readbatch(self, pointers, tostore):
687 pass
690 pass
688
691
689
692
690 class _promptremote:
693 class _promptremote:
691 """Prompt user to set lfs.url when accessed."""
694 """Prompt user to set lfs.url when accessed."""
692
695
693 def __init__(self, repo, url):
696 def __init__(self, repo, url):
694 pass
697 pass
695
698
696 def writebatch(self, pointers, fromstore, ui=None):
699 def writebatch(self, pointers, fromstore, ui=None):
697 self._prompt()
700 self._prompt()
698
701
699 def readbatch(self, pointers, tostore, ui=None):
702 def readbatch(self, pointers, tostore, ui=None):
700 self._prompt()
703 self._prompt()
701
704
702 def _prompt(self):
705 def _prompt(self):
703 raise error.Abort(_(b'lfs.url needs to be configured'))
706 raise error.Abort(_(b'lfs.url needs to be configured'))
704
707
705
708
706 _storemap = {
709 _storemap = {
707 b'https': _gitlfsremote,
710 b'https': _gitlfsremote,
708 b'http': _gitlfsremote,
711 b'http': _gitlfsremote,
709 b'file': _dummyremote,
712 b'file': _dummyremote,
710 b'null': _nullremote,
713 b'null': _nullremote,
711 None: _promptremote,
714 None: _promptremote,
712 }
715 }
713
716
714
717
715 def _deduplicate(pointers):
718 def _deduplicate(pointers):
716 """Remove any duplicate oids that exist in the list"""
719 """Remove any duplicate oids that exist in the list"""
717 reduced = util.sortdict()
720 reduced = util.sortdict()
718 for p in pointers:
721 for p in pointers:
719 reduced[p.oid()] = p
722 reduced[p.oid()] = p
720 return reduced.values()
723 return reduced.values()
721
724
722
725
723 def _verify(oid, content):
726 def _verify(oid, content):
724 realoid = hex(hashlib.sha256(content).digest())
727 realoid = hex(hashlib.sha256(content).digest())
725 if realoid != oid:
728 if realoid != oid:
726 raise LfsCorruptionError(
729 raise LfsCorruptionError(
727 _(b'detected corrupt lfs object: %s') % oid,
730 _(b'detected corrupt lfs object: %s') % oid,
728 hint=_(b'run hg verify'),
731 hint=_(b'run hg verify'),
729 )
732 )
730
733
731
734
732 def remote(repo, remote=None):
735 def remote(repo, remote=None):
733 """remotestore factory. return a store in _storemap depending on config
736 """remotestore factory. return a store in _storemap depending on config
734
737
735 If ``lfs.url`` is specified, use that remote endpoint. Otherwise, try to
738 If ``lfs.url`` is specified, use that remote endpoint. Otherwise, try to
736 infer the endpoint, based on the remote repository using the same path
739 infer the endpoint, based on the remote repository using the same path
737 adjustments as git. As an extension, 'http' is supported as well so that
740 adjustments as git. As an extension, 'http' is supported as well so that
738 ``hg serve`` works out of the box.
741 ``hg serve`` works out of the box.
739
742
740 https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
743 https://github.com/git-lfs/git-lfs/blob/master/docs/api/server-discovery.md
741 """
744 """
742 lfsurl = repo.ui.config(b'lfs', b'url')
745 lfsurl = repo.ui.config(b'lfs', b'url')
743 url = urlutil.url(lfsurl or b'')
746 url = urlutil.url(lfsurl or b'')
744 if lfsurl is None:
747 if lfsurl is None:
745 if remote:
748 if remote:
746 path = remote
749 path = remote
747 elif util.safehasattr(repo, b'_subtoppath'):
750 elif util.safehasattr(repo, b'_subtoppath'):
748 # The pull command sets this during the optional update phase, which
751 # The pull command sets this during the optional update phase, which
749 # tells exactly where the pull originated, whether 'paths.default'
752 # tells exactly where the pull originated, whether 'paths.default'
750 # or explicit.
753 # or explicit.
751 path = repo._subtoppath
754 path = repo._subtoppath
752 else:
755 else:
753 # TODO: investigate 'paths.remote:lfsurl' style path customization,
756 # TODO: investigate 'paths.remote:lfsurl' style path customization,
754 # and fall back to inferring from 'paths.remote' if unspecified.
757 # and fall back to inferring from 'paths.remote' if unspecified.
755 path = repo.ui.config(b'paths', b'default') or b''
758 path = repo.ui.config(b'paths', b'default') or b''
756
759
757 defaulturl = urlutil.url(path)
760 defaulturl = urlutil.url(path)
758
761
759 # TODO: support local paths as well.
762 # TODO: support local paths as well.
760 # TODO: consider the ssh -> https transformation that git applies
763 # TODO: consider the ssh -> https transformation that git applies
761 if defaulturl.scheme in (b'http', b'https'):
764 if defaulturl.scheme in (b'http', b'https'):
762 if defaulturl.path and defaulturl.path[:-1] != b'/':
765 if defaulturl.path and defaulturl.path[:-1] != b'/':
763 defaulturl.path += b'/'
766 defaulturl.path += b'/'
764 defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'
767 defaulturl.path = (defaulturl.path or b'') + b'.git/info/lfs'
765
768
766 url = urlutil.url(bytes(defaulturl))
769 url = urlutil.url(bytes(defaulturl))
767 repo.ui.note(_(b'lfs: assuming remote store: %s\n') % url)
770 repo.ui.note(_(b'lfs: assuming remote store: %s\n') % url)
768
771
769 scheme = url.scheme
772 scheme = url.scheme
770 if scheme not in _storemap:
773 if scheme not in _storemap:
771 raise error.Abort(_(b'lfs: unknown url scheme: %s') % scheme)
774 raise error.Abort(_(b'lfs: unknown url scheme: %s') % scheme)
772 return _storemap[scheme](repo, url)
775 return _storemap[scheme](repo, url)
773
776
774
777
775 class LfsRemoteError(error.StorageError):
778 class LfsRemoteError(error.StorageError):
776 pass
779 pass
777
780
778
781
779 class LfsCorruptionError(error.Abort):
782 class LfsCorruptionError(error.Abort):
780 """Raised when a corrupt blob is detected, aborting an operation
783 """Raised when a corrupt blob is detected, aborting an operation
781
784
782 It exists to allow specialized handling on the server side."""
785 It exists to allow specialized handling on the server side."""
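The worker.py hunk that follows adds the optional prefork parameter to worker.worker() and threads it through to _posixworker(); the excerpt shown here is truncated before the point in the parent process where the callback would actually be invoked. For orientation, here is a condensed restatement of the caller-side pattern from the blobstore.py hunk above; transfer_with_workers is a hypothetical wrapper used only for this sketch, not a function from the diff:

    from mercurial import worker


    def transfer_with_workers(ui, urlopener, objects, transfer):
        # The keepalive teardown now lives in prefork(), which the worker
        # machinery is expected to run only if it really forks children.
        def prefork():
            # Forked children must not share the parent's pooled sockets,
            # so close every keepalive-capable handler before the fork.
            for h in urlopener.handlers:
                getattr(h, "close_all", lambda: None)()

        return worker.worker(
            ui,
            0.1,
            transfer,
            (),
            sorted(objects, key=lambda o: o.get(b'oid')),
            prefork=prefork,
        )
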
@@ -1,455 +1,472 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import os
9 import os
10 import pickle
10 import pickle
11 import selectors
11 import selectors
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 from .i18n import _
17 from .i18n import _
18 from . import (
18 from . import (
19 encoding,
19 encoding,
20 error,
20 error,
21 pycompat,
21 pycompat,
22 scmutil,
22 scmutil,
23 )
23 )
24
24
25
25
26 def countcpus():
26 def countcpus():
27 '''try to count the number of CPUs on the system'''
27 '''try to count the number of CPUs on the system'''
28
28
29 # posix
29 # posix
30 try:
30 try:
31 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
31 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
32 if n > 0:
32 if n > 0:
33 return n
33 return n
34 except (AttributeError, ValueError):
34 except (AttributeError, ValueError):
35 pass
35 pass
36
36
37 # windows
37 # windows
38 try:
38 try:
39 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
39 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
40 if n > 0:
40 if n > 0:
41 return n
41 return n
42 except (KeyError, ValueError):
42 except (KeyError, ValueError):
43 pass
43 pass
44
44
45 return 1
45 return 1
46
46
47
47
48 def _numworkers(ui):
48 def _numworkers(ui):
49 s = ui.config(b'worker', b'numcpus')
49 s = ui.config(b'worker', b'numcpus')
50 if s:
50 if s:
51 try:
51 try:
52 n = int(s)
52 n = int(s)
53 if n >= 1:
53 if n >= 1:
54 return n
54 return n
55 except ValueError:
55 except ValueError:
56 raise error.Abort(_(b'number of cpus must be an integer'))
56 raise error.Abort(_(b'number of cpus must be an integer'))
57 return min(max(countcpus(), 4), 32)
57 return min(max(countcpus(), 4), 32)
58
58
59
59
60 def ismainthread():
60 def ismainthread():
61 return threading.current_thread() == threading.main_thread()
61 return threading.current_thread() == threading.main_thread()
62
62
63
63
64 class _blockingreader:
64 class _blockingreader:
65 """Wrap unbuffered stream such that pickle.load() works with it.
65 """Wrap unbuffered stream such that pickle.load() works with it.
66
66
67 pickle.load() expects that calls to read() and readinto() read as many
67 pickle.load() expects that calls to read() and readinto() read as many
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
69 pickle.load() raises an EOFError.
69 pickle.load() raises an EOFError.
70 """
70 """
71
71
72 def __init__(self, wrapped):
72 def __init__(self, wrapped):
73 self._wrapped = wrapped
73 self._wrapped = wrapped
74
74
75 def readline(self):
75 def readline(self):
76 return self._wrapped.readline()
76 return self._wrapped.readline()
77
77
78 def readinto(self, buf):
78 def readinto(self, buf):
79 pos = 0
79 pos = 0
80 size = len(buf)
80 size = len(buf)
81
81
82 with memoryview(buf) as view:
82 with memoryview(buf) as view:
83 while pos < size:
83 while pos < size:
84 with view[pos:] as subview:
84 with view[pos:] as subview:
85 ret = self._wrapped.readinto(subview)
85 ret = self._wrapped.readinto(subview)
86 if not ret:
86 if not ret:
87 break
87 break
88 pos += ret
88 pos += ret
89
89
90 return pos
90 return pos
91
91
92 # issue multiple reads until size is fulfilled (or EOF is encountered)
92 # issue multiple reads until size is fulfilled (or EOF is encountered)
93 def read(self, size=-1):
93 def read(self, size=-1):
94 if size < 0:
94 if size < 0:
95 return self._wrapped.readall()
95 return self._wrapped.readall()
96
96
97 buf = bytearray(size)
97 buf = bytearray(size)
98 n_read = self.readinto(buf)
98 n_read = self.readinto(buf)
99 del buf[n_read:]
99 del buf[n_read:]
100 return bytes(buf)
100 return bytes(buf)
101
101
102
102
103 if pycompat.isposix or pycompat.iswindows:
103 if pycompat.isposix or pycompat.iswindows:
104 _STARTUP_COST = 0.01
104 _STARTUP_COST = 0.01
105 # The Windows worker is thread based. If tasks are CPU bound, threads
105 # The Windows worker is thread based. If tasks are CPU bound, threads
106 # in the presence of the GIL result in excessive context switching and
106 # in the presence of the GIL result in excessive context switching and
107 # this overhead can slow down execution.
107 # this overhead can slow down execution.
108 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
108 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
109 else:
109 else:
110 _STARTUP_COST = 1e30
110 _STARTUP_COST = 1e30
111 _DISALLOW_THREAD_UNSAFE = False
111 _DISALLOW_THREAD_UNSAFE = False
112
112
113
113
114 def worthwhile(ui, costperop, nops, threadsafe=True):
114 def worthwhile(ui, costperop, nops, threadsafe=True):
115 """try to determine whether the benefit of multiple processes can
115 """try to determine whether the benefit of multiple processes can
116 outweigh the cost of starting them"""
116 outweigh the cost of starting them"""
117
117
118 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
118 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
119 return False
119 return False
120
120
121 linear = costperop * nops
121 linear = costperop * nops
122 workers = _numworkers(ui)
122 workers = _numworkers(ui)
123 benefit = linear - (_STARTUP_COST * workers + linear / workers)
123 benefit = linear - (_STARTUP_COST * workers + linear / workers)
124 return benefit >= 0.15
124 return benefit >= 0.15
125
125
126
126
127 def worker(
127 def worker(
128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
128 ui,
129 costperarg,
130 func,
131 staticargs,
132 args,
133 hasretval=False,
134 threadsafe=True,
135 prefork=None,
129 ):
136 ):
130 """run a function, possibly in parallel in multiple worker
137 """run a function, possibly in parallel in multiple worker
131 processes.
138 processes.
132
139
133 returns a progress iterator
140 returns a progress iterator
134
141
135 costperarg - cost of a single task
142 costperarg - cost of a single task
136
143
137 func - function to run. It is expected to return a progress iterator.
144 func - function to run. It is expected to return a progress iterator.
138
145
139 staticargs - arguments to pass to every invocation of the function
146 staticargs - arguments to pass to every invocation of the function
140
147
141 args - arguments to split into chunks, to pass to individual
148 args - arguments to split into chunks, to pass to individual
142 workers
149 workers
143
150
144 hasretval - when True, func and the current function return an progress
151 hasretval - when True, func and the current function return an progress
145 iterator then a dict (encoded as an iterator that yield many (False, ..)
152 iterator then a dict (encoded as an iterator that yield many (False, ..)
146 then a (True, dict)). The dicts are joined in some arbitrary order, so
153 then a (True, dict)). The dicts are joined in some arbitrary order, so
147 overlapping keys are a bad idea.
154 overlapping keys are a bad idea.
148
155
149 threadsafe - whether work items are thread safe and can be executed using
156 threadsafe - whether work items are thread safe and can be executed using
150 a thread-based worker. Should be disabled for CPU heavy tasks that don't
157 a thread-based worker. Should be disabled for CPU heavy tasks that don't
151 release the GIL.
158 release the GIL.
159
160 prefork - a parameterless Callable that is invoked prior to forking the
161 process. fork() is only used on non-Windows platforms, but is also not
162 called on POSIX platforms if the work amount doesn't warrant a worker.
152 """
163 """
153 enabled = ui.configbool(b'worker', b'enabled')
164 enabled = ui.configbool(b'worker', b'enabled')
154 if enabled and _platformworker is _posixworker and not ismainthread():
165 if enabled and _platformworker is _posixworker and not ismainthread():
155 # The POSIX worker has to install a handler for SIGCHLD.
166 # The POSIX worker has to install a handler for SIGCHLD.
156 # Python up to 3.9 only allows this in the main thread.
167 # Python up to 3.9 only allows this in the main thread.
157 enabled = False
168 enabled = False
158
169
159 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
170 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
160 return _platformworker(ui, func, staticargs, args, hasretval)
171 return _platformworker(
172 ui, func, staticargs, args, hasretval, prefork=prefork
173 )
161 return func(*staticargs + (args,))
174 return func(*staticargs + (args,))
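The new prefork hook exists for callers that hold process-global resources, e.g. keep-alive HTTP connections, that should not be duplicated into (and later torn down by) every forked child. A hypothetical caller could look like the sketch below; the transfer logic and the connectionpool helper are illustrative assumptions, not the lfs extension's actual code:

from mercurial import worker

def _transfer(ui, chunk):
    # runs in each worker process, or inline when forking isn't worthwhile;
    # must behave as a progress iterator
    for item in chunk:
        # ... perform the real per-item work here ...
        yield 1, item

def _droppool():
    # invoked at most once, in the parent, right before os.fork(); it is not
    # called on Windows or when the inline fallback path is taken
    connectionpool.close()  # assumed helper, not part of this module

def transferall(ui, items):
    for _count, item in worker.worker(
        ui,
        0.1,            # rough per-item cost estimate
        _transfer,
        (ui,),          # staticargs prepended to every invocation
        items,          # split into chunks across the workers
        prefork=_droppool,
    ):
        ui.note(b'processed %s\n' % item)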
162
175
163
176
164 def _posixworker(ui, func, staticargs, args, hasretval):
177 def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
165 workers = _numworkers(ui)
178 workers = _numworkers(ui)
166 oldhandler = signal.getsignal(signal.SIGINT)
179 oldhandler = signal.getsignal(signal.SIGINT)
167 signal.signal(signal.SIGINT, signal.SIG_IGN)
180 signal.signal(signal.SIGINT, signal.SIG_IGN)
168 pids, problem = set(), [0]
181 pids, problem = set(), [0]
169
182
170 def killworkers():
183 def killworkers():
171 # unregister SIGCHLD handler as all children will be killed. This
184 # unregister SIGCHLD handler as all children will be killed. This
172 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
185 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
173 # could be updated while iterating, which would cause inconsistency.
186 # could be updated while iterating, which would cause inconsistency.
174 signal.signal(signal.SIGCHLD, oldchldhandler)
187 signal.signal(signal.SIGCHLD, oldchldhandler)
175 # if one worker bails, there's no good reason to wait for the rest
188 # if one worker bails, there's no good reason to wait for the rest
176 for p in pids:
189 for p in pids:
177 try:
190 try:
178 os.kill(p, signal.SIGTERM)
191 os.kill(p, signal.SIGTERM)
179 except ProcessLookupError:
192 except ProcessLookupError:
180 pass
193 pass
181
194
182 def waitforworkers(blocking=True):
195 def waitforworkers(blocking=True):
183 for pid in pids.copy():
196 for pid in pids.copy():
184 p = st = 0
197 p = st = 0
185 try:
198 try:
186 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
199 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
187 except ChildProcessError:
200 except ChildProcessError:
188 # child would already be reaped, but pids not yet
201 # child would already be reaped, but pids not yet
189 # updated (maybe interrupted just after waitpid)
202 # updated (maybe interrupted just after waitpid)
190 pids.discard(pid)
203 pids.discard(pid)
191 if not p:
204 if not p:
192 # skip subsequent steps, because child process should
205 # skip subsequent steps, because child process should
193 # still be running in this case
206 # still be running in this case
194 continue
207 continue
195 pids.discard(p)
208 pids.discard(p)
196 st = _exitstatus(st)
209 st = _exitstatus(st)
197 if st and not problem[0]:
210 if st and not problem[0]:
198 problem[0] = st
211 problem[0] = st
199
212
200 def sigchldhandler(signum, frame):
213 def sigchldhandler(signum, frame):
201 waitforworkers(blocking=False)
214 waitforworkers(blocking=False)
202 if problem[0]:
215 if problem[0]:
203 killworkers()
216 killworkers()
204
217
205 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
218 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
206 ui.flush()
219 ui.flush()
207 parentpid = os.getpid()
220 parentpid = os.getpid()
208 pipes = []
221 pipes = []
209 retval = {}
222 retval = {}
223
224 if prefork:
225 prefork()
226
210 for pargs in partition(args, min(workers, len(args))):
227 for pargs in partition(args, min(workers, len(args))):
211 # Every worker gets its own pipe to send results on, so we don't have to
228 # Every worker gets its own pipe to send results on, so we don't have to
212 # implement atomic writes larger than PIPE_BUF. Each forked process has
229 # implement atomic writes larger than PIPE_BUF. Each forked process has
213 # its own pipe's descriptors in the local variables, and the parent
230 # its own pipe's descriptors in the local variables, and the parent
214 # process has the full list of pipe descriptors (and it doesn't really
231 # process has the full list of pipe descriptors (and it doesn't really
215 # care what order they're in).
232 # care what order they're in).
216 rfd, wfd = os.pipe()
233 rfd, wfd = os.pipe()
217 pipes.append((rfd, wfd))
234 pipes.append((rfd, wfd))
218 # make sure we use os._exit in all worker code paths. otherwise the
235 # make sure we use os._exit in all worker code paths. otherwise the
219 # worker may do some clean-ups which could cause surprises like
236 # worker may do some clean-ups which could cause surprises like
220 # deadlock. see sshpeer.cleanup for example.
237 # deadlock. see sshpeer.cleanup for example.
221 # override error handling *before* fork. this is necessary because
238 # override error handling *before* fork. this is necessary because
222 # exception (signal) may arrive after fork, before "pid =" assignment
239 # exception (signal) may arrive after fork, before "pid =" assignment
223 # completes, and another exception handler (dispatch.py) can lead to an
240 # completes, and another exception handler (dispatch.py) can lead to an
224 # unexpected code path without os._exit.
241 # unexpected code path without os._exit.
225 ret = -1
242 ret = -1
226 try:
243 try:
227 pid = os.fork()
244 pid = os.fork()
228 if pid == 0:
245 if pid == 0:
229 signal.signal(signal.SIGINT, oldhandler)
246 signal.signal(signal.SIGINT, oldhandler)
230 signal.signal(signal.SIGCHLD, oldchldhandler)
247 signal.signal(signal.SIGCHLD, oldchldhandler)
231
248
232 def workerfunc():
249 def workerfunc():
233 for r, w in pipes[:-1]:
250 for r, w in pipes[:-1]:
234 os.close(r)
251 os.close(r)
235 os.close(w)
252 os.close(w)
236 os.close(rfd)
253 os.close(rfd)
237 with os.fdopen(wfd, 'wb') as wf:
254 with os.fdopen(wfd, 'wb') as wf:
238 for result in func(*(staticargs + (pargs,))):
255 for result in func(*(staticargs + (pargs,))):
239 pickle.dump(result, wf)
256 pickle.dump(result, wf)
240 wf.flush()
257 wf.flush()
241 return 0
258 return 0
242
259
243 ret = scmutil.callcatch(ui, workerfunc)
260 ret = scmutil.callcatch(ui, workerfunc)
244 except: # parent re-raises, child never returns
261 except: # parent re-raises, child never returns
245 if os.getpid() == parentpid:
262 if os.getpid() == parentpid:
246 raise
263 raise
247 exctype = sys.exc_info()[0]
264 exctype = sys.exc_info()[0]
248 force = not issubclass(exctype, KeyboardInterrupt)
265 force = not issubclass(exctype, KeyboardInterrupt)
249 ui.traceback(force=force)
266 ui.traceback(force=force)
250 finally:
267 finally:
251 if os.getpid() != parentpid:
268 if os.getpid() != parentpid:
252 try:
269 try:
253 ui.flush()
270 ui.flush()
254 except: # never returns, no re-raises
271 except: # never returns, no re-raises
255 pass
272 pass
256 finally:
273 finally:
257 os._exit(ret & 255)
274 os._exit(ret & 255)
258 pids.add(pid)
275 pids.add(pid)
259 selector = selectors.DefaultSelector()
276 selector = selectors.DefaultSelector()
260 for rfd, wfd in pipes:
277 for rfd, wfd in pipes:
261 os.close(wfd)
278 os.close(wfd)
262 # The stream has to be unbuffered. Otherwise, if all data is read from
279 # The stream has to be unbuffered. Otherwise, if all data is read from
263 # the raw file into the buffer, the selector thinks that the FD is not
280 # the raw file into the buffer, the selector thinks that the FD is not
264 # ready to read while pickle.load() could read from the buffer. This
281 # ready to read while pickle.load() could read from the buffer. This
265 # would delay the processing of readable items.
282 # would delay the processing of readable items.
266 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
283 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
267
284
268 def cleanup():
285 def cleanup():
269 signal.signal(signal.SIGINT, oldhandler)
286 signal.signal(signal.SIGINT, oldhandler)
270 waitforworkers()
287 waitforworkers()
271 signal.signal(signal.SIGCHLD, oldchldhandler)
288 signal.signal(signal.SIGCHLD, oldchldhandler)
272 selector.close()
289 selector.close()
273 return problem[0]
290 return problem[0]
274
291
275 try:
292 try:
276 openpipes = len(pipes)
293 openpipes = len(pipes)
277 while openpipes > 0:
294 while openpipes > 0:
278 for key, events in selector.select():
295 for key, events in selector.select():
279 try:
296 try:
280 # The pytype error likely goes away on a modern version of
297 # The pytype error likely goes away on a modern version of
281 # pytype having a modern typeshed snapshot.
298 # pytype having a modern typeshed snapshot.
282 # pytype: disable=wrong-arg-types
299 # pytype: disable=wrong-arg-types
283 res = pickle.load(_blockingreader(key.fileobj))
300 res = pickle.load(_blockingreader(key.fileobj))
284 # pytype: enable=wrong-arg-types
301 # pytype: enable=wrong-arg-types
285 if hasretval and res[0]:
302 if hasretval and res[0]:
286 retval.update(res[1])
303 retval.update(res[1])
287 else:
304 else:
288 yield res
305 yield res
289 except EOFError:
306 except EOFError:
290 selector.unregister(key.fileobj)
307 selector.unregister(key.fileobj)
291 # pytype: disable=attribute-error
308 # pytype: disable=attribute-error
292 key.fileobj.close()
309 key.fileobj.close()
293 # pytype: enable=attribute-error
310 # pytype: enable=attribute-error
294 openpipes -= 1
311 openpipes -= 1
295 except: # re-raises
312 except: # re-raises
296 killworkers()
313 killworkers()
297 cleanup()
314 cleanup()
298 raise
315 raise
299 status = cleanup()
316 status = cleanup()
300 if status:
317 if status:
301 if status < 0:
318 if status < 0:
302 os.kill(os.getpid(), -status)
319 os.kill(os.getpid(), -status)
303 raise error.WorkerError(status)
320 raise error.WorkerError(status)
304 if hasretval:
321 if hasretval:
305 yield True, retval
322 yield True, retval
306
323
307
324
308 def _posixexitstatus(code):
325 def _posixexitstatus(code):
309 """convert a posix exit status into the same form returned by
326 """convert a posix exit status into the same form returned by
310 os.spawnv
327 os.spawnv
311
328
312 returns None if the process was stopped instead of exiting"""
329 returns None if the process was stopped instead of exiting"""
313 if os.WIFEXITED(code):
330 if os.WIFEXITED(code):
314 return os.WEXITSTATUS(code)
331 return os.WEXITSTATUS(code)
315 elif os.WIFSIGNALED(code):
332 elif os.WIFSIGNALED(code):
316 return -(os.WTERMSIG(code))
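A quick standalone illustration (POSIX only, hypothetical values) of the two branches above: a clean exit keeps its exit code, while death by signal becomes the negated signal number, matching the os.spawnv convention:

import os
import signal

pid = os.fork()
if pid == 0:
    os._exit(3)                       # child exits normally with code 3
_pid, st = os.waitpid(pid, 0)
assert _posixexitstatus(st) == 3

pid = os.fork()
if pid == 0:
    os.kill(os.getpid(), signal.SIGTERM)   # child terminates on a signal
    os._exit(0)                            # never reached
_pid, st = os.waitpid(pid, 0)
assert _posixexitstatus(st) == -signal.SIGTERM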
333 return -(os.WTERMSIG(code))
317
334
318
335
319 def _windowsworker(ui, func, staticargs, args, hasretval):
336 def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
320 class Worker(threading.Thread):
337 class Worker(threading.Thread):
321 def __init__(
338 def __init__(
322 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
339 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
323 ):
340 ):
324 threading.Thread.__init__(self, *args, **kwargs)
341 threading.Thread.__init__(self, *args, **kwargs)
325 self._taskqueue = taskqueue
342 self._taskqueue = taskqueue
326 self._resultqueue = resultqueue
343 self._resultqueue = resultqueue
327 self._func = func
344 self._func = func
328 self._staticargs = staticargs
345 self._staticargs = staticargs
329 self._interrupted = False
346 self._interrupted = False
330 self.daemon = True
347 self.daemon = True
331 self.exception = None
348 self.exception = None
332
349
333 def interrupt(self):
350 def interrupt(self):
334 self._interrupted = True
351 self._interrupted = True
335
352
336 def run(self):
353 def run(self):
337 try:
354 try:
338 while not self._taskqueue.empty():
355 while not self._taskqueue.empty():
339 try:
356 try:
340 args = self._taskqueue.get_nowait()
357 args = self._taskqueue.get_nowait()
341 for res in self._func(*self._staticargs + (args,)):
358 for res in self._func(*self._staticargs + (args,)):
342 self._resultqueue.put(res)
359 self._resultqueue.put(res)
343 # threading doesn't provide a native way to
360 # threading doesn't provide a native way to
344 # interrupt execution. handle it manually at every
361 # interrupt execution. handle it manually at every
345 # iteration.
362 # iteration.
346 if self._interrupted:
363 if self._interrupted:
347 return
364 return
348 except pycompat.queue.Empty:
365 except pycompat.queue.Empty:
349 break
366 break
350 except Exception as e:
367 except Exception as e:
351 # store the exception such that the main thread can resurface
368 # store the exception such that the main thread can resurface
352 # it as if the func was running without workers.
369 # it as if the func was running without workers.
353 self.exception = e
370 self.exception = e
354 raise
371 raise
355
372
356 threads = []
373 threads = []
357
374
358 def trykillworkers():
375 def trykillworkers():
359 # Allow up to 1 second to clean worker threads nicely
376 # Allow up to 1 second to clean worker threads nicely
360 cleanupend = time.time() + 1
377 cleanupend = time.time() + 1
361 for t in threads:
378 for t in threads:
362 t.interrupt()
379 t.interrupt()
363 for t in threads:
380 for t in threads:
364 remainingtime = cleanupend - time.time()
381 remainingtime = cleanupend - time.time()
365 t.join(remainingtime)
382 t.join(remainingtime)
366 if t.is_alive():
383 if t.is_alive():
367 # pass over the workers joining failure. it is more
384 # pass over the workers joining failure. it is more
368 # important to surface the initial exception than the
385 # important to surface the initial exception than the
369 # fact that one of the workers may be processing a large
386 # fact that one of the workers may be processing a large
370 # task and does not get to handle the interruption.
387 # task and does not get to handle the interruption.
371 ui.warn(
388 ui.warn(
372 _(
389 _(
373 b"failed to kill worker threads while "
390 b"failed to kill worker threads while "
374 b"handling an exception\n"
391 b"handling an exception\n"
375 )
392 )
376 )
393 )
377 return
394 return
378
395
379 workers = _numworkers(ui)
396 workers = _numworkers(ui)
380 resultqueue = pycompat.queue.Queue()
397 resultqueue = pycompat.queue.Queue()
381 taskqueue = pycompat.queue.Queue()
398 taskqueue = pycompat.queue.Queue()
382 retval = {}
399 retval = {}
383 # partition work to more pieces than workers to minimize the chance
400 # partition work to more pieces than workers to minimize the chance
384 # of uneven distribution of large tasks between the workers
401 # of uneven distribution of large tasks between the workers
385 for pargs in partition(args, workers * 20):
402 for pargs in partition(args, workers * 20):
386 taskqueue.put(pargs)
403 taskqueue.put(pargs)
387 for _i in range(workers):
404 for _i in range(workers):
388 t = Worker(taskqueue, resultqueue, func, staticargs)
405 t = Worker(taskqueue, resultqueue, func, staticargs)
389 threads.append(t)
406 threads.append(t)
390 t.start()
407 t.start()
391 try:
408 try:
392 while len(threads) > 0:
409 while len(threads) > 0:
393 while not resultqueue.empty():
410 while not resultqueue.empty():
394 res = resultqueue.get()
411 res = resultqueue.get()
395 if hasretval and res[0]:
412 if hasretval and res[0]:
396 retval.update(res[1])
413 retval.update(res[1])
397 else:
414 else:
398 yield res
415 yield res
399 threads[0].join(0.05)
416 threads[0].join(0.05)
400 finishedthreads = [_t for _t in threads if not _t.is_alive()]
417 finishedthreads = [_t for _t in threads if not _t.is_alive()]
401 for t in finishedthreads:
418 for t in finishedthreads:
402 if t.exception is not None:
419 if t.exception is not None:
403 raise t.exception
420 raise t.exception
404 threads.remove(t)
421 threads.remove(t)
405 except (Exception, KeyboardInterrupt): # re-raises
422 except (Exception, KeyboardInterrupt): # re-raises
406 trykillworkers()
423 trykillworkers()
407 raise
424 raise
408 while not resultqueue.empty():
425 while not resultqueue.empty():
409 res = resultqueue.get()
426 res = resultqueue.get()
410 if hasretval and res[0]:
427 if hasretval and res[0]:
411 retval.update(res[1])
428 retval.update(res[1])
412 else:
429 else:
413 yield res
430 yield res
414 if hasretval:
431 if hasretval:
415 yield True, retval
432 yield True, retval
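Both backends above, as well as the inline fallback in worker(), share the hasretval protocol: a chunk yields (False, progress_item) entries that are passed straight through to the caller and finishes with a single (True, partial_dict) that is merged into the combined return value. A minimal illustrative func following that contract:

def sizefunc(ui, chunk):
    # per-chunk worker body; the names and the "work" are illustrative
    sizes = {}
    for i, name in enumerate(chunk):
        sizes[name] = len(name)        # stand-in for the real per-item work
        yield False, (i + 1, name)     # progress entry, forwarded to the caller
    yield True, sizes                  # partial result, merged into retval

# consumed roughly as:
#   for done, value in worker(ui, 0.05, sizefunc, (ui,), names, hasretval=True):
#       ...  # (False, ...) progress entries, then one final (True, merged_dict)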
416
433
417
434
418 if pycompat.iswindows:
435 if pycompat.iswindows:
419 _platformworker = _windowsworker
436 _platformworker = _windowsworker
420 else:
437 else:
421 _platformworker = _posixworker
438 _platformworker = _posixworker
422 _exitstatus = _posixexitstatus
439 _exitstatus = _posixexitstatus
423
440
424
441
425 def partition(lst, nslices):
442 def partition(lst, nslices):
426 """partition a list into N slices of roughly equal size
443 """partition a list into N slices of roughly equal size
427
444
428 The current strategy takes every Nth element from the input. If
445 The current strategy takes every Nth element from the input. If
429 we ever write workers that need to preserve grouping in input
446 we ever write workers that need to preserve grouping in input
430 we should consider allowing callers to specify a partition strategy.
447 we should consider allowing callers to specify a partition strategy.
431
448
432 olivia is not a fan of this partitioning strategy when files are involved.
449 olivia is not a fan of this partitioning strategy when files are involved.
433 In her words:
450 In her words:
434
451
435 Single-threaded Mercurial makes a point of creating and visiting
452 Single-threaded Mercurial makes a point of creating and visiting
436 files in a fixed order (alphabetical). When creating files in order,
453 files in a fixed order (alphabetical). When creating files in order,
437 a typical filesystem is likely to allocate them on nearby regions on
454 a typical filesystem is likely to allocate them on nearby regions on
438 disk. Thus, when revisiting in the same order, locality is maximized
455 disk. Thus, when revisiting in the same order, locality is maximized
439 and various forms of OS and disk-level caching and read-ahead get a
456 and various forms of OS and disk-level caching and read-ahead get a
440 chance to work.
457 chance to work.
441
458
442 This effect can be quite significant on spinning disks. I discovered it
459 This effect can be quite significant on spinning disks. I discovered it
443 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
460 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
444 Tarring a repo and copying it to another disk effectively randomized
461 Tarring a repo and copying it to another disk effectively randomized
445 the revlog ordering on disk by sorting the revlogs by hash and suddenly
462 the revlog ordering on disk by sorting the revlogs by hash and suddenly
446 performance of my kernel checkout benchmark dropped by ~10x because the
463 performance of my kernel checkout benchmark dropped by ~10x because the
447 "working set" of sectors visited no longer fit in the drive's cache and
464 "working set" of sectors visited no longer fit in the drive's cache and
448 the workload switched from streaming to random I/O.
465 the workload switched from streaming to random I/O.
449
466
450 What we should really be doing is have workers read filenames from an
467 What we should really be doing is have workers read filenames from an
451 ordered queue. This preserves locality and also keeps any worker from
468 ordered queue. This preserves locality and also keeps any worker from
452 getting more than one file out of balance.
469 getting more than one file out of balance.
453 """
470 """
454 for i in range(nslices):
471 for i in range(nslices):
455 yield lst[i::nslices]
472 yield lst[i::nslices]
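A quick check of the strided strategy described in the docstring, run against the function above: every Nth element lands in the same slice, so slice sizes differ by at most one.

files = [b'a', b'b', b'c', b'd', b'e', b'f', b'g']
print(list(partition(files, 3)))
# [[b'a', b'd', b'g'], [b'b', b'e'], [b'c', b'f']]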