worker: Use buffered input from the pickle stream...
Jan Alexander Steffens (heftig)
r44718:cb52e619 stable
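
The change itself is one line in _posixworker(): the read end of each worker's result pipe is now opened buffered (the os.fdopen default) instead of unbuffered before being handed to pickle.load(). A minimal sketch of why that matters, assuming CPython's usual io behavior; the names below are illustrative, not from the patch:

# Editor's sketch, not from the patch: pickle.load(f) assumes f.read(n)
# returns exactly n bytes until EOF.
import os

rfd, wfd = os.pipe()

# before: bufsize=0 gives a raw io.FileIO; on a pipe, read(n) may legally
# return fewer than n bytes, so pickle can see a truncated stream mid-object
#   f = os.fdopen(rfd, 'rb', 0)

# after: the default wraps the fd in io.BufferedReader, whose read(n) keeps
# reading until n bytes have arrived (or EOF), matching pickle's assumption
f = os.fdopen(rfd, 'rb')
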
@@ -1,416 +1,416 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import signal
import sys
import threading
import time

try:
    import selectors

    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
    util,
)


def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1


def _numworkers(ui):
    s = ui.config(b'worker', b'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)


if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False


def worthwhile(ui, costperop, nops, threadsafe=True):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15

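
For intuition, here is the heuristic above on concrete numbers (an editor's illustration; the inputs are invented, only _STARTUP_COST = 0.01 comes from the POSIX branch above):

# Editor's illustration, not part of worker.py: 100 tasks at 0.05s each,
# handled by 4 workers on a POSIX system (_STARTUP_COST = 0.01).
costperop, nops, workers = 0.05, 100, 4
linear = costperop * nops                               # 5.0s if run serially
benefit = linear - (0.01 * workers + linear / workers)  # 5.0 - 1.29 = 3.71
assert benefit >= 0.15  # well past the threshold, so parallelism is worthwhile
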
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator then a dict (encoded as an iterator that yields many (False, ..)
    entries and then a (True, dict)). The dicts are joined in some arbitrary
    order, so overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    '''
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))
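
The hasretval contract is easiest to see with a toy func (an editor's sketch; examplefunc and its arguments are hypothetical, not part of Mercurial):

# Editor's sketch, not part of worker.py: a func obeying the hasretval
# protocol documented above.
def examplefunc(ui, chunk):
    collected = {}
    for i, item in enumerate(chunk):
        yield False, (i, item)   # progress item; worker() re-yields it as-is
        collected[item] = True
    yield True, collected        # per-worker dict; worker() merges these and
                                 # finally yields a single (True, merged) pair
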


def _posixworker(ui, func, staticargs, args, hasretval):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # the child may already be reaped, but pids not yet
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because the child process should
                # still be running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, workers):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
-        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
+        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = util.pickle.load(key.fileobj)
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        sys.exit(status)
    if hasretval:
        yield True, retval


def _posixexitstatus(code):
    '''convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting'''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))


def _windowsworker(ui, func, staticargs, args, hasretval):
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the failure to join the workers. it is more
                # important to surface the initial exception than the
                # fact that one of the workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval


if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus


def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

    What we should really be doing is to have workers read filenames from an
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
    '''
    for i in range(nslices):
        yield lst[i::nslices]
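
The interleaved slicing is easy to check in isolation (an editor's example; the numbers are invented):

# Editor's example, not part of worker.py: round-robin slicing in action.
def partition(lst, nslices):  # same body as above, copied for a quick check
    for i in range(nslices):
        yield lst[i::nslices]

print(list(partition(list(range(10)), 3)))
# [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]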