worker: explain why pickle reading stream has to be unbuffered
Manuel Jacob
r50125:5d28246b default
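
The change adds a comment to _posixworker explaining why each worker's result pipe is opened unbuffered (buffering=0) before being registered with the selector: with a buffered reader, a single read from the pipe may pull several pickled results into Python's userspace buffer at once; after pickle.load() returns the first result, the remaining ones sit in the buffer while the file descriptor itself polls as not ready, so the selector would not wake up for them until the worker writes again. A minimal standalone sketch of the unbuffered pattern (illustrative only, not code from this changeset; the real loop additionally wraps the file in _blockingreader to retry short reads):

    import os
    import pickle
    import selectors

    rfd, wfd = os.pipe()
    # Two results arrive back-to-back, as often happens with worker output.
    os.write(wfd, pickle.dumps(b'result-1') + pickle.dumps(b'result-2'))

    sel = selectors.DefaultSelector()
    # buffering=0: pickle.load() consumes exactly one pickled object from the
    # pipe, so the unconsumed second result keeps the FD readable.
    sel.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    for _ in range(2):
        for key, _events in sel.select(timeout=1):
            print(pickle.load(key.fileobj))  # b'result-1', then b'result-2'
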
@@ -1,474 +1,478 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import errno
import os
import pickle
import signal
import sys
import threading
import time

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
)


def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1


def _numworkers(ui):
    s = ui.config(b'worker', b'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)


def ismainthread():
    return threading.current_thread() == threading.main_thread()


class _blockingreader:
    def __init__(self, wrapped):
        self._wrapped = wrapped

    # Do NOT implement readinto() by making it delegate to
    # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
    # with just read() and readline(), so we don't need to implement it.

    if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):

        # This is required for python 3.8, prior to 3.8.2. See issue6444.
        def readinto(self, b):
            pos = 0
            size = len(b)

            while pos < size:
                ret = self._wrapped.readinto(b[pos:])
                if not ret:
                    break
                pos += ret

            return pos

    def readline(self):
        return self._wrapped.readline()

    # issue multiple reads until size is fulfilled
    def read(self, size=-1):
        if size < 0:
            return self._wrapped.readall()

        buf = bytearray(size)
        view = memoryview(buf)
        pos = 0

        while pos < size:
            ret = self._wrapped.readinto(view[pos:])
            if not ret:
                break
            pos += ret

        del view
        del buf[pos:]
        return bytes(buf)

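The read() loop above exists because a raw (unbuffered) pipe read may legitimately return fewer bytes than requested, while the unpickler expects read(size) to be fulfilled completely unless EOF is reached. A tiny standalone illustration of the short read (hypothetical, not part of worker.py):

    import os

    rfd, wfd = os.pipe()
    os.write(wfd, b'abc')           # only 3 bytes are available right now
    raw = os.fdopen(rfd, 'rb', 0)   # raw, unbuffered reader

    # We ask for 10 bytes but get 3: a short read, not an error.
    # _blockingreader.read(10) would instead keep calling readinto()
    # until 10 bytes arrived or the write end was closed.
    print(raw.read(10))             # b'abc'
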
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False


def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15

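To make the heuristic concrete, here are illustrative numbers (not from the source): with costperop = 0.001s, nops = 10000, and 8 workers, the serial cost is 10s and the estimated benefit is 10 - (0.01 * 8 + 10 / 8) = 8.67, comfortably above the 0.15 threshold, so parallelism is used. On platforms with neither the POSIX nor the Windows worker, the _STARTUP_COST of 1e30 makes the benefit always negative, disabling parallelism entirely.

    # Illustrative numbers plugged into the heuristic above:
    costperop, nops, workers = 0.001, 10000, 8
    linear = costperop * nops                              # 10.0s serially
    benefit = linear - (0.01 * workers + linear / workers)
    print(benefit >= 0.15)                                 # True -> parallelize
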
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator followed by a dict (encoded as an iterator that yields many
    (False, ..) items and then a (True, dict)). The dicts are joined in
    some arbitrary order, so overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))

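A sketch of the hasretval contract from a caller's perspective (the function and data here are hypothetical, not part of worker.py):

    # A worker function yields (False, progress) items while working, then
    # one final (True, dict) item carrying its return value.
    def sizechunk(ui, chunk):
        sizes = {}
        for name in chunk:
            sizes[name] = len(name)
            yield False, name   # progress item, passed through by worker()
        yield True, sizes       # per-worker dict, merged into one retval

    # With hasretval=True, worker() then yields every (False, ..) progress
    # item and, after all workers finish, a single (True, merged_dict).
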
def _posixworker(ui, func, staticargs, args, hasretval):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # the child may already have been reaped, but pids not
                        # yet updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
+        # The stream has to be unbuffered. Otherwise, if all data is read from
+        # the raw file into the buffer, the selector thinks that the FD is not
+        # ready to read while pickle.load() could read from the buffer. This
+        # would delay the processing of readable items.
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval


def _posixexitstatus(code):
    """convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting"""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))

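The two branches map a waitpid status to the os.spawnv convention: a normal exit yields the exit code, and death by signal yields the negated signal number. The negative form is what the parent checks at the end of _posixworker: for status < 0 it re-raises the same signal on itself via os.kill(os.getpid(), -status). A POSIX-only round-trip sketch (illustrative, not part of worker.py):

    import os
    import signal

    pid = os.fork()
    if pid == 0:
        signal.pause()              # child just waits to be killed
    os.kill(pid, signal.SIGTERM)
    _, st = os.waitpid(pid, 0)
    # _posixexitstatus(st) would return -15 here, i.e. -signal.SIGTERM
    assert os.WIFSIGNALED(st) and os.WTERMSIG(st) == signal.SIGTERM
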
def _windowsworker(ui, func, staticargs, args, hasretval):
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of the workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval


if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus


def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In her words:

    Single-threaded Mercurial makes a point of creating and visiting
    files in a fixed order (alphabetical). When creating files in order,
    a typical filesystem is likely to allocate them on nearby regions on
    disk. Thus, when revisiting in the same order, locality is maximized
    and various forms of OS and disk-level caching and read-ahead get a
    chance to work.

    This effect can be quite significant on spinning disks. I discovered it
    circa Mercurial v0.4 when revlogs were named by hashes of filenames.
    Tarring a repo and copying it to another disk effectively randomized
    the revlog ordering on disk by sorting the revlogs by hash and suddenly
    performance of my kernel checkout benchmark dropped by ~10x because the
    "working set" of sectors visited no longer fit in the drive's cache and
    the workload switched from streaming to random I/O.

    What we should really be doing is to have workers read filenames from an
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
    """
    for i in range(nslices):
        yield lst[i::nslices]
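
The every-Nth-element strategy is easy to see with a small input (illustrative):

    # partition(list(range(10)), 3) yields three interleaved slices:
    #   [0, 3, 6, 9], [1, 4, 7], [2, 5, 8]
    for piece in partition(list(range(10)), 3):
        print(piece)

which is also why adjacent (and therefore disk-local) files end up in different workers, the behavior the docstring complains about.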