posixworker: avoid creating workers that end up getting no work...
Martin von Zweigbergk
r45936:26eb62bd default
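In short: partition() (defined at the bottom of worker.py) deals out every Nth element, so when there are fewer work items than configured workers it yields empty slices, and _posixworker() forks one child per slice, empty or not. Capping the slice count at len(args), as the one-line change below does, stops those idle children from being created. A minimal illustration (the file names are invented; partition() is copied verbatim from this file):

def partition(lst, nslices):
    # copied from the end of worker.py: deal out every Nth element
    for i in range(nslices):
        yield lst[i::nslices]


args = [b'file1', b'file2', b'file3']   # hypothetical work items
workers = 8                             # e.g. worker.numcpus=8

print(list(partition(args, workers)))
# [[b'file1'], [b'file2'], [b'file3'], [], [], [], [], []]
#   -> before the change: 5 of the 8 forked children get no work

print(list(partition(args, min(workers, len(args)))))
# [[b'file1'], [b'file2'], [b'file3']]
#   -> after the change: no child is forked without work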
@@ -1,451 +1,451 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import errno
 import os
 import signal
 import sys
 import threading
 import time
 
 try:
     import selectors
 
     selectors.BaseSelector
 except ImportError:
     from .thirdparty import selectors2 as selectors
 
 from .i18n import _
 from . import (
     encoding,
     error,
     pycompat,
     scmutil,
     util,
 )
 
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 
 def _numworkers(ui):
     s = ui.config(b'worker', b'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_(b'number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 
 if pycompat.ispy3:
 
     class _blockingreader(object):
         def __init__(self, wrapped):
             self._wrapped = wrapped
 
         def __getattr__(self, attr):
             return getattr(self._wrapped, attr)
 
         # issue multiple reads until size is fulfilled
         def read(self, size=-1):
             if size < 0:
                 return self._wrapped.readall()
 
             buf = bytearray(size)
             view = memoryview(buf)
             pos = 0
 
             while pos < size:
                 ret = self._wrapped.readinto(view[pos:])
                 if not ret:
                     break
                 pos += ret
 
             del view
             del buf[pos:]
             return buf
 
 
 else:
 
     def _blockingreader(wrapped):
         return wrapped
 
 
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
     # in the presence of the GIL result in excessive context switching and
     # this overhead can slow down execution.
     _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
 else:
     _STARTUP_COST = 1e30
     _DISALLOW_THREAD_UNSAFE = False
 
 
 def worthwhile(ui, costperop, nops, threadsafe=True):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
 
     if not threadsafe and _DISALLOW_THREAD_UNSAFE:
         return False
 
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_STARTUP_COST * workers + linear / workers)
     return benefit >= 0.15
 
 
 def worker(
     ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
 ):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run. It is expected to return a progress iterator.
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
 
     hasretval - when True, func and the current function return an progress
     iterator then a dict (encoded as an iterator that yield many (False, ..)
     then a (True, dict)). The dicts are joined in some arbitrary order, so
     overlapping keys are a bad idea.
 
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
     '''
     enabled = ui.configbool(b'worker', b'enabled')
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
         return _platformworker(ui, func, staticargs, args, hasretval)
     return func(*staticargs + (args,))
 
 
 def _posixworker(ui, func, staticargs, args, hasretval):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
 
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
         signal.signal(signal.SIGCHLD, oldchldhandler)
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
 
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
                         # child would already be reaped, but pids yet been
                         # updated (maybe interrupted just after waitpid)
                         pids.discard(pid)
                         break
                     else:
                         raise
             if not p:
                 # skip subsequent steps, because child process should
                 # be still running in this case
                 continue
             pids.discard(p)
             st = _exitstatus(st)
             if st and not problem[0]:
                 problem[0] = st
 
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
 
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
     pipes = []
     retval = {}
-    for pargs in partition(args, workers):
+    for pargs in partition(args, min(workers, len(args))):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
         # process has the full list of pipe descriptors (and it doesn't really
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
         # override error handling *before* fork. this is necessary because
         # exception (signal) may arrive after fork, before "pid =" assignment
         # completes, and other exception handler (dispatch.py) can lead to
         # unexpected code path without os._exit.
         ret = -1
         try:
             pid = os.fork()
             if pid == 0:
                 signal.signal(signal.SIGINT, oldhandler)
                 signal.signal(signal.SIGCHLD, oldchldhandler)
 
                 def workerfunc():
                     for r, w in pipes[:-1]:
                         os.close(r)
                         os.close(w)
                     os.close(rfd)
                     for result in func(*(staticargs + (pargs,))):
                         os.write(wfd, util.pickle.dumps(result))
                     return 0
 
                 ret = scmutil.callcatch(ui, workerfunc)
         except:  # parent re-raises, child never returns
             if os.getpid() == parentpid:
                 raise
             exctype = sys.exc_info()[0]
             force = not issubclass(exctype, KeyboardInterrupt)
             ui.traceback(force=force)
         finally:
             if os.getpid() != parentpid:
                 try:
                     ui.flush()
                 except:  # never returns, no re-raises
                     pass
                 finally:
                     os._exit(ret & 255)
         pids.add(pid)
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
         selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
         selector.close()
         return problem[0]
 
     try:
         openpipes = len(pipes)
         while openpipes > 0:
             for key, events in selector.select():
                 try:
                     res = util.pickle.load(_blockingreader(key.fileobj))
                     if hasretval and res[0]:
                         retval.update(res[1])
                     else:
                         yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
                     openpipes -= 1
                 except IOError as e:
                     if e.errno == errno.EINTR:
                         continue
                     raise
     except:  # re-raises
         killworkers()
         cleanup()
         raise
     status = cleanup()
     if status:
         if status < 0:
             os.kill(os.getpid(), -status)
         sys.exit(status)
     if hasretval:
         yield True, retval
 
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -(os.WTERMSIG(code))
 
 
 def _windowsworker(ui, func, staticargs, args, hasretval):
     class Worker(threading.Thread):
         def __init__(
             self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
         ):
             threading.Thread.__init__(self, *args, **kwargs)
             self._taskqueue = taskqueue
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
             self.daemon = True
             self.exception = None
 
         def interrupt(self):
             self._interrupted = True
 
         def run(self):
             try:
                 while not self._taskqueue.empty():
                     try:
                         args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
                             self._resultqueue.put(res)
                             # threading doesn't provide a native way to
                             # interrupt execution. handle it manually at every
                             # iteration.
                             if self._interrupted:
                                 return
                     except pycompat.queue.Empty:
                         break
             except Exception as e:
                 # store the exception such that the main thread can resurface
                 # it as if the func was running without workers.
                 self.exception = e
                 raise
 
     threads = []
 
     def trykillworkers():
         # Allow up to 1 second to clean worker threads nicely
         cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
             remainingtime = cleanupend - time.time()
             t.join(remainingtime)
             if t.is_alive():
                 # pass over the workers joining failure. it is more
                 # important to surface the inital exception than the
                 # fact that one of workers may be processing a large
                 # task and does not get to handle the interruption.
                 ui.warn(
                     _(
                         b"failed to kill worker threads while "
                         b"handling an exception\n"
                     )
                 )
                 return
 
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
     retval = {}
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
     for pargs in partition(args, workers * 20):
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
                 res = resultqueue.get()
                 if hasretval and res[0]:
                     retval.update(res[1])
                 else:
                     yield res
             threads[0].join(0.05)
             finishedthreads = [_t for _t in threads if not _t.is_alive()]
             for t in finishedthreads:
                 if t.exception is not None:
                     raise t.exception
                 threads.remove(t)
     except (Exception, KeyboardInterrupt):  # re-raises
         trykillworkers()
         raise
     while not resultqueue.empty():
         res = resultqueue.get()
         if hasretval and res[0]:
             retval.update(res[1])
         else:
             yield res
     if hasretval:
         yield True, retval
 
 
 if pycompat.iswindows:
     _platformworker = _windowsworker
 else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 
 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:
 
         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.
 
         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.
 
         What we should really be doing is have workers read filenames from a
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]