##// END OF EJS Templates
worker: add docstring to _blockingreader
Manuel Jacob -
r50126:4d42a5fb default
parent child Browse files
Show More
@@ -1,478 +1,485 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import errno
9 import errno
10 import os
10 import os
11 import pickle
11 import pickle
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
17 try:
18 import selectors
18 import selectors
19
19
20 selectors.BaseSelector
20 selectors.BaseSelector
21 except ImportError:
21 except ImportError:
22 from .thirdparty import selectors2 as selectors
22 from .thirdparty import selectors2 as selectors
23
23
24 from .i18n import _
24 from .i18n import _
25 from . import (
25 from . import (
26 encoding,
26 encoding,
27 error,
27 error,
28 pycompat,
28 pycompat,
29 scmutil,
29 scmutil,
30 )
30 )
31
31
32
32
def countcpus():
    """try to count the number of CPUs on the system"""

    # posix: sysconf is the authoritative source where available
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: fall back to the environment variable
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # detection failed everywhere; assume a single CPU
    return 1
53
53
54
54
55 def _numworkers(ui):
55 def _numworkers(ui):
56 s = ui.config(b'worker', b'numcpus')
56 s = ui.config(b'worker', b'numcpus')
57 if s:
57 if s:
58 try:
58 try:
59 n = int(s)
59 n = int(s)
60 if n >= 1:
60 if n >= 1:
61 return n
61 return n
62 except ValueError:
62 except ValueError:
63 raise error.Abort(_(b'number of cpus must be an integer'))
63 raise error.Abort(_(b'number of cpus must be an integer'))
64 return min(max(countcpus(), 4), 32)
64 return min(max(countcpus(), 4), 32)
65
65
66
66
def ismainthread():
    """Return True when called from the Python main thread."""
    return threading.main_thread() is threading.current_thread()
69
69
70
70
71 class _blockingreader:
71 class _blockingreader:
72 """Wrap unbuffered stream such that pickle.load() works with it.
73
74 pickle.load() expects that calls to read() and readinto() read as many
75 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
76 pickle.load() raises an EOFError.
77 """
78
72 def __init__(self, wrapped):
79 def __init__(self, wrapped):
73 self._wrapped = wrapped
80 self._wrapped = wrapped
74
81
75 # Do NOT implement readinto() by making it delegate to
82 # Do NOT implement readinto() by making it delegate to
76 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
83 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
77 # with just read() and readline(), so we don't need to implement it.
84 # with just read() and readline(), so we don't need to implement it.
78
85
79 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
86 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
80
87
81 # This is required for python 3.8, prior to 3.8.2. See issue6444.
88 # This is required for python 3.8, prior to 3.8.2. See issue6444.
82 def readinto(self, b):
89 def readinto(self, b):
83 pos = 0
90 pos = 0
84 size = len(b)
91 size = len(b)
85
92
86 while pos < size:
93 while pos < size:
87 ret = self._wrapped.readinto(b[pos:])
94 ret = self._wrapped.readinto(b[pos:])
88 if not ret:
95 if not ret:
89 break
96 break
90 pos += ret
97 pos += ret
91
98
92 return pos
99 return pos
93
100
94 def readline(self):
101 def readline(self):
95 return self._wrapped.readline()
102 return self._wrapped.readline()
96
103
97 # issue multiple reads until size is fulfilled
104 # issue multiple reads until size is fulfilled (or EOF is encountered)
98 def read(self, size=-1):
105 def read(self, size=-1):
99 if size < 0:
106 if size < 0:
100 return self._wrapped.readall()
107 return self._wrapped.readall()
101
108
102 buf = bytearray(size)
109 buf = bytearray(size)
103 view = memoryview(buf)
110 view = memoryview(buf)
104 pos = 0
111 pos = 0
105
112
106 while pos < size:
113 while pos < size:
107 ret = self._wrapped.readinto(view[pos:])
114 ret = self._wrapped.readinto(view[pos:])
108 if not ret:
115 if not ret:
109 break
116 break
110 pos += ret
117 pos += ret
111
118
112 del view
119 del view
113 del buf[pos:]
120 del buf[pos:]
114 return bytes(buf)
121 return bytes(buf)
115
122
116
123
# Cost-model constants consumed by worthwhile() below.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # On other platforms the startup cost is set so high that the benefit
    # computed in worthwhile() can never be positive, disabling parallelism.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
126
133
127
134
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if _DISALLOW_THREAD_UNSAFE and not threadsafe:
        return False

    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # parallel cost = worker startup plus the ideal 1/N share of the work
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
139
146
140
147
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return an progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    useworkers = enabled and worthwhile(
        ui, costperarg, len(args), threadsafe=threadsafe
    )
    if not useworkers:
        # run everything serially in the current process
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args, hasretval)
176
183
177
184
def _posixworker(ui, func, staticargs, args, hasretval):
    """fork-based implementation of worker() for posix systems

    Forks one child per chunk of args; each child pickles its results onto
    a private pipe and the parent multiplexes the pipes with a selector,
    yielding results as they arrive.
    """
    workers = _numworkers(ui)
    # block SIGINT in the parent while children run; restored in cleanup()
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a 1-element list so nested functions can mutate it
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        # reap finished children and record the first failing exit status
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore default signal handling, run its chunk
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close all pipe ends except this child's write end
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        # The stream has to be unbuffered. Otherwise, if all data is read from
        # the raw file into the buffer, the selector thinks that the FD is not
        # ready to read while pickle.load() could read from the buffer. This
        # would delay the processing of readable items.
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # restore signal handlers, reap remaining children, free the selector
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # child closed its pipe: done with this worker
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # a child died from a signal: re-raise it in this process
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
329
336
330
337
331 def _posixexitstatus(code):
338 def _posixexitstatus(code):
332 """convert a posix exit status into the same form returned by
339 """convert a posix exit status into the same form returned by
333 os.spawnv
340 os.spawnv
334
341
335 returns None if the process was stopped instead of exiting"""
342 returns None if the process was stopped instead of exiting"""
336 if os.WIFEXITED(code):
343 if os.WIFEXITED(code):
337 return os.WEXITSTATUS(code)
344 return os.WEXITSTATUS(code)
338 elif os.WIFSIGNALED(code):
345 elif os.WIFSIGNALED(code):
339 return -(os.WTERMSIG(code))
346 return -(os.WTERMSIG(code))
340
347
341
348
def _windowsworker(ui, func, staticargs, args, hasretval):
    """thread-based implementation of worker(), used on Windows

    Chunks of args are placed on a task queue consumed by daemon worker
    threads; results flow back through a result queue and are yielded
    from the calling thread.
    """
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # cooperative cancellation flag, checked after each result
            self._interrupted = False
            self.daemon = True
            # set by run() when func raises; inspected by the main thread
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the inital exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain results that are already available, then poll threads
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # all threads finished: drain whatever is left in the result queue
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
439
446
440
447
# Select the platform-specific worker implementation: thread-based on
# Windows, fork-based elsewhere (which also needs an exit-status decoder).
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
446
453
447
454
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    # slice k gets elements k, k+N, k+2N, ... via extended slicing
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now