worker: avoid potential partial write of pickled data...
Author: Manuel Jacob
Changeset: r50164:395f2806 (branch: default)
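The change below replaces the bare os.write() call in the child's workerfunc() with pickle.dump() onto a buffered file object opened over the pipe's write end, flushing after each result. os.write() may perform a partial write (it returns the number of bytes actually written, which the old code ignored), and a partial write would leave a truncated pickle frame for the parent to read; a buffered writer keeps writing on flush() until the whole frame has been handed to the pipe. A minimal sketch of the two patterns (the helper names are invented for illustration; only the loop bodies mirror the hunk below):

    import os
    import pickle

    def send_unbuffered(wfd, results):
        # old pattern: one os.write() per pickled result; a short write
        # silently truncates the stream because the return value is ignored
        for result in results:
            os.write(wfd, pickle.dumps(result))

    def send_buffered(wfd, results):
        # new pattern: pickle.dump() onto a buffered stream; flush() keeps
        # writing until every byte is out, and flushing per result keeps the
        # parent's selector loop fed as results become available
        with os.fdopen(wfd, 'wb') as wf:
            for result in results:
                pickle.dump(result, wf)
                wf.flush()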
@@ -1,471 +1,473 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import errno
import os
import pickle
import signal
import sys
import threading
import time

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
)


def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1


def _numworkers(ui):
    s = ui.config(b'worker', b'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)


def ismainthread():
    return threading.current_thread() == threading.main_thread()


class _blockingreader:
    """Wrap unbuffered stream such that pickle.load() works with it.

    pickle.load() expects that calls to read() and readinto() read as many
    bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
    pickle.load() raises an EOFError.
    """

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def readline(self):
        return self._wrapped.readline()

    def readinto(self, buf):
        pos = 0
        size = len(buf)

        with memoryview(buf) as view:
            while pos < size:
                with view[pos:] as subview:
                    ret = self._wrapped.readinto(subview)
                if not ret:
                    break
                pos += ret

        return pos

    # issue multiple reads until size is fulfilled (or EOF is encountered)
    def read(self, size=-1):
        if size < 0:
            return self._wrapped.readall()

        buf = bytearray(size)
        n_read = self.readinto(buf)
        del buf[n_read:]
        return bytes(buf)


if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False


def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15


def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return an progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))


def _posixworker(ui, func, staticargs, args, hasretval):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
-                    for result in func(*(staticargs + (pargs,))):
-                        os.write(wfd, pickle.dumps(result))
+                    with os.fdopen(wfd, 'wb') as wf:
+                        for result in func(*(staticargs + (pargs,))):
+                            pickle.dump(result, wf)
+                            wf.flush()
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        # The stream has to be unbuffered. Otherwise, if all data is read from
        # the raw file into the buffer, the selector thinks that the FD is not
        # ready to read while pickle.load() could read from the buffer. This
        # would delay the processing of readable items.
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval


def _posixexitstatus(code):
    """convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting"""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))


def _windowsworker(ui, func, staticargs, args, hasretval):
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the inital exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval


if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus


def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In his words:

    Single-threaded Mercurial makes a point of creating and visiting
    files in a fixed order (alphabetical). When creating files in order,
    a typical filesystem is likely to allocate them on nearby regions on
    disk. Thus, when revisiting in the same order, locality is maximized
    and various forms of OS and disk-level caching and read-ahead get a
    chance to work.

    This effect can be quite significant on spinning disks. I discovered it
    circa Mercurial v0.4 when revlogs were named by hashes of filenames.
    Tarring a repo and copying it to another disk effectively randomized
    the revlog ordering on disk by sorting the revlogs by hash and suddenly
    performance of my kernel checkout benchmark dropped by ~10x because the
    "working set" of sectors visited no longer fit in the drive's cache and
    the workload switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from a
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
    """
    for i in range(nslices):
        yield lst[i::nslices]
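For context on how this module is consumed, here is a hedged usage sketch of the worker() entry point, based only on the docstring above. The names renameall, doit, and pairs, and the cost value 0.001, are placeholders rather than Mercurial's actual callers:

    from mercurial import worker

    def renameall(ui, pairs):
        # hypothetical caller: 'pairs' is a list of picklable (src, dst) items

        def doit(ui, chunk):
            # func must return a progress iterator; every value it yields is
            # pickled in the child and re-yielded to the caller by worker()
            for src, dst in chunk:
                # ... real per-item work would go here ...
                yield 1, dst  # placeholder (progress units, item) tuple

        # costperarg, func, staticargs, then the list split across workers
        return [dst for _u, dst in worker.worker(ui, 0.001, doit, (ui,), pairs)]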