worker: implement _blockingreader.readinto() (issue6444)...
Manuel Jacob
r50128:52072252 default
@@ -1,485 +1,471 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 
 import errno
 import os
 import pickle
 import signal
 import sys
 import threading
 import time
 
 try:
     import selectors
 
     selectors.BaseSelector
 except ImportError:
     from .thirdparty import selectors2 as selectors
 
 from .i18n import _
 from . import (
     encoding,
     error,
     pycompat,
     scmutil,
 )
 
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 
 def _numworkers(ui):
     s = ui.config(b'worker', b'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_(b'number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 
 def ismainthread():
     return threading.current_thread() == threading.main_thread()
 
 
 class _blockingreader:
     """Wrap unbuffered stream such that pickle.load() works with it.
 
     pickle.load() expects that calls to read() and readinto() read as many
     bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
     pickle.load() raises an EOFError.
     """
 
     def __init__(self, wrapped):
         self._wrapped = wrapped
 
-    # Do NOT implement readinto() by making it delegate to
-    # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
-    # with just read() and readline(), so we don't need to implement it.
-
-    if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
-
-        # This is required for python 3.8, prior to 3.8.2. See issue6444.
-        def readinto(self, b):
-            pos = 0
-            size = len(b)
-
-            while pos < size:
-                ret = self._wrapped.readinto(b[pos:])
-                if not ret:
-                    break
-                pos += ret
-
-            return pos
-
     def readline(self):
         return self._wrapped.readline()
 
-    # issue multiple reads until size is fulfilled (or EOF is encountered)
-    def read(self, size=-1):
-        if size < 0:
-            return self._wrapped.readall()
-
-        buf = bytearray(size)
+    def readinto(self, buf):
         pos = 0
+        size = len(buf)
 
         with memoryview(buf) as view:
             while pos < size:
                 with view[pos:] as subview:
                     ret = self._wrapped.readinto(subview)
                     if not ret:
                         break
                     pos += ret
 
-        del buf[pos:]
+        return pos
+
+    # issue multiple reads until size is fulfilled (or EOF is encountered)
+    def read(self, size=-1):
+        if size < 0:
+            return self._wrapped.readall()
+
+        buf = bytearray(size)
+        n_read = self.readinto(buf)
+        del buf[n_read:]
         return bytes(buf)
 
 
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
     # in the presence of the GIL result in excessive context switching and
     # this overhead can slow down execution.
     _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
 else:
     _STARTUP_COST = 1e30
     _DISALLOW_THREAD_UNSAFE = False
 
 
 def worthwhile(ui, costperop, nops, threadsafe=True):
     """try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them"""
 
     if not threadsafe and _DISALLOW_THREAD_UNSAFE:
         return False
 
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_STARTUP_COST * workers + linear / workers)
     return benefit >= 0.15
 
 
 def worker(
     ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
 ):
     """run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run. It is expected to return a progress iterator.
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
 
     hasretval - when True, func and the current function return an progress
     iterator then a dict (encoded as an iterator that yield many (False, ..)
     then a (True, dict)). The dicts are joined in some arbitrary order, so
     overlapping keys are a bad idea.
 
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
     """
     enabled = ui.configbool(b'worker', b'enabled')
     if enabled and _platformworker is _posixworker and not ismainthread():
         # The POSIX worker has to install a handler for SIGCHLD.
         # Python up to 3.9 only allows this in the main thread.
         enabled = False
 
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
         return _platformworker(ui, func, staticargs, args, hasretval)
     return func(*staticargs + (args,))
 
 
 def _posixworker(ui, func, staticargs, args, hasretval):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
 
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
         signal.signal(signal.SIGCHLD, oldchldhandler)
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
 
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
                         # child would already be reaped, but pids yet been
                         # updated (maybe interrupted just after waitpid)
                         pids.discard(pid)
                         break
                     else:
                         raise
             if not p:
                 # skip subsequent steps, because child process should
                 # be still running in this case
                 continue
             pids.discard(p)
             st = _exitstatus(st)
             if st and not problem[0]:
                 problem[0] = st
 
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
 
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
     pipes = []
     retval = {}
     for pargs in partition(args, min(workers, len(args))):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
         # process has the full list of pipe descriptors (and it doesn't really
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
         # override error handling *before* fork. this is necessary because
         # exception (signal) may arrive after fork, before "pid =" assignment
         # completes, and other exception handler (dispatch.py) can lead to
         # unexpected code path without os._exit.
         ret = -1
         try:
             pid = os.fork()
             if pid == 0:
                 signal.signal(signal.SIGINT, oldhandler)
                 signal.signal(signal.SIGCHLD, oldchldhandler)
 
                 def workerfunc():
                     for r, w in pipes[:-1]:
                         os.close(r)
                         os.close(w)
                     os.close(rfd)
                     for result in func(*(staticargs + (pargs,))):
                         os.write(wfd, pickle.dumps(result))
                     return 0
 
                 ret = scmutil.callcatch(ui, workerfunc)
         except:  # parent re-raises, child never returns
             if os.getpid() == parentpid:
                 raise
             exctype = sys.exc_info()[0]
             force = not issubclass(exctype, KeyboardInterrupt)
             ui.traceback(force=force)
         finally:
             if os.getpid() != parentpid:
                 try:
                     ui.flush()
                 except:  # never returns, no re-raises
                     pass
                 finally:
                     os._exit(ret & 255)
         pids.add(pid)
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
         # The stream has to be unbuffered. Otherwise, if all data is read from
         # the raw file into the buffer, the selector thinks that the FD is not
         # ready to read while pickle.load() could read from the buffer. This
         # would delay the processing of readable items.
         selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
         selector.close()
         return problem[0]
 
     try:
         openpipes = len(pipes)
         while openpipes > 0:
             for key, events in selector.select():
                 try:
                     # The pytype error likely goes away on a modern version of
                     # pytype having a modern typeshed snapshot.
                     # pytype: disable=wrong-arg-types
                     res = pickle.load(_blockingreader(key.fileobj))
                     # pytype: enable=wrong-arg-types
                     if hasretval and res[0]:
                         retval.update(res[1])
                     else:
                         yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
                     openpipes -= 1
                 except IOError as e:
                     if e.errno == errno.EINTR:
                         continue
                     raise
     except:  # re-raises
         killworkers()
         cleanup()
         raise
     status = cleanup()
     if status:
         if status < 0:
             os.kill(os.getpid(), -status)
         raise error.WorkerError(status)
     if hasretval:
         yield True, retval
 
 
 def _posixexitstatus(code):
     """convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting"""
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -(os.WTERMSIG(code))
 
 
 def _windowsworker(ui, func, staticargs, args, hasretval):
     class Worker(threading.Thread):
         def __init__(
             self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
         ):
             threading.Thread.__init__(self, *args, **kwargs)
             self._taskqueue = taskqueue
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
             self.daemon = True
             self.exception = None
 
         def interrupt(self):
             self._interrupted = True
 
         def run(self):
             try:
                 while not self._taskqueue.empty():
                     try:
                         args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
                             self._resultqueue.put(res)
                             # threading doesn't provide a native way to
                             # interrupt execution. handle it manually at every
                             # iteration.
                             if self._interrupted:
                                 return
                     except pycompat.queue.Empty:
                         break
             except Exception as e:
                 # store the exception such that the main thread can resurface
                 # it as if the func was running without workers.
                 self.exception = e
                 raise
 
     threads = []
 
     def trykillworkers():
         # Allow up to 1 second to clean worker threads nicely
         cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
             remainingtime = cleanupend - time.time()
             t.join(remainingtime)
             if t.is_alive():
                 # pass over the workers joining failure. it is more
                 # important to surface the inital exception than the
                 # fact that one of workers may be processing a large
                 # task and does not get to handle the interruption.
                 ui.warn(
                     _(
                         b"failed to kill worker threads while "
                         b"handling an exception\n"
                     )
                 )
                 return
 
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
     retval = {}
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
     for pargs in partition(args, workers * 20):
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
                 res = resultqueue.get()
                 if hasretval and res[0]:
                     retval.update(res[1])
                 else:
                     yield res
             threads[0].join(0.05)
             finishedthreads = [_t for _t in threads if not _t.is_alive()]
             for t in finishedthreads:
                 if t.exception is not None:
                     raise t.exception
                 threads.remove(t)
     except (Exception, KeyboardInterrupt):  # re-raises
         trykillworkers()
         raise
     while not resultqueue.empty():
         res = resultqueue.get()
         if hasretval and res[0]:
             retval.update(res[1])
         else:
             yield res
     if hasretval:
         yield True, retval
 
 
 if pycompat.iswindows:
     _platformworker = _windowsworker
 else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 
 def partition(lst, nslices):
     """partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     olivia is not a fan of this partitioning strategy when files are involved.
     In his words:
 
         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.
 
         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.
 
         What we should really be doing is have workers read filenames from a
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     """
     for i in range(nslices):
         yield lst[i::nslices]
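
To see the wrapper from this change in action outside of Mercurial, the sketch below copies the looping read logic of the new _blockingreader into a standalone class and feeds it to pickle.load() over an unbuffered pipe, which is how _posixworker consumes results from its children. The class name, the toy payload, and the pipe setup at the bottom are illustrative additions, not Mercurial code; only the read/readinto/readline logic mirrors the change above.

import os
import pickle


class blockingreader:
    """Stand-in for _blockingreader: loop until the buffer is full or EOF."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def readline(self):
        return self._wrapped.readline()

    def readinto(self, buf):
        # keep issuing reads so the unpickler never mistakes a short
        # pipe read for a truncated stream
        pos = 0
        size = len(buf)
        with memoryview(buf) as view:
            while pos < size:
                with view[pos:] as subview:
                    ret = self._wrapped.readinto(subview)
                    if not ret:
                        break
                    pos += ret
        return pos

    def read(self, size=-1):
        if size < 0:
            return self._wrapped.readall()
        buf = bytearray(size)
        n_read = self.readinto(buf)
        del buf[n_read:]
        return bytes(buf)


rfd, wfd = os.pipe()
os.write(wfd, pickle.dumps((False, b'example result')))  # what a worker would send
os.close(wfd)
with os.fdopen(rfd, 'rb', 0) as raw:  # unbuffered, as in _posixworker
    print(pickle.load(blockingreader(raw)))  # prints (False, b'example result')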
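
For context on the worthwhile() check that decides whether the worker pool is used at all: it is a simple cost model in which parallelism only pays off when the serial cost saved exceeds worker startup plus the residual per-worker share. A rough worked example, using an illustrative per-item cost of 0.001 and 4 workers (these numbers are not taken from this changeset):

_STARTUP_COST = 0.01  # posix/windows value from worker.py above


def worthwhile(costperop, nops, workers):
    # same formula as worker.worthwhile(), with the worker count passed in
    # directly instead of being derived from the ui configuration
    linear = costperop * nops
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15


print(worthwhile(0.001, 1000, 4))  # True:  1.0 - (0.04 + 0.25)  = 0.71
print(worthwhile(0.001, 100, 4))   # False: 0.1 - (0.04 + 0.025) = 0.035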
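
Finally, the interleaved slicing performed by partition(), which its docstring criticizes for destroying on-disk locality, is easy to see on a small input; this is a standalone reproduction of the same strategy:

def partition(lst, nslices):
    # same strategy as worker.partition(): every Nth element per slice
    for i in range(nslices):
        yield lst[i::nslices]


print(list(partition(list(range(10)), 3)))
# [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]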