worker: remove Python 2 support code...
Gregory Szorc
r49756:cc0e059d default
@@ -1,467 +1,455 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import errno
import os
import pickle
import signal
import sys
import threading
import time

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
)


def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1


def _numworkers(ui):
    s = ui.config(b'worker', b'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)

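When no worker.numcpus config is set, _numworkers clamps the detected CPU count into the range [4, 32]. A minimal sketch of that clamp with illustrative inputs (the helper name _default_workers is hypothetical, standing in for the last line above):

# Illustrative sketch of the default clamp in _numworkers.
def _default_workers(detected_cpus):
    # max(..., 4) raises small counts to 4; min(..., 32) caps large counts
    return min(max(detected_cpus, 4), 32)

assert _default_workers(2) == 4     # small machine still gets 4 workers
assert _default_workers(16) == 16   # mid-size machine uses every core
assert _default_workers(128) == 32  # very wide machine is capped at 32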
-if pycompat.ispy3:
-
-    def ismainthread():
-        return threading.current_thread() == threading.main_thread()
-
-    class _blockingreader(object):
-        def __init__(self, wrapped):
-            self._wrapped = wrapped
-
-        # Do NOT implement readinto() by making it delegate to
-        # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
-        # with just read() and readline(), so we don't need to implement it.
-
-        def readline(self):
-            return self._wrapped.readline()
-
-        # issue multiple reads until size is fulfilled
-        def read(self, size=-1):
-            if size < 0:
-                return self._wrapped.readall()
-
-            buf = bytearray(size)
-            view = memoryview(buf)
-            pos = 0
-
-            while pos < size:
-                ret = self._wrapped.readinto(view[pos:])
-                if not ret:
-                    break
-                pos += ret
-
-            del view
-            del buf[pos:]
-            return bytes(buf)
-
-
-else:
-
-    def ismainthread():
-        # pytype: disable=module-attr
-        return isinstance(threading.current_thread(), threading._MainThread)
-        # pytype: enable=module-attr
-
-    def _blockingreader(wrapped):
-        return wrapped
+def ismainthread():
+    return threading.current_thread() == threading.main_thread()
+
+
+class _blockingreader(object):
+    def __init__(self, wrapped):
+        self._wrapped = wrapped
+
+    # Do NOT implement readinto() by making it delegate to
+    # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
+    # with just read() and readline(), so we don't need to implement it.
+
+    def readline(self):
+        return self._wrapped.readline()
+
+    # issue multiple reads until size is fulfilled
+    def read(self, size=-1):
+        if size < 0:
+            return self._wrapped.readall()
+
+        buf = bytearray(size)
+        view = memoryview(buf)
+        pos = 0
+
+        while pos < size:
+            ret = self._wrapped.readinto(view[pos:])
+            if not ret:
+                break
+            pos += ret
+
+        del view
+        del buf[pos:]
+        return bytes(buf)

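The read() loop above exists because a raw (unbuffered) pipe read can legally return fewer bytes than requested, while the unpickler expects read(n) to be exact. A sketch of the parent-side pattern, reusing the _blockingreader class added above (the payload is illustrative):

import os
import pickle

# Sketch: unpickle a stream of results from an unbuffered pipe.
rfd, wfd = os.pipe()
os.write(wfd, pickle.dumps((0, b'item')) + pickle.dumps((1, b'done')))
os.close(wfd)

reader = _blockingreader(os.fdopen(rfd, 'rb', 0))
try:
    while True:
        print(pickle.load(reader))  # (0, b'item'), then (1, b'done')
except EOFError:
    pass  # writer closed the pipe; stream is exhausted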
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False


def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15

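To make the heuristic concrete, a worked example under assumed values (the fixed worker count of 4 and the costs are illustrative; _STARTUP_COST = 0.01 is the POSIX/Windows value above):

# Worked example of the benefit formula in worthwhile().
_STARTUP_COST = 0.01
workers = 4

def benefit(costperop, nops):
    linear = costperop * nops
    return linear - (_STARTUP_COST * workers + linear / workers)

print(benefit(0.01, 1000))  # ~7.46 -> above 0.15, parallelism pays off
print(benefit(0.01, 20))    # ~0.11 -> below 0.15, stay single-process

On platforms that are neither POSIX nor Windows, _STARTUP_COST is 1e30, so the benefit is always hugely negative and workers are never used.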
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator, then a dict (encoded as an iterator that yields many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))

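A sketch of a typical call site, assuming a hypothetical per-file operation updatefile and inputs repo/filelist: func runs once per worker with that worker's chunk of args and yields progress tuples that the caller consumes.

# Hypothetical caller; updatefile, repo and filelist are stand-ins.
def work(repo, files):
    for f in files:
        updatefile(repo, f)  # per-item operation done in the worker
        yield 1, f           # progress tuple surfaced to the caller

for units, name in worker(ui, 0.001, work, (repo,), filelist):
    pass  # progress items arrive here as workers produce them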
def _posixworker(ui, func, staticargs, args, hasretval):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child may already have been reaped, but pids not yet
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # an exception (signal) may arrive after fork, before the "pid ="
        # assignment completes, and another exception handler (dispatch.py)
        # could take an unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval

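The core of _posixworker is a fan-out: one pipe per forked child, with the parent multiplexing the read ends through selectors. A stripped-down sketch of that shape (fanout is a hypothetical name; error handling, signal handling, and child reaping are omitted, see the real function above):

import os
import pickle
import selectors

def fanout(chunks, compute):
    sel = selectors.DefaultSelector()
    for chunk in chunks:
        rfd, wfd = os.pipe()
        if os.fork() == 0:  # child: pickle results into its own pipe
            os.close(rfd)
            for item in chunk:
                os.write(wfd, pickle.dumps(compute(item)))
            os._exit(0)
        os.close(wfd)  # parent keeps only the read end
        sel.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
    open_ends = len(chunks)
    while open_ends:
        for key, _events in sel.select():
            try:
                # payloads here are tiny; the real code wraps the file in
                # _blockingreader to guard large reads against short reads
                yield pickle.load(key.fileobj)
            except EOFError:  # child exited and closed its write end
                sel.unregister(key.fileobj)
                key.fileobj.close()
                open_ends -= 1

for r in fanout([[1, 2], [3, 4]], lambda x: x * x):
    print(r)  # squares, in whatever order the children deliver them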
def _posixexitstatus(code):
    """convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting"""
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))

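For illustration, assuming Linux's wait-status encoding (the W* macros hide the exact layout): a child that calls exit(3) produces a status word of 3 << 8, and a child killed by SIGTERM leaves the signal number in the low bits.

# Illustrative, assuming Linux's status layout behind the W* macros.
assert _posixexitstatus(3 << 8) == 3  # exit(3)      -> 3
assert _posixexitstatus(15) == -15    # SIGTERM kill -> -15
# _posixworker turns a negative status back into a signal against the
# parent itself: os.kill(os.getpid(), -status)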
def _windowsworker(ui, func, staticargs, args, hasretval):
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers' joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of the workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval

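Worker.run() polls self._interrupted between result items because Python threads cannot be forcibly killed. A self-contained sketch of that cooperative-cancellation idiom (the Cancellable class and the timings are illustrative):

import threading
import time

class Cancellable(threading.Thread):
    def __init__(self):
        super().__init__(daemon=True)
        self._interrupted = False

    def interrupt(self):
        self._interrupted = True

    def run(self):
        for _ in range(1000):
            time.sleep(0.01)       # stands in for one unit of work
            if self._interrupted:  # polled each iteration, as in Worker
                return

t = Cancellable()
t.start()
t.interrupt()  # request cancellation ...
t.join(1)      # ... then wait briefly, like trykillworkers()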
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus

def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In her words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from an
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    for i in range(nslices):
        yield lst[i::nslices]
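What the strided slicing yields in practice (illustrative input):

print(list(partition(list(range(10)), 3)))
# [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
# slice 0 visits items 0, 3, 6, 9 -- exactly the scattered access pattern
# the docstring above complains about when the items are on-disk files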