worker: POSIX only supports workers from main thread (issue6460)...
Joerg Sonnenberger
r46857:a42502e9 default
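The fork-based POSIX worker needs to install a SIGCHLD handler, and CPython
only permits installing signal handlers from the main thread. A minimal
standalone sketch of the failure mode this change guards against (illustrative
only, not part of the diff; assumes a POSIX platform):

    import signal
    import threading

    def install_handler():
        # CPython raises "ValueError: signal only works in main thread"
        # because this runs outside the main thread
        signal.signal(signal.SIGCHLD, lambda signum, frame: None)

    t = threading.Thread(target=install_handler)
    t.start()
    t.join()  # the ValueError is raised (and printed) inside the thread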
@@ -1,455 +1,466 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import signal
import sys
import threading
import time

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
    util,
)


def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1


def _numworkers(ui):
    s = ui.config(b'worker', b'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
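    # no valid override: use the detected CPU count, but at least 4
    # and at most 32 workers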
    return min(max(countcpus(), 4), 32)


if pycompat.ispy3:

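    # The fork-based POSIX worker has to install a SIGCHLD handler, which
    # Python up to 3.9 only allows in the main thread; worker() uses this
    # helper to detect that case.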
    def ismainthread():
        return threading.current_thread() == threading.main_thread()

    class _blockingreader(object):
        def __init__(self, wrapped):
            self._wrapped = wrapped

        # Do NOT implement readinto() by making it delegate to
        # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
        # with just read() and readline(), so we don't need to implement it.

        def readline(self):
            return self._wrapped.readline()

        # issue multiple reads until size is fulfilled
        def read(self, size=-1):
            if size < 0:
                return self._wrapped.readall()

            buf = bytearray(size)
            view = memoryview(buf)
            pos = 0

            while pos < size:
                ret = self._wrapped.readinto(view[pos:])
                if not ret:
                    break
                pos += ret

            del view
            del buf[pos:]
            return bytes(buf)


else:

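    # Python 2's threading module has no main_thread(); the main thread is
    # the singleton instance of the private _MainThread class instead.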
    def ismainthread():
        return isinstance(threading.current_thread(), threading._MainThread)

    def _blockingreader(wrapped):
        return wrapped


if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False


def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
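    # illustrative numbers (not from the original): costperop=0.01 and
    # nops=1000 give linear=10.0; with workers=4 and _STARTUP_COST=0.01,
    # benefit = 10.0 - (0.01 * 4 + 10.0 / 4) = 7.46 >= 0.15, so parallelize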
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15


def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator then a dict (encoded as an iterator that yields many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))


def _posixworker(ui, func, staticargs, args, hasretval):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
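    # problem is a one-element list so the nested helpers defined below
    # can record a failing exit status in place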
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # the child has already been reaped, but pids hasn't
                        # yet been updated (maybe interrupted just after
                        # waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because the child process should
                # still be running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = util.pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
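                # EOF means this worker closed its end of the pipe,
                # i.e. it has exited and sent all of its results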
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval


def _posixexitstatus(code):
    """convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting"""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))


def _windowsworker(ui, func, staticargs, args, hasretval):
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of the workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
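            # join the oldest thread with a short timeout so the outer
            # loop keeps draining resultqueue while workers are running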
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval


if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus


def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from an
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
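    # e.g. partition([1, 2, 3, 4, 5, 6, 7], 3) yields
    # [1, 4, 7], then [2, 5], then [3, 6]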
    for i in range(nslices):
        yield lst[i::nslices]
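For context, a sketch of how a caller might drive worker() with hasretval=True
(the task function, cost value, and variable names here are hypothetical; only
the calling convention comes from the docstring above):

    def sizetask(ui, items):
        # yields (False, progress) per item, then one (True, dict)
        # carrying this worker's partial return value
        sizes = {}
        for item in items:
            sizes[item] = len(item)
            yield False, item
        yield True, sizes

    results = {}
    for done, value in worker(ui, 0.001, sizetask, (ui,), items, hasretval=True):
        if done:
            results = value  # dicts merged across all workers
        # otherwise value is a progress item to report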