##// END OF EJS Templates
branching: fix wrong merge conflict resolution from 13dfad0f9f7a...
Manuel Jacob -
r50119:cdb85d05 default
parent child Browse files
Show More
@@ -1,505 +1,474 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import errno
9 import errno
10 import os
10 import os
11 import pickle
11 import pickle
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
17 try:
18 import selectors
18 import selectors
19
19
20 selectors.BaseSelector
20 selectors.BaseSelector
21 except ImportError:
21 except ImportError:
22 from .thirdparty import selectors2 as selectors
22 from .thirdparty import selectors2 as selectors
23
23
24 from .i18n import _
24 from .i18n import _
25 from . import (
25 from . import (
26 encoding,
26 encoding,
27 error,
27 error,
28 pycompat,
28 pycompat,
29 scmutil,
29 scmutil,
30 )
30 )
31
31
32
32
def countcpus():
    """Best-effort detection of the number of CPUs; falls back to 1."""

    # POSIX: ask the C library directly.
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus

    # Windows: the environment advertises the processor count.
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus

    # Nothing usable detected: assume a single CPU.
    return 1
53
53
54
54
55 def _numworkers(ui):
55 def _numworkers(ui):
56 s = ui.config(b'worker', b'numcpus')
56 s = ui.config(b'worker', b'numcpus')
57 if s:
57 if s:
58 try:
58 try:
59 n = int(s)
59 n = int(s)
60 if n >= 1:
60 if n >= 1:
61 return n
61 return n
62 except ValueError:
62 except ValueError:
63 raise error.Abort(_(b'number of cpus must be an integer'))
63 raise error.Abort(_(b'number of cpus must be an integer'))
64 return min(max(countcpus(), 4), 32)
64 return min(max(countcpus(), 4), 32)
65
65
66
66
def ismainthread():
    """Return True when called from the interpreter's main thread."""
    return threading.main_thread() == threading.current_thread()
69
69
70
70
71 class _blockingreader(object):
71 class _blockingreader:
72 def __init__(self, wrapped):
72 def __init__(self, wrapped):
73 self._wrapped = wrapped
73 self._wrapped = wrapped
74
74
75 # Do NOT implement readinto() by making it delegate to
75 # Do NOT implement readinto() by making it delegate to
76 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
76 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
77 # with just read() and readline(), so we don't need to implement it.
77 # with just read() and readline(), so we don't need to implement it.
78
78
79 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
79 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
80
80
81 # This is required for python 3.8, prior to 3.8.2. See issue6444.
81 # This is required for python 3.8, prior to 3.8.2. See issue6444.
82 def readinto(self, b):
82 def readinto(self, b):
83 pos = 0
83 pos = 0
84 size = len(b)
84 size = len(b)
85
85
86 while pos < size:
86 while pos < size:
87 ret = self._wrapped.readinto(b[pos:])
87 ret = self._wrapped.readinto(b[pos:])
88 if not ret:
88 if not ret:
89 break
89 break
90 pos += ret
90 pos += ret
91
91
92 return pos
92 return pos
93
93
94 def readline(self):
94 def readline(self):
95 return self._wrapped.readline()
95 return self._wrapped.readline()
96
96
97 # issue multiple reads until size is fulfilled
97 # issue multiple reads until size is fulfilled
98 def read(self, size=-1):
98 def read(self, size=-1):
99 if size < 0:
99 if size < 0:
100 return self._wrapped.readall()
100 return self._wrapped.readall()
101
101
102 buf = bytearray(size)
102 buf = bytearray(size)
103 view = memoryview(buf)
103 view = memoryview(buf)
104 pos = 0
104 pos = 0
105
105
106 while pos < size:
106 while pos < size:
107 ret = self._wrapped.readinto(view[pos:])
107 ret = self._wrapped.readinto(view[pos:])
108 if not ret:
108 if not ret:
109 break
109 break
110 pos += ret
110 pos += ret
111
111
112 del view
112 del view
113 del buf[pos:]
113 del buf[pos:]
114 return bytes(buf)
114 return bytes(buf)
115
115
116
116
117 class _blockingreader:
118 def __init__(self, wrapped):
119 self._wrapped = wrapped
120
121 # Do NOT implement readinto() by making it delegate to
122 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
123 # with just read() and readline(), so we don't need to implement it.
124
125 def readline(self):
126 return self._wrapped.readline()
127
128 # issue multiple reads until size is fulfilled
129 def read(self, size=-1):
130 if size < 0:
131 return self._wrapped.readall()
132
133 buf = bytearray(size)
134 view = memoryview(buf)
135 pos = 0
136
137 while pos < size:
138 ret = self._wrapped.readinto(view[pos:])
139 if not ret:
140 break
141 pos += ret
142
143 del view
144 del buf[pos:]
145 return bytes(buf)
146
147
# Tuning knobs for worthwhile(): the estimated cost of spinning up one
# worker, and whether thread-unsafe tasks must stay out of the pool.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # Unsupported platform: make parallelism look prohibitively expensive
    # so worthwhile() always says no.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
