##// END OF EJS Templates
typing: disable a module-attr warning in the worker module's py2 code...
Matt Harbison -
r47525:f6480620 stable
parent child Browse files
Show More
@@ -1,466 +1,468 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
17 try:
18 import selectors
18 import selectors
19
19
20 selectors.BaseSelector
20 selectors.BaseSelector
21 except ImportError:
21 except ImportError:
22 from .thirdparty import selectors2 as selectors
22 from .thirdparty import selectors2 as selectors
23
23
24 from .i18n import _
24 from .i18n import _
25 from . import (
25 from . import (
26 encoding,
26 encoding,
27 error,
27 error,
28 pycompat,
28 pycompat,
29 scmutil,
29 scmutil,
30 util,
30 util,
31 )
31 )
32
32
33
33
def countcpus():
    '''try to count the number of CPUs on the system'''

    # POSIX: sysconf exposes the number of online processors.
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # Windows: the environment carries the processor count.
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # Detection failed on this platform; assume a single CPU.
    return 1
54
54
55
55
56 def _numworkers(ui):
56 def _numworkers(ui):
57 s = ui.config(b'worker', b'numcpus')
57 s = ui.config(b'worker', b'numcpus')
58 if s:
58 if s:
59 try:
59 try:
60 n = int(s)
60 n = int(s)
61 if n >= 1:
61 if n >= 1:
62 return n
62 return n
63 except ValueError:
63 except ValueError:
64 raise error.Abort(_(b'number of cpus must be an integer'))
64 raise error.Abort(_(b'number of cpus must be an integer'))
65 return min(max(countcpus(), 4), 32)
65 return min(max(countcpus(), 4), 32)
66
66
67
67
if pycompat.ispy3:

    def ismainthread():
        # Python 3 exposes the main thread directly.
        return threading.current_thread() == threading.main_thread()

    class _blockingreader(object):
        # Adapts an unbuffered pipe reader so that the unpickler reliably
        # receives the number of bytes it asked for (or everything up to
        # EOF), even when the OS returns short reads.

        def __init__(self, wrapped):
            self._wrapped = wrapped

        # Do NOT implement readinto() by making it delegate to
        # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
        # with just read() and readline(), so we don't need to implement it.

        def readline(self):
            return self._wrapped.readline()

        # issue multiple reads until size is fulfilled
        def read(self, size=-1):
            if size < 0:
                return self._wrapped.readall()

            buf = bytearray(size)
            view = memoryview(buf)
            pos = 0

            while pos < size:
                ret = self._wrapped.readinto(view[pos:])
                if not ret:
                    # EOF (or no data): stop short rather than spin.
                    break
                pos += ret

            # the memoryview must be released before the bytearray can be
            # resized; trim the tail if EOF cut the read short
            del view
            del buf[pos:]
            return bytes(buf)


else:

    def ismainthread():
        # Python 2 has no main_thread(); fall back to the private
        # _MainThread class, suppressing the pytype module-attr warning.
        # pytype: disable=module-attr
        return isinstance(threading.current_thread(), threading._MainThread)
        # pytype: enable=module-attr

    def _blockingreader(wrapped):
        # Python 2 pickle already reads in a blocking fashion; pass through.
        return wrapped
111
113
112
114
# Tunables feeding worthwhile(): estimated per-worker startup cost and
# whether thread-unsafe tasks must be kept serial on this platform.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # Unsupported platform: make startup look prohibitively expensive so
    # that worthwhile() never elects to parallelize.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
122
124
123
125
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    # Estimate the serial cost, then the parallel cost (startup overhead
    # plus the same work split across the workers); parallelize only when
    # the expected saving clears a small threshold.
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
135
137
136
138
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return an progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    # Parallelism disabled or not worth the startup cost: run serially
    # in-process with the full argument list.
    return func(*staticargs + (args,))
172
174
173
175
def _posixworker(ui, func, staticargs, args, hasretval):
    # Fork-based worker: each child writes pickled results over its own
    # pipe; the parent multiplexes the pipes with a selector and yields
    # results as they arrive. SIGCHLD is used to reap children promptly
    # and abort early when any child fails.
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem[0] holds the first non-zero child exit status (list so the
    # nested closures can mutate it).
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        # Reap children, recording the first failure in problem[0].
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        # Non-blocking reap from the signal handler; kill the rest on the
        # first failure.
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # Child: restore handlers, close inherited descriptors,
                # stream pickled results to the parent.
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        # Parent keeps only the read ends, registered for readiness.
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # Restore signal handlers, wait for remaining children, and report
        # the first failing status (0 when all succeeded).
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = util.pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # Writer exited: stop watching this pipe.
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # Child died from a signal: re-deliver it to ourselves so the
            # caller observes the same termination.
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
317
319
318
320
319 def _posixexitstatus(code):
321 def _posixexitstatus(code):
320 """convert a posix exit status into the same form returned by
322 """convert a posix exit status into the same form returned by
321 os.spawnv
323 os.spawnv
322
324
323 returns None if the process was stopped instead of exiting"""
325 returns None if the process was stopped instead of exiting"""
324 if os.WIFEXITED(code):
326 if os.WIFEXITED(code):
325 return os.WEXITSTATUS(code)
327 return os.WEXITSTATUS(code)
326 elif os.WIFSIGNALED(code):
328 elif os.WIFSIGNALED(code):
327 return -(os.WTERMSIG(code))
329 return -(os.WTERMSIG(code))
328
330
329
331
def _windowsworker(ui, func, staticargs, args, hasretval):
    # Thread-based worker used on Windows: tasks are distributed through a
    # queue, results are drained from another queue, and exceptions raised
    # in a thread are re-raised in the caller.
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # Cooperative interruption flag, checked once per result.
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        # Another thread drained the queue between the
                        # empty() check and get_nowait(); we are done.
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the inital exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # Drain available results, then poll briefly for finished
            # threads so their exceptions surface promptly.
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # All threads finished: flush any results still queued.
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
427
429
428
430
# Select the platform-appropriate worker implementation; _exitstatus is
# only needed by the fork-based POSIX worker.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
434
436
435
437
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    # Slice k receives elements k, k + nslices, k + 2*nslices, ...
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now