##// END OF EJS Templates
worker: manually buffer reads from pickle stream...
Jan Alexander Steffens (heftig) -
r44751:12491abf stable
parent child Browse files
Show More
@@ -1,416 +1,451 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
17 try:
18 import selectors
18 import selectors
19
19
20 selectors.BaseSelector
20 selectors.BaseSelector
21 except ImportError:
21 except ImportError:
22 from .thirdparty import selectors2 as selectors
22 from .thirdparty import selectors2 as selectors
23
23
24 from .i18n import _
24 from .i18n import _
25 from . import (
25 from . import (
26 encoding,
26 encoding,
27 error,
27 error,
28 pycompat,
28 pycompat,
29 scmutil,
29 scmutil,
30 util,
30 util,
31 )
31 )
32
32
33
33
34 def countcpus():
34 def countcpus():
35 '''try to count the number of CPUs on the system'''
35 '''try to count the number of CPUs on the system'''
36
36
37 # posix
37 # posix
38 try:
38 try:
39 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
39 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
40 if n > 0:
40 if n > 0:
41 return n
41 return n
42 except (AttributeError, ValueError):
42 except (AttributeError, ValueError):
43 pass
43 pass
44
44
45 # windows
45 # windows
46 try:
46 try:
47 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
47 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
48 if n > 0:
48 if n > 0:
49 return n
49 return n
50 except (KeyError, ValueError):
50 except (KeyError, ValueError):
51 pass
51 pass
52
52
53 return 1
53 return 1
54
54
55
55
56 def _numworkers(ui):
56 def _numworkers(ui):
57 s = ui.config(b'worker', b'numcpus')
57 s = ui.config(b'worker', b'numcpus')
58 if s:
58 if s:
59 try:
59 try:
60 n = int(s)
60 n = int(s)
61 if n >= 1:
61 if n >= 1:
62 return n
62 return n
63 except ValueError:
63 except ValueError:
64 raise error.Abort(_(b'number of cpus must be an integer'))
64 raise error.Abort(_(b'number of cpus must be an integer'))
65 return min(max(countcpus(), 4), 32)
65 return min(max(countcpus(), 4), 32)
66
66
67
67
68 if pycompat.ispy3:
69
70 class _blockingreader(object):
71 def __init__(self, wrapped):
72 self._wrapped = wrapped
73
74 def __getattr__(self, attr):
75 return getattr(self._wrapped, attr)
76
77 # issue multiple reads until size is fulfilled
78 def read(self, size=-1):
79 if size < 0:
80 return self._wrapped.readall()
81
82 buf = bytearray(size)
83 view = memoryview(buf)
84 pos = 0
85
86 while pos < size:
87 ret = self._wrapped.readinto(view[pos:])
88 if not ret:
89 break
90 pos += ret
91
92 del view
93 del buf[pos:]
94 return buf
95
96
97 else:
98
99 def _blockingreader(wrapped):
100 return wrapped
101
102
68 if pycompat.isposix or pycompat.iswindows:
103 if pycompat.isposix or pycompat.iswindows:
69 _STARTUP_COST = 0.01
104 _STARTUP_COST = 0.01
70 # The Windows worker is thread based. If tasks are CPU bound, threads
105 # The Windows worker is thread based. If tasks are CPU bound, threads
71 # in the presence of the GIL result in excessive context switching and
106 # in the presence of the GIL result in excessive context switching and
72 # this overhead can slow down execution.
107 # this overhead can slow down execution.
73 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
108 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
74 else:
109 else:
75 _STARTUP_COST = 1e30
110 _STARTUP_COST = 1e30
76 _DISALLOW_THREAD_UNSAFE = False
111 _DISALLOW_THREAD_UNSAFE = False
77
112
78
113
79 def worthwhile(ui, costperop, nops, threadsafe=True):
114 def worthwhile(ui, costperop, nops, threadsafe=True):
80 '''try to determine whether the benefit of multiple processes can
115 '''try to determine whether the benefit of multiple processes can
81 outweigh the cost of starting them'''
116 outweigh the cost of starting them'''
82
117
83 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
118 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
84 return False
119 return False
85
120
86 linear = costperop * nops
121 linear = costperop * nops
87 workers = _numworkers(ui)
122 workers = _numworkers(ui)
88 benefit = linear - (_STARTUP_COST * workers + linear / workers)
123 benefit = linear - (_STARTUP_COST * workers + linear / workers)
89 return benefit >= 0.15
124 return benefit >= 0.15
90
125
91
126
92 def worker(
127 def worker(
93 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
94 ):
129 ):
95 '''run a function, possibly in parallel in multiple worker
130 '''run a function, possibly in parallel in multiple worker
96 processes.
131 processes.
97
132
98 returns a progress iterator
133 returns a progress iterator
99
134
100 costperarg - cost of a single task
135 costperarg - cost of a single task
101
136
102 func - function to run. It is expected to return a progress iterator.
137 func - function to run. It is expected to return a progress iterator.
103
138
104 staticargs - arguments to pass to every invocation of the function
139 staticargs - arguments to pass to every invocation of the function
105
140
106 args - arguments to split into chunks, to pass to individual
141 args - arguments to split into chunks, to pass to individual
107 workers
142 workers
108
143
109 hasretval - when True, func and the current function return a progress
144 hasretval - when True, func and the current function return a progress
110 iterator then a dict (encoded as an iterator that yields many (False, ..)
145 iterator then a dict (encoded as an iterator that yields many (False, ..)
111 then a (True, dict)). The dicts are joined in some arbitrary order, so
146 then a (True, dict)). The dicts are joined in some arbitrary order, so
112 overlapping keys are a bad idea.
147 overlapping keys are a bad idea.
113
148
114 threadsafe - whether work items are thread safe and can be executed using
149 threadsafe - whether work items are thread safe and can be executed using
115 a thread-based worker. Should be disabled for CPU heavy tasks that don't
150 a thread-based worker. Should be disabled for CPU heavy tasks that don't
116 release the GIL.
151 release the GIL.
117 '''
152 '''
118 enabled = ui.configbool(b'worker', b'enabled')
153 enabled = ui.configbool(b'worker', b'enabled')
119 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
154 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
120 return _platformworker(ui, func, staticargs, args, hasretval)
155 return _platformworker(ui, func, staticargs, args, hasretval)
121 return func(*staticargs + (args,))
156 return func(*staticargs + (args,))
122
157
123
158
124 def _posixworker(ui, func, staticargs, args, hasretval):
159 def _posixworker(ui, func, staticargs, args, hasretval):
125 workers = _numworkers(ui)
160 workers = _numworkers(ui)
126 oldhandler = signal.getsignal(signal.SIGINT)
161 oldhandler = signal.getsignal(signal.SIGINT)
127 signal.signal(signal.SIGINT, signal.SIG_IGN)
162 signal.signal(signal.SIGINT, signal.SIG_IGN)
128 pids, problem = set(), [0]
163 pids, problem = set(), [0]
129
164
130 def killworkers():
165 def killworkers():
131 # unregister SIGCHLD handler as all children will be killed. This
166 # unregister SIGCHLD handler as all children will be killed. This
132 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
167 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
133 # could be updated while iterating, which would cause inconsistency.
168 # could be updated while iterating, which would cause inconsistency.
134 signal.signal(signal.SIGCHLD, oldchldhandler)
169 signal.signal(signal.SIGCHLD, oldchldhandler)
135 # if one worker bails, there's no good reason to wait for the rest
170 # if one worker bails, there's no good reason to wait for the rest
136 for p in pids:
171 for p in pids:
137 try:
172 try:
138 os.kill(p, signal.SIGTERM)
173 os.kill(p, signal.SIGTERM)
139 except OSError as err:
174 except OSError as err:
140 if err.errno != errno.ESRCH:
175 if err.errno != errno.ESRCH:
141 raise
176 raise
142
177
143 def waitforworkers(blocking=True):
178 def waitforworkers(blocking=True):
144 for pid in pids.copy():
179 for pid in pids.copy():
145 p = st = 0
180 p = st = 0
146 while True:
181 while True:
147 try:
182 try:
148 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
183 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
149 break
184 break
150 except OSError as e:
185 except OSError as e:
151 if e.errno == errno.EINTR:
186 if e.errno == errno.EINTR:
152 continue
187 continue
153 elif e.errno == errno.ECHILD:
188 elif e.errno == errno.ECHILD:
154 # child would already be reaped, but pids may not have
189 # child would already be reaped, but pids may not have
155 # been updated yet (maybe interrupted just after waitpid)
190 # been updated yet (maybe interrupted just after waitpid)
156 pids.discard(pid)
191 pids.discard(pid)
157 break
192 break
158 else:
193 else:
159 raise
194 raise
160 if not p:
195 if not p:
161 # skip subsequent steps, because child process should
196 # skip subsequent steps, because child process should
162 # be still running in this case
197 # be still running in this case
163 continue
198 continue
164 pids.discard(p)
199 pids.discard(p)
165 st = _exitstatus(st)
200 st = _exitstatus(st)
166 if st and not problem[0]:
201 if st and not problem[0]:
167 problem[0] = st
202 problem[0] = st
168
203
169 def sigchldhandler(signum, frame):
204 def sigchldhandler(signum, frame):
170 waitforworkers(blocking=False)
205 waitforworkers(blocking=False)
171 if problem[0]:
206 if problem[0]:
172 killworkers()
207 killworkers()
173
208
174 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
209 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
175 ui.flush()
210 ui.flush()
176 parentpid = os.getpid()
211 parentpid = os.getpid()
177 pipes = []
212 pipes = []
178 retval = {}
213 retval = {}
179 for pargs in partition(args, workers):
214 for pargs in partition(args, workers):
180 # Every worker gets its own pipe to send results on, so we don't have to
215 # Every worker gets its own pipe to send results on, so we don't have to
181 # implement atomic writes larger than PIPE_BUF. Each forked process has
216 # implement atomic writes larger than PIPE_BUF. Each forked process has
182 # its own pipe's descriptors in the local variables, and the parent
217 # its own pipe's descriptors in the local variables, and the parent
183 # process has the full list of pipe descriptors (and it doesn't really
218 # process has the full list of pipe descriptors (and it doesn't really
184 # care what order they're in).
219 # care what order they're in).
185 rfd, wfd = os.pipe()
220 rfd, wfd = os.pipe()
186 pipes.append((rfd, wfd))
221 pipes.append((rfd, wfd))
187 # make sure we use os._exit in all worker code paths. otherwise the
222 # make sure we use os._exit in all worker code paths. otherwise the
188 # worker may do some clean-ups which could cause surprises like
223 # worker may do some clean-ups which could cause surprises like
189 # deadlock. see sshpeer.cleanup for example.
224 # deadlock. see sshpeer.cleanup for example.
190 # override error handling *before* fork. this is necessary because
225 # override error handling *before* fork. this is necessary because
191 # exception (signal) may arrive after fork, before "pid =" assignment
226 # exception (signal) may arrive after fork, before "pid =" assignment
192 # completes, and other exception handler (dispatch.py) can lead to
227 # completes, and other exception handler (dispatch.py) can lead to
193 # unexpected code path without os._exit.
228 # unexpected code path without os._exit.
194 ret = -1
229 ret = -1
195 try:
230 try:
196 pid = os.fork()
231 pid = os.fork()
197 if pid == 0:
232 if pid == 0:
198 signal.signal(signal.SIGINT, oldhandler)
233 signal.signal(signal.SIGINT, oldhandler)
199 signal.signal(signal.SIGCHLD, oldchldhandler)
234 signal.signal(signal.SIGCHLD, oldchldhandler)
200
235
201 def workerfunc():
236 def workerfunc():
202 for r, w in pipes[:-1]:
237 for r, w in pipes[:-1]:
203 os.close(r)
238 os.close(r)
204 os.close(w)
239 os.close(w)
205 os.close(rfd)
240 os.close(rfd)
206 for result in func(*(staticargs + (pargs,))):
241 for result in func(*(staticargs + (pargs,))):
207 os.write(wfd, util.pickle.dumps(result))
242 os.write(wfd, util.pickle.dumps(result))
208 return 0
243 return 0
209
244
210 ret = scmutil.callcatch(ui, workerfunc)
245 ret = scmutil.callcatch(ui, workerfunc)
211 except: # parent re-raises, child never returns
246 except: # parent re-raises, child never returns
212 if os.getpid() == parentpid:
247 if os.getpid() == parentpid:
213 raise
248 raise
214 exctype = sys.exc_info()[0]
249 exctype = sys.exc_info()[0]
215 force = not issubclass(exctype, KeyboardInterrupt)
250 force = not issubclass(exctype, KeyboardInterrupt)
216 ui.traceback(force=force)
251 ui.traceback(force=force)
217 finally:
252 finally:
218 if os.getpid() != parentpid:
253 if os.getpid() != parentpid:
219 try:
254 try:
220 ui.flush()
255 ui.flush()
221 except: # never returns, no re-raises
256 except: # never returns, no re-raises
222 pass
257 pass
223 finally:
258 finally:
224 os._exit(ret & 255)
259 os._exit(ret & 255)
225 pids.add(pid)
260 pids.add(pid)
226 selector = selectors.DefaultSelector()
261 selector = selectors.DefaultSelector()
227 for rfd, wfd in pipes:
262 for rfd, wfd in pipes:
228 os.close(wfd)
263 os.close(wfd)
229 selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
264 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
230
265
231 def cleanup():
266 def cleanup():
232 signal.signal(signal.SIGINT, oldhandler)
267 signal.signal(signal.SIGINT, oldhandler)
233 waitforworkers()
268 waitforworkers()
234 signal.signal(signal.SIGCHLD, oldchldhandler)
269 signal.signal(signal.SIGCHLD, oldchldhandler)
235 selector.close()
270 selector.close()
236 return problem[0]
271 return problem[0]
237
272
238 try:
273 try:
239 openpipes = len(pipes)
274 openpipes = len(pipes)
240 while openpipes > 0:
275 while openpipes > 0:
241 for key, events in selector.select():
276 for key, events in selector.select():
242 try:
277 try:
243 res = util.pickle.load(key.fileobj)
278 res = util.pickle.load(_blockingreader(key.fileobj))
244 if hasretval and res[0]:
279 if hasretval and res[0]:
245 retval.update(res[1])
280 retval.update(res[1])
246 else:
281 else:
247 yield res
282 yield res
248 except EOFError:
283 except EOFError:
249 selector.unregister(key.fileobj)
284 selector.unregister(key.fileobj)
250 key.fileobj.close()
285 key.fileobj.close()
251 openpipes -= 1
286 openpipes -= 1
252 except IOError as e:
287 except IOError as e:
253 if e.errno == errno.EINTR:
288 if e.errno == errno.EINTR:
254 continue
289 continue
255 raise
290 raise
256 except: # re-raises
291 except: # re-raises
257 killworkers()
292 killworkers()
258 cleanup()
293 cleanup()
259 raise
294 raise
260 status = cleanup()
295 status = cleanup()
261 if status:
296 if status:
262 if status < 0:
297 if status < 0:
263 os.kill(os.getpid(), -status)
298 os.kill(os.getpid(), -status)
264 sys.exit(status)
299 sys.exit(status)
265 if hasretval:
300 if hasretval:
266 yield True, retval
301 yield True, retval
267
302
268
303
269 def _posixexitstatus(code):
304 def _posixexitstatus(code):
270 '''convert a posix exit status into the same form returned by
305 '''convert a posix exit status into the same form returned by
271 os.spawnv
306 os.spawnv
272
307
273 returns None if the process was stopped instead of exiting'''
308 returns None if the process was stopped instead of exiting'''
274 if os.WIFEXITED(code):
309 if os.WIFEXITED(code):
275 return os.WEXITSTATUS(code)
310 return os.WEXITSTATUS(code)
276 elif os.WIFSIGNALED(code):
311 elif os.WIFSIGNALED(code):
277 return -(os.WTERMSIG(code))
312 return -(os.WTERMSIG(code))
278
313
279
314
280 def _windowsworker(ui, func, staticargs, args, hasretval):
315 def _windowsworker(ui, func, staticargs, args, hasretval):
281 class Worker(threading.Thread):
316 class Worker(threading.Thread):
282 def __init__(
317 def __init__(
283 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
318 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
284 ):
319 ):
285 threading.Thread.__init__(self, *args, **kwargs)
320 threading.Thread.__init__(self, *args, **kwargs)
286 self._taskqueue = taskqueue
321 self._taskqueue = taskqueue
287 self._resultqueue = resultqueue
322 self._resultqueue = resultqueue
288 self._func = func
323 self._func = func
289 self._staticargs = staticargs
324 self._staticargs = staticargs
290 self._interrupted = False
325 self._interrupted = False
291 self.daemon = True
326 self.daemon = True
292 self.exception = None
327 self.exception = None
293
328
294 def interrupt(self):
329 def interrupt(self):
295 self._interrupted = True
330 self._interrupted = True
296
331
297 def run(self):
332 def run(self):
298 try:
333 try:
299 while not self._taskqueue.empty():
334 while not self._taskqueue.empty():
300 try:
335 try:
301 args = self._taskqueue.get_nowait()
336 args = self._taskqueue.get_nowait()
302 for res in self._func(*self._staticargs + (args,)):
337 for res in self._func(*self._staticargs + (args,)):
303 self._resultqueue.put(res)
338 self._resultqueue.put(res)
304 # threading doesn't provide a native way to
339 # threading doesn't provide a native way to
305 # interrupt execution. handle it manually at every
340 # interrupt execution. handle it manually at every
306 # iteration.
341 # iteration.
307 if self._interrupted:
342 if self._interrupted:
308 return
343 return
309 except pycompat.queue.Empty:
344 except pycompat.queue.Empty:
310 break
345 break
311 except Exception as e:
346 except Exception as e:
312 # store the exception such that the main thread can resurface
347 # store the exception such that the main thread can resurface
313 # it as if the func was running without workers.
348 # it as if the func was running without workers.
314 self.exception = e
349 self.exception = e
315 raise
350 raise
316
351
317 threads = []
352 threads = []
318
353
319 def trykillworkers():
354 def trykillworkers():
320 # Allow up to 1 second to clean worker threads nicely
355 # Allow up to 1 second to clean worker threads nicely
321 cleanupend = time.time() + 1
356 cleanupend = time.time() + 1
322 for t in threads:
357 for t in threads:
323 t.interrupt()
358 t.interrupt()
324 for t in threads:
359 for t in threads:
325 remainingtime = cleanupend - time.time()
360 remainingtime = cleanupend - time.time()
326 t.join(remainingtime)
361 t.join(remainingtime)
327 if t.is_alive():
362 if t.is_alive():
328 # pass over the workers joining failure. it is more
363 # pass over the workers joining failure. it is more
329 # important to surface the initial exception than the
364 # important to surface the initial exception than the
330 # fact that one of workers may be processing a large
365 # fact that one of workers may be processing a large
331 # task and does not get to handle the interruption.
366 # task and does not get to handle the interruption.
332 ui.warn(
367 ui.warn(
333 _(
368 _(
334 b"failed to kill worker threads while "
369 b"failed to kill worker threads while "
335 b"handling an exception\n"
370 b"handling an exception\n"
336 )
371 )
337 )
372 )
338 return
373 return
339
374
340 workers = _numworkers(ui)
375 workers = _numworkers(ui)
341 resultqueue = pycompat.queue.Queue()
376 resultqueue = pycompat.queue.Queue()
342 taskqueue = pycompat.queue.Queue()
377 taskqueue = pycompat.queue.Queue()
343 retval = {}
378 retval = {}
344 # partition work to more pieces than workers to minimize the chance
379 # partition work to more pieces than workers to minimize the chance
345 # of uneven distribution of large tasks between the workers
380 # of uneven distribution of large tasks between the workers
346 for pargs in partition(args, workers * 20):
381 for pargs in partition(args, workers * 20):
347 taskqueue.put(pargs)
382 taskqueue.put(pargs)
348 for _i in range(workers):
383 for _i in range(workers):
349 t = Worker(taskqueue, resultqueue, func, staticargs)
384 t = Worker(taskqueue, resultqueue, func, staticargs)
350 threads.append(t)
385 threads.append(t)
351 t.start()
386 t.start()
352 try:
387 try:
353 while len(threads) > 0:
388 while len(threads) > 0:
354 while not resultqueue.empty():
389 while not resultqueue.empty():
355 res = resultqueue.get()
390 res = resultqueue.get()
356 if hasretval and res[0]:
391 if hasretval and res[0]:
357 retval.update(res[1])
392 retval.update(res[1])
358 else:
393 else:
359 yield res
394 yield res
360 threads[0].join(0.05)
395 threads[0].join(0.05)
361 finishedthreads = [_t for _t in threads if not _t.is_alive()]
396 finishedthreads = [_t for _t in threads if not _t.is_alive()]
362 for t in finishedthreads:
397 for t in finishedthreads:
363 if t.exception is not None:
398 if t.exception is not None:
364 raise t.exception
399 raise t.exception
365 threads.remove(t)
400 threads.remove(t)
366 except (Exception, KeyboardInterrupt): # re-raises
401 except (Exception, KeyboardInterrupt): # re-raises
367 trykillworkers()
402 trykillworkers()
368 raise
403 raise
369 while not resultqueue.empty():
404 while not resultqueue.empty():
370 res = resultqueue.get()
405 res = resultqueue.get()
371 if hasretval and res[0]:
406 if hasretval and res[0]:
372 retval.update(res[1])
407 retval.update(res[1])
373 else:
408 else:
374 yield res
409 yield res
375 if hasretval:
410 if hasretval:
376 yield True, retval
411 yield True, retval
377
412
378
413
379 if pycompat.iswindows:
414 if pycompat.iswindows:
380 _platformworker = _windowsworker
415 _platformworker = _windowsworker
381 else:
416 else:
382 _platformworker = _posixworker
417 _platformworker = _posixworker
383 _exitstatus = _posixexitstatus
418 _exitstatus = _posixexitstatus
384
419
385
420
386 def partition(lst, nslices):
421 def partition(lst, nslices):
387 '''partition a list into N slices of roughly equal size
422 '''partition a list into N slices of roughly equal size
388
423
389 The current strategy takes every Nth element from the input. If
424 The current strategy takes every Nth element from the input. If
390 we ever write workers that need to preserve grouping in input
425 we ever write workers that need to preserve grouping in input
391 we should consider allowing callers to specify a partition strategy.
426 we should consider allowing callers to specify a partition strategy.
392
427
393 mpm is not a fan of this partitioning strategy when files are involved.
428 mpm is not a fan of this partitioning strategy when files are involved.
394 In his words:
429 In his words:
395
430
396 Single-threaded Mercurial makes a point of creating and visiting
431 Single-threaded Mercurial makes a point of creating and visiting
397 files in a fixed order (alphabetical). When creating files in order,
432 files in a fixed order (alphabetical). When creating files in order,
398 a typical filesystem is likely to allocate them on nearby regions on
433 a typical filesystem is likely to allocate them on nearby regions on
399 disk. Thus, when revisiting in the same order, locality is maximized
434 disk. Thus, when revisiting in the same order, locality is maximized
400 and various forms of OS and disk-level caching and read-ahead get a
435 and various forms of OS and disk-level caching and read-ahead get a
401 chance to work.
436 chance to work.
402
437
403 This effect can be quite significant on spinning disks. I discovered it
438 This effect can be quite significant on spinning disks. I discovered it
404 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
439 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
405 Tarring a repo and copying it to another disk effectively randomized
440 Tarring a repo and copying it to another disk effectively randomized
406 the revlog ordering on disk by sorting the revlogs by hash and suddenly
441 the revlog ordering on disk by sorting the revlogs by hash and suddenly
407 performance of my kernel checkout benchmark dropped by ~10x because the
442 performance of my kernel checkout benchmark dropped by ~10x because the
408 "working set" of sectors visited no longer fit in the drive's cache and
443 "working set" of sectors visited no longer fit in the drive's cache and
409 the workload switched from streaming to random I/O.
444 the workload switched from streaming to random I/O.
410
445
411 What we should really be doing is have workers read filenames from a
446 What we should really be doing is have workers read filenames from a
412 ordered queue. This preserves locality and also keeps any worker from
447 ordered queue. This preserves locality and also keeps any worker from
413 getting more than one file out of balance.
448 getting more than one file out of balance.
414 '''
449 '''
415 for i in range(nslices):
450 for i in range(nslices):
416 yield lst[i::nslices]
451 yield lst[i::nslices]
@@ -1,134 +1,165 b''
1 Test UI worker interaction
1 Test UI worker interaction
2
2
3 $ cat > t.py <<EOF
3 $ cat > t.py <<EOF
4 > from __future__ import absolute_import, print_function
4 > from __future__ import absolute_import, print_function
5 > import sys
5 > import sys
6 > import time
6 > import time
7 > from mercurial import (
7 > from mercurial import (
8 > error,
8 > error,
9 > registrar,
9 > registrar,
10 > ui as uimod,
10 > ui as uimod,
11 > worker,
11 > worker,
12 > )
12 > )
13 > sys.unraisablehook = lambda x: None
13 > sys.unraisablehook = lambda x: None
14 > def abort(ui, args):
14 > def abort(ui, args):
15 > if args[0] == 0:
15 > if args[0] == 0:
16 > # by first worker for test stability
16 > # by first worker for test stability
17 > raise error.Abort(b'known exception')
17 > raise error.Abort(b'known exception')
18 > return runme(ui, [])
18 > return runme(ui, [])
19 > def exc(ui, args):
19 > def exc(ui, args):
20 > if args[0] == 0:
20 > if args[0] == 0:
21 > # by first worker for test stability
21 > # by first worker for test stability
22 > raise Exception('unknown exception')
22 > raise Exception('unknown exception')
23 > return runme(ui, [])
23 > return runme(ui, [])
24 > def runme(ui, args):
24 > def runme(ui, args):
25 > for arg in args:
25 > for arg in args:
26 > ui.status(b'run\n')
26 > ui.status(b'run\n')
27 > yield 1, arg
27 > yield 1, arg
28 > time.sleep(0.1) # easier to trigger killworkers code path
28 > time.sleep(0.1) # easier to trigger killworkers code path
29 > functable = {
29 > functable = {
30 > b'abort': abort,
30 > b'abort': abort,
31 > b'exc': exc,
31 > b'exc': exc,
32 > b'runme': runme,
32 > b'runme': runme,
33 > }
33 > }
34 > cmdtable = {}
34 > cmdtable = {}
35 > command = registrar.command(cmdtable)
35 > command = registrar.command(cmdtable)
36 > @command(b'test', [], b'hg test [COST] [FUNC]')
36 > @command(b'test', [], b'hg test [COST] [FUNC]')
37 > def t(ui, repo, cost=1.0, func=b'runme'):
37 > def t(ui, repo, cost=1.0, func=b'runme'):
38 > cost = float(cost)
38 > cost = float(cost)
39 > func = functable[func]
39 > func = functable[func]
40 > ui.status(b'start\n')
40 > ui.status(b'start\n')
41 > runs = worker.worker(ui, cost, func, (ui,), range(8))
41 > runs = worker.worker(ui, cost, func, (ui,), range(8))
42 > for n, i in runs:
42 > for n, i in runs:
43 > pass
43 > pass
44 > ui.status(b'done\n')
44 > ui.status(b'done\n')
45 > EOF
45 > EOF
46 $ abspath=`pwd`/t.py
46 $ abspath=`pwd`/t.py
47 $ hg init
47 $ hg init
48
48
49 Run tests with worker enabled by forcing a high cost
49 Run tests with worker enabled by forcing a high cost
50
50
51 $ hg --config "extensions.t=$abspath" test 100000.0
51 $ hg --config "extensions.t=$abspath" test 100000.0
52 start
52 start
53 run
53 run
54 run
54 run
55 run
55 run
56 run
56 run
57 run
57 run
58 run
58 run
59 run
59 run
60 run
60 run
61 done
61 done
62
62
63 Run tests without worker by forcing a low cost
63 Run tests without worker by forcing a low cost
64
64
65 $ hg --config "extensions.t=$abspath" test 0.0000001
65 $ hg --config "extensions.t=$abspath" test 0.0000001
66 start
66 start
67 run
67 run
68 run
68 run
69 run
69 run
70 run
70 run
71 run
71 run
72 run
72 run
73 run
73 run
74 run
74 run
75 done
75 done
76
76
77 #if no-windows
77 #if no-windows
78
78
79 Known exception should be caught, but printed if --traceback is enabled
79 Known exception should be caught, but printed if --traceback is enabled
80
80
81 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
81 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
82 > test 100000.0 abort 2>&1
82 > test 100000.0 abort 2>&1
83 start
83 start
84 abort: known exception
84 abort: known exception
85 [255]
85 [255]
86
86
87 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
87 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
88 > test 100000.0 abort --traceback 2>&1 | egrep '(SystemExit|Abort)'
88 > test 100000.0 abort --traceback 2>&1 | egrep '(SystemExit|Abort)'
89 raise error.Abort(b'known exception')
89 raise error.Abort(b'known exception')
90 mercurial.error.Abort: known exception (py3 !)
90 mercurial.error.Abort: known exception (py3 !)
91 Abort: known exception (no-py3 !)
91 Abort: known exception (no-py3 !)
92 SystemExit: 255
92 SystemExit: 255
93
93
94 Traceback must be printed for unknown exceptions
94 Traceback must be printed for unknown exceptions
95
95
96 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
96 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
97 > test 100000.0 exc 2>&1 | grep '^Exception'
97 > test 100000.0 exc 2>&1 | grep '^Exception'
98 Exception: unknown exception
98 Exception: unknown exception
99
99
100 Workers should not do cleanups in all cases
100 Workers should not do cleanups in all cases
101
101
102 $ cat > $TESTTMP/detectcleanup.py <<EOF
102 $ cat > $TESTTMP/detectcleanup.py <<EOF
103 > from __future__ import absolute_import
103 > from __future__ import absolute_import
104 > import atexit
104 > import atexit
105 > import os
105 > import os
106 > import sys
106 > import sys
107 > import time
107 > import time
108 > sys.unraisablehook = lambda x: None
108 > sys.unraisablehook = lambda x: None
109 > oldfork = os.fork
109 > oldfork = os.fork
110 > count = 0
110 > count = 0
111 > parentpid = os.getpid()
111 > parentpid = os.getpid()
112 > def delayedfork():
112 > def delayedfork():
113 > global count
113 > global count
114 > count += 1
114 > count += 1
115 > pid = oldfork()
115 > pid = oldfork()
116 > # make it easier to test SIGTERM hitting other workers when they have
116 > # make it easier to test SIGTERM hitting other workers when they have
117 > # not set up error handling yet.
117 > # not set up error handling yet.
118 > if count > 1 and pid == 0:
118 > if count > 1 and pid == 0:
119 > time.sleep(0.1)
119 > time.sleep(0.1)
120 > return pid
120 > return pid
121 > os.fork = delayedfork
121 > os.fork = delayedfork
122 > def cleanup():
122 > def cleanup():
123 > if os.getpid() != parentpid:
123 > if os.getpid() != parentpid:
124 > os.write(1, 'should never happen\n')
124 > os.write(1, 'should never happen\n')
125 > atexit.register(cleanup)
125 > atexit.register(cleanup)
126 > EOF
126 > EOF
127
127
128 $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
128 $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
129 > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort
129 > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort
130 start
130 start
131 abort: known exception
131 abort: known exception
132 [255]
132 [255]
133
133
134 Do not crash on partially read result
135
136 $ cat > $TESTTMP/detecttruncated.py <<EOF
137 > from __future__ import absolute_import
138 > import os
139 > import sys
140 > import time
141 > sys.unraisablehook = lambda x: None
142 > oldwrite = os.write
143 > def splitwrite(fd, string):
144 > ret = oldwrite(fd, string[:9])
145 > if ret == 9:
146 > time.sleep(0.1)
147 > ret += oldwrite(fd, string[9:])
148 > return ret
149 > os.write = splitwrite
150 > EOF
151
152 $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
153 > "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
154 start
155 run
156 run
157 run
158 run
159 run
160 run
161 run
162 run
163 done
164
134 #endif
165 #endif
General Comments 0
You need to be logged in to leave comments. Login now