worker: adapt _blockingreader to work around a python3.8.[0-1] bug (issue6444)...
Matt Harbison
r50103:2fe4efaa stable
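For context on the diff below: _blockingreader deliberately avoids delegating readinto() to the wrapped reader, because the wrapped pipe is unbuffered and a single unbuffered call may return fewer bytes than requested. Per the patch comments, Python 3.8 prior to 3.8.2 requires the method anyway (see issue6444), so the change defines readinto() only on those versions and loops until the destination buffer is full or EOF is reached. Here is a minimal, self-contained sketch of the short-read hazard and the looping fix; ChoppyReader and LoopingReader are hypothetical stand-ins for illustration, not Mercurial code.

class ChoppyReader(object):
    """Simulate an unbuffered stream returning at most 4 bytes per call."""

    def __init__(self, data):
        self._data = data
        self._pos = 0

    def readinto(self, b):
        n = min(4, len(b), len(self._data) - self._pos)
        b[:n] = self._data[self._pos : self._pos + n]
        self._pos += n
        return n


class LoopingReader(object):
    """Fill the whole buffer across short reads, like the patched readinto()."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def readinto(self, b):
        view = memoryview(b)
        pos = 0
        while pos < len(b):
            ret = self._wrapped.readinto(view[pos:])
            if not ret:  # EOF: return what we have
                break
            pos += ret
        return pos


buf = bytearray(11)
assert ChoppyReader(b"hello world").readinto(buf) == 4  # a single short read
assert LoopingReader(ChoppyReader(b"hello world")).readinto(buf) == 11
assert bytes(buf) == b"hello world"

Looping gives the wrapper buffered-style readinto() semantics on top of a raw reader, which is what the affected unpicklers effectively need; a single short read would hand them a truncated buffer.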
@@ -1,468 +1,483 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import signal
import sys
import threading
import time

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
    util,
)


def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1


def _numworkers(ui):
    s = ui.config(b'worker', b'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)


if pycompat.ispy3:

    def ismainthread():
        return threading.current_thread() == threading.main_thread()

    class _blockingreader(object):
        def __init__(self, wrapped):
            self._wrapped = wrapped

        # Do NOT implement readinto() by making it delegate to
        # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
        # with just read() and readline(), so we don't need to implement it.

+        if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
+
+            # This is required for python 3.8, prior to 3.8.2. See issue6444.
+            def readinto(self, b):
+                pos = 0
+                size = len(b)
+
+                while pos < size:
+                    ret = self._wrapped.readinto(b[pos:])
+                    if not ret:
+                        break
+                    pos += ret
+
+                return pos
+
        def readline(self):
            return self._wrapped.readline()

        # issue multiple reads until size is fulfilled
        def read(self, size=-1):
            if size < 0:
                return self._wrapped.readall()

            buf = bytearray(size)
            view = memoryview(buf)
            pos = 0

            while pos < size:
                ret = self._wrapped.readinto(view[pos:])
                if not ret:
                    break
                pos += ret

            del view
            del buf[pos:]
            return bytes(buf)


else:

    def ismainthread():
        # pytype: disable=module-attr
        return isinstance(threading.current_thread(), threading._MainThread)
        # pytype: enable=module-attr

    def _blockingreader(wrapped):
        return wrapped


if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False


def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15


def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator then a dict (encoded as an iterator that yields many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))


def _posixworker(ui, func, staticargs, args, hasretval):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids hasn't yet
                        # been updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = util.pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval


def _posixexitstatus(code):
    """convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting"""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))


def _windowsworker(ui, func, staticargs, args, hasretval):
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of the workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval


if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus


def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In his words:

    Single-threaded Mercurial makes a point of creating and visiting
    files in a fixed order (alphabetical). When creating files in order,
    a typical filesystem is likely to allocate them on nearby regions on
    disk. Thus, when revisiting in the same order, locality is maximized
    and various forms of OS and disk-level caching and read-ahead get a
    chance to work.

    This effect can be quite significant on spinning disks. I discovered it
    circa Mercurial v0.4 when revlogs were named by hashes of filenames.
    Tarring a repo and copying it to another disk effectively randomized
    the revlog ordering on disk by sorting the revlogs by hash and suddenly
    performance of my kernel checkout benchmark dropped by ~10x because the
    "working set" of sectors visited no longer fit in the drive's cache and
    the workload switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from an
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
466 """
481 """
467 for i in range(nslices):
482 for i in range(nslices):
468 yield lst[i::nslices]
483 yield lst[i::nslices]
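
The stride-based strategy of partition() is easy to see with a concrete call (an illustrative snippet, not part of the file): each of the N slices takes every Nth element, so slice lengths differ by at most one.

>>> list(partition(list(range(10)), 3))
[[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]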
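And a hedged sketch of driving the worker() API as its docstring describes; checkfile and checkall are hypothetical callers, and ui and repo come from Mercurial's runtime rather than this file.

from mercurial import worker


def checkfile(ui, repo, files):
    # func is invoked as func(*staticargs + (chunk,)) and must yield
    # progress items; here, one unit per file processed.
    for f in files:
        yield 1


def checkall(ui, repo, allfiles):
    # 0.01 is an assumed per-item cost estimate; worthwhile() weighs the
    # linear cost against worker startup cost before parallelizing.
    done = 0
    for unit in worker.worker(ui, 0.01, checkfile, (ui, repo), allfiles):
        done += unit
    return done

For the cost arithmetic: with the posix _STARTUP_COST of 0.01, costperarg=0.01 over 100 items gives linear = 1.0, and with 4 workers benefit = 1.0 - (0.01 * 4 + 1.0 / 4) = 0.71, comfortably above the 0.15 threshold, so the work is parallelized.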