worker: silence type error when calling pickle...
Gregory Szorc
r49767:a0674e91 default
@@ -1,455 +1,459 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import errno
import os
import pickle
import signal
import sys
import threading
import time

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
)


def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1


def _numworkers(ui):
    s = ui.config(b'worker', b'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)


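A quick illustration of the clamp on the final line (numbers invented for the example): with worker.numcpus unset, the detected CPU count is raised to at least 4 and capped at 32.

    for detected in (1, 2, 8, 64):
        print(detected, '->', min(max(detected, 4), 32))
    # 1 -> 4, 2 -> 4, 8 -> 8, 64 -> 32
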
def ismainthread():
    return threading.current_thread() == threading.main_thread()


class _blockingreader(object):
    def __init__(self, wrapped):
        self._wrapped = wrapped

    # Do NOT implement readinto() by making it delegate to
    # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
    # with just read() and readline(), so we don't need to implement it.

    def readline(self):
        return self._wrapped.readline()

    # issue multiple reads until size is fulfilled
    def read(self, size=-1):
        if size < 0:
            return self._wrapped.readall()

        buf = bytearray(size)
        view = memoryview(buf)
        pos = 0

        while pos < size:
            ret = self._wrapped.readinto(view[pos:])
            if not ret:
                break
            pos += ret

        del view
        del buf[pos:]
        return bytes(buf)


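A minimal sketch of what _blockingreader is for, under the same setup _posixworker uses below (the pipe and payload here are invented): the read end is opened unbuffered, so a single readinto() may return fewer bytes than the unpickler requested, and read() loops until the request is satisfied or EOF is hit.

    import os
    import pickle

    rfd, wfd = os.pipe()
    os.write(wfd, pickle.dumps((False, b'progress')))
    os.close(wfd)
    reader = os.fdopen(rfd, 'rb', 0)             # unbuffered, as in _posixworker
    print(pickle.load(_blockingreader(reader)))  # -> (False, b'progress')
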
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False


def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15


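A worked example with invented numbers: on POSIX or Windows, _STARTUP_COST is 0.01, so 10000 items at 0.001s each with 4 workers yield a benefit well above the 0.15 threshold; on other platforms _STARTUP_COST = 1e30 makes the benefit hugely negative and the serial path always wins.

    linear = 0.001 * 10000                     # 10.0s of serial work
    workers = 4
    benefit = linear - (0.01 * workers + linear / workers)
    print(benefit, benefit >= 0.15)            # 7.46 True -> parallelize
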
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator followed by a dict (encoded as an iterator that yields many
    (False, ..) entries and then a (True, dict) entry). The dicts are joined
    in some arbitrary order, so overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))


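A hypothetical sketch of the hasretval protocol described in the docstring (examplefunc and its arguments are invented): func yields (False, item) progress values and finally one (True, dict); worker() re-yields the progress values and merges each chunk's final dict into a single retval.

    def examplefunc(ui, items):        # hypothetical worker function
        partial = {}
        for i in items:
            partial[i] = i * 2
            yield False, i             # progress value, passed through
        yield True, partial            # final dict, merged into retval

    # for done, value in worker(ui, 0.001, examplefunc, (ui,),
    #                           list(range(100)), hasretval=True):
    #     ...                          # progress until the final (True, dict)
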
def _posixworker(ui, func, staticargs, args, hasretval):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # the child may already have been reaped, but pids
                        # hasn't been updated yet (maybe interrupted just
                        # after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because the child process should
                # still be running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval


def _posixexitstatus(code):
    """convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting"""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))


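A POSIX-only sketch of the conversion (the child and exit code are invented): a normal exit comes back as the exit code, while a signal death comes back negated, which is what lets the status < 0 branch in _posixworker re-raise the signal.

    import os

    pid = os.fork()
    if pid == 0:
        os._exit(3)                    # child exits with code 3
    _, st = os.waitpid(pid, 0)
    print(_posixexitstatus(st))        # -> 3; a SIGTERM'd child gives -15
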
def _windowsworker(ui, func, staticargs, args, hasretval):
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                        # threading doesn't provide a native way to
                        # interrupt execution. handle it manually at every
                        # iteration.
                        if self._interrupted:
                            return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of the workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval


if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus


def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In her words:

    Single-threaded Mercurial makes a point of creating and visiting
    files in a fixed order (alphabetical). When creating files in order,
    a typical filesystem is likely to allocate them on nearby regions on
    disk. Thus, when revisiting in the same order, locality is maximized
    and various forms of OS and disk-level caching and read-ahead get a
    chance to work.

    This effect can be quite significant on spinning disks. I discovered it
    circa Mercurial v0.4 when revlogs were named by hashes of filenames.
    Tarring a repo and copying it to another disk effectively randomized
    the revlog ordering on disk by sorting the revlogs by hash and suddenly
    performance of my kernel checkout benchmark dropped by ~10x because the
    "working set" of sectors visited no longer fit in the drive's cache and
    the workload switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from an
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
    """
    for i in range(nslices):
        yield lst[i::nslices]
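
The round-robin slicing in action (illustration only):

    print(list(partition(list(range(10)), 3)))
    # -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]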