157
126
158
127
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if _DISALLOW_THREAD_UNSAFE and not threadsafe:
        return False

    workers = _numworkers(ui)
    serialcost = costperop * nops
    # parallel cost: per-worker startup plus the serial work divided up
    parallelcost = _STARTUP_COST * workers + serialcost / workers
    return serialcost - parallelcost >= 0.15
170
139
171
140
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return an progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    if ui.configbool(b'worker', b'enabled'):
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        usable = _platformworker is not _posixworker or ismainthread()
        if usable and worthwhile(
            ui, costperarg, len(args), threadsafe=threadsafe
        ):
            return _platformworker(ui, func, staticargs, args, hasretval)
    # Parallelism disabled, unusable here, or not worth the startup cost:
    # run everything serially in this process.
    return func(*staticargs + (args,))
207
176
208
177
def _posixworker(ui, func, staticargs, args, hasretval):
    """Fork one child per chunk of ``args`` and merge their output.

    Each child runs ``func`` over its chunk and writes pickled results to
    its own pipe; the parent multiplexes the pipes with a selector and
    re-yields every unpickled item.  A SIGCHLD handler reaps children as
    they exit; if any child fails, the rest are killed and the failure is
    surfaced as error.WorkerError (or re-sent to ourselves as a signal).
    """
    workers = _numworkers(ui)
    # Ignore SIGINT in the parent while workers run; the original handler
    # is restored by cleanup().
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # Shared mutable state: live child pids, and the first bad exit status
    # (boxed in a one-element list so nested functions can assign to it).
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        # Reap exited children; records the first nonzero exit status.
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        # Non-blocking reap from the signal handler; kill the remaining
        # workers as soon as one of them has failed.
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # Flush buffered ui output before forking so children don't inherit
    # (and duplicate) it.
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # Child: restore inherited signal handlers before working.
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # Close every pipe end this child doesn't own.
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    # Parent: close the write ends and watch all read ends for results.
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # Restore signal handlers, reap remaining children, and report the
        # first failing exit status (0 when all children succeeded).
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # Worker closed its pipe; stop watching it.
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # A child died from a signal: re-send that signal to ourselves
            # so the parent exits the same way.
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
356
325
357
326
358 def _posixexitstatus(code):
327 def _posixexitstatus(code):
359 """convert a posix exit status into the same form returned by
328 """convert a posix exit status into the same form returned by
360 os.spawnv
329 os.spawnv
361
330
362 returns None if the process was stopped instead of exiting"""
331 returns None if the process was stopped instead of exiting"""
363 if os.WIFEXITED(code):
332 if os.WIFEXITED(code):
364 return os.WEXITSTATUS(code)
333 return os.WEXITSTATUS(code)
365 elif os.WIFSIGNALED(code):
334 elif os.WIFSIGNALED(code):
366 return -(os.WTERMSIG(code))
335 return -(os.WTERMSIG(code))
367
336
368
337
def _windowsworker(ui, func, staticargs, args, hasretval):
    """Thread-based worker, used on Windows where fork() is unavailable.

    Chunks of ``args`` go into a task queue consumed by daemon threads;
    results flow back through a result queue and are re-yielded here.
    The first exception raised by any worker thread is resurfaced in the
    caller's thread after the others are asked to stop.
    """

    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # Cooperative stop flag, checked once per produced result.
            self._interrupted = False
            self.daemon = True
            # First exception raised in run(), for the main thread to re-raise.
            self.exception = None

        def interrupt(self):
            # Request a cooperative stop; honored in run()'s inner loop.
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the inital exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # Drain whatever results are already available, then give the
            # oldest thread a short join so we notice finished/dead threads.
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # All threads finished cleanly: drain any remaining results.
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
466
435
467
436
# Pick the parallelism backend for this platform: Windows lacks fork(),
# so it gets the thread-based worker; everything else forks.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
473
442
474
443
def partition(lst, nslices):
    """Split ``lst`` into ``nslices`` round-robin slices of roughly equal
    size.

    The current strategy takes every Nth element from the input; if we
    ever write workers that need to preserve grouping in input we should
    consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are
    involved: single-threaded Mercurial creates and visits files in a
    fixed (alphabetical) order, so a typical filesystem lays them out in
    nearby disk regions and revisiting them in the same order maximizes
    locality and lets OS/disk caching and read-ahead work.  The effect
    can be very significant on spinning disks -- circa Mercurial v0.4
    (when revlogs were named by hashes of filenames), tarring a repo and
    copying it to another disk effectively randomized the on-disk revlog
    order and dropped a kernel-checkout benchmark ~10x, because the
    "working set" of sectors no longer fit in the drive's cache and the
    workload switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from an
    ordered queue: that preserves locality and also keeps any worker from
    getting more than one file out of balance.
    """
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now