##// END OF EJS Templates
worker: avoid reading 1 byte at a time from the OS pipe...
Arseniy Alekseyev -
r50794:3eef8baf default
parent child Browse files
Show More
@@ -1,472 +1,444 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import os
9 import os
10 import pickle
10 import pickle
11 import selectors
11 import selectors
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 from .i18n import _
17 from .i18n import _
18 from . import (
18 from . import (
19 encoding,
19 encoding,
20 error,
20 error,
21 pycompat,
21 pycompat,
22 scmutil,
22 scmutil,
23 )
23 )
24
24
25
25
def countcpus():
    """Best-effort count of the CPUs on this system; falls back to 1."""

    # POSIX: sysconf exposes the number of online processors.
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # Windows: the environment carries the processor count.
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # Undetectable: assume a single CPU.
    return 1
46
46
47
47
48 def _numworkers(ui):
48 def _numworkers(ui):
49 s = ui.config(b'worker', b'numcpus')
49 s = ui.config(b'worker', b'numcpus')
50 if s:
50 if s:
51 try:
51 try:
52 n = int(s)
52 n = int(s)
53 if n >= 1:
53 if n >= 1:
54 return n
54 return n
55 except ValueError:
55 except ValueError:
56 raise error.Abort(_(b'number of cpus must be an integer'))
56 raise error.Abort(_(b'number of cpus must be an integer'))
57 return min(max(countcpus(), 4), 32)
57 return min(max(countcpus(), 4), 32)
58
58
59
59
def ismainthread():
    """Return True when called from the interpreter's main thread."""
    return threading.main_thread() == threading.current_thread()
62
62
63
63
64 class _blockingreader:
65 """Wrap unbuffered stream such that pickle.load() works with it.
66
67 pickle.load() expects that calls to read() and readinto() read as many
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
69 pickle.load() raises an EOFError.
70 """
71
72 def __init__(self, wrapped):
73 self._wrapped = wrapped
74
75 def readline(self):
76 return self._wrapped.readline()
77
78 def readinto(self, buf):
79 pos = 0
80 size = len(buf)
81
82 with memoryview(buf) as view:
83 while pos < size:
84 with view[pos:] as subview:
85 ret = self._wrapped.readinto(subview)
86 if not ret:
87 break
88 pos += ret
89
90 return pos
91
92 # issue multiple reads until size is fulfilled (or EOF is encountered)
93 def read(self, size=-1):
94 if size < 0:
95 return self._wrapped.readall()
96
97 buf = bytearray(size)
98 n_read = self.readinto(buf)
99 del buf[n_read:]
100 return bytes(buf)
101
102
if pycompat.isposix or pycompat.iswindows:
    # Approximate cost (seconds) of starting one worker; consumed by
    # worthwhile() when deciding whether parallelism pays off.
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # Unsupported platform: make workers look prohibitively expensive so
    # worthwhile() never opts in.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
112
73
113
74
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if _DISALLOW_THREAD_UNSAFE and not threadsafe:
        return False

    serial = costperop * nops
    nworkers = _numworkers(ui)
    # Benefit = serial time saved, minus startup overhead and the
    # residual per-worker share of the work.
    gain = serial - (_STARTUP_COST * nworkers + serial / nworkers)
    return gain >= 0.15
125
86
126
87
def worker(
    ui,
    costperarg,
    func,
    staticargs,
    args,
    hasretval=False,
    threadsafe=True,
    prefork=None,
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return an progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.

    prefork - a parameterless Callable that is invoked prior to forking the
    process. fork() is only used on non-Windows platforms, but is also not
    called on POSIX platforms if the work amount doesn't warrant a worker.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker installs a SIGCHLD handler, which Python
        # (up to 3.9) only allows from the main thread.
        enabled = False

    if not enabled:
        return func(*staticargs + (args,))
    if not worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args, hasretval, prefork=prefork)
175
136
176
137
def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
    """fork()-based worker: fan tasks out to child processes and yield
    their pickled results back as a progress iterator.

    Each child writes results down its own pipe; the parent multiplexes
    the pipes with a selector. Reads are buffered (4096 bytes) to avoid
    the pathological 1-byte-at-a-time reads an unbuffered pickle source
    would cause; see peek_nonblock/load_all below for how the buffer is
    drained before each select() call.
    """
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except ProcessLookupError:
                pass

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            try:
                p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
            except ChildProcessError:
                # child would already be reaped, but pids yet been
                # updated (maybe interrupted just after waitpid)
                pids.discard(pid)
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}

    if prefork:
        prefork()

    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    with os.fdopen(wfd, 'wb') as wf:
                        for result in func(*(staticargs + (pargs,))):
                            pickle.dump(result, wf)
                            wf.flush()
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        # Buffering is needed for performance, but it also presents a problem:
        # selector doesn't take the buffered data into account,
        # so we have to arrange it so that the buffers are empty when select is
        # called (see [peek_nonblock])
        selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ)

    def peek_nonblock(f):
        # Peek without blocking so we can tell whether buffered data (or
        # readable pipe data) remains before going back to select().
        os.set_blocking(f.fileno(), False)
        res = f.peek()
        os.set_blocking(f.fileno(), True)
        return res

    def load_all(f):
        # Drain every complete pickle currently available on f, so the
        # buffer is empty by the time the selector is consulted again.
        # The pytype error likely goes away on a modern version of
        # pytype having a modern typeshed snapshot.
        # pytype: disable=wrong-arg-types
        yield pickle.load(f)
        while len(peek_nonblock(f)) > 0:
            yield pickle.load(f)
        # pytype: enable=wrong-arg-types

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    for res in load_all(key.fileobj):
                        if hasretval and res[0]:
                            retval.update(res[1])
                        else:
                            yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    # pytype: disable=attribute-error
                    key.fileobj.close()
                    # pytype: enable=attribute-error
                    openpipes -= 1
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
323
295
324
296
325 def _posixexitstatus(code):
297 def _posixexitstatus(code):
326 """convert a posix exit status into the same form returned by
298 """convert a posix exit status into the same form returned by
327 os.spawnv
299 os.spawnv
328
300
329 returns None if the process was stopped instead of exiting"""
301 returns None if the process was stopped instead of exiting"""
330 if os.WIFEXITED(code):
302 if os.WIFEXITED(code):
331 return os.WEXITSTATUS(code)
303 return os.WEXITSTATUS(code)
332 elif os.WIFSIGNALED(code):
304 elif os.WIFSIGNALED(code):
333 return -(os.WTERMSIG(code))
305 return -(os.WTERMSIG(code))
334
306
335
307
def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
    """Thread-based worker used on Windows (no fork() available).

    Tasks are queued, consumed by daemon threads, and their results are
    funneled through a result queue back to this generator. ``prefork``
    is accepted for signature parity with the POSIX worker but unused.
    """

    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        deadline = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remaining = deadline - time.time()
            t.join(remaining)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the inital exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # drain whatever landed after the last thread finished
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
433
405
434
406
if pycompat.iswindows:
    # No fork() on Windows: fall back to the thread-based worker.
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
440
412
441
413
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    # Slice k gets elements k, k+N, k+2N, ... of the input.
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now