##// END OF EJS Templates
worker: rename variable to reflect constant...
Gregory Szorc -
r38753:69ed2cff default
parent child Browse files
Show More
@@ -1,355 +1,355 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
17 try:
18 import selectors
18 import selectors
19 selectors.BaseSelector
19 selectors.BaseSelector
20 except ImportError:
20 except ImportError:
21 from .thirdparty import selectors2 as selectors
21 from .thirdparty import selectors2 as selectors
22
22
23 from .i18n import _
23 from .i18n import _
24 from . import (
24 from . import (
25 encoding,
25 encoding,
26 error,
26 error,
27 pycompat,
27 pycompat,
28 scmutil,
28 scmutil,
29 util,
29 util,
30 )
30 )
31
31
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: ask sysconf for the number of processors currently online
    try:
        ncpus = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: the environment advertises the processor count
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # conservative fallback when detection fails
    return 1
52
52
def _numworkers(ui):
    '''return the number of workers to use

    Honors the ``worker.numcpus`` config option when it is set to a
    positive integer; otherwise falls back to the detected CPU count,
    clamped to the range [4, 32].
    '''
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            count = int(configured)
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
        if count >= 1:
            return count
    return min(max(countcpus(), 4), 32)
63
63
# Estimated cost, in seconds, of starting one worker process/thread.
# On posix and Windows this is cheap; on other platforms the huge
# sentinel value makes worthwhile() always decide against parallelism.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
else:
    _STARTUP_COST = 1e30
68
68
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    # estimated time to do all the work serially
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # parallel time: startup overhead plus an even share of the work
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    # only parallelize when the projected saving is non-trivial
    return serialcost - parallelcost >= 0.15
76
76
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # parallelize only when enabled and when the job is big enough for
    # the worker startup cost to pay off; otherwise run func directly
    if ui.configbool('worker', 'enabled') and worthwhile(ui, costperarg,
                                                         len(args)):
        return _platformworker(ui, func, staticargs, args)
    return func(*staticargs + (args,))
96
96
def _posixworker(ui, func, staticargs, args):
    '''fork child processes to run func over slices of args

    Forks one child per partition of args; each child runs
    func(*staticargs + (chunk,)) and pickles every produced result onto
    a dedicated pipe back to the parent. The parent is a generator that
    yields unpickled results as they arrive. If any child exits
    non-zero, the remaining children are killed and the failure is
    surfaced in the parent (via sys.exit, or by re-sending the fatal
    signal to ourselves).
    '''
    workers = _numworkers(ui)
    # ignore SIGINT in the parent while children run; children restore
    # the original handler after fork
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested closures can mutate it
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: child already gone; anything else is unexpected
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap children; records the first non-zero exit status in problem[0]
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        # non-blocking reap from the signal handler; kill the rest on failure
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # flush before fork so buffered output is not duplicated in children
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    for pargs in partition(args, workers):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore the signal handlers the parent overrode
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close pipe ends belonging to the other workers
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        # parent only reads; close the write ends so EOF propagates
        os.close(wfd)
        selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
    def cleanup():
        # restore handlers, reap all children, and surface any failure
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # child died from a signal: re-send it to ourselves
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    yield util.pickle.load(key.fileobj)
                except EOFError:
                    # this worker is done; stop watching its pipe
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
226
226
227 def _posixexitstatus(code):
227 def _posixexitstatus(code):
228 '''convert a posix exit status into the same form returned by
228 '''convert a posix exit status into the same form returned by
229 os.spawnv
229 os.spawnv
230
230
231 returns None if the process was stopped instead of exiting'''
231 returns None if the process was stopped instead of exiting'''
232 if os.WIFEXITED(code):
232 if os.WIFEXITED(code):
233 return os.WEXITSTATUS(code)
233 return os.WEXITSTATUS(code)
234 elif os.WIFSIGNALED(code):
234 elif os.WIFSIGNALED(code):
235 return -os.WTERMSIG(code)
235 return -os.WTERMSIG(code)
236
236
def _windowsworker(ui, func, staticargs, args):
    '''run func over slices of args in a pool of daemon threads

    Windows has no fork(), so parallelism is implemented with threads.
    Worker threads pull chunks from a shared task queue, run
    func(*staticargs + (chunk,)), and post each result to a shared
    result queue; this generator yields results as they appear. An
    exception in any worker is re-raised in the caller after a
    best-effort attempt to interrupt the remaining threads.
    '''
    class Worker(threading.Thread):
        def __init__(self, taskqueue, resultqueue, func, staticargs,
                     group=None, target=None, name=None):
            # Do not forward the Python 2-only 'verbose' argument:
            # threading.Thread.__init__ has no such parameter on
            # Python 3 and passing it raises TypeError there.
            threading.Thread.__init__(self, group=group, target=target,
                                      name=name)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            # cooperative interruption flag, polled in run()
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []
    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the inital exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(_("failed to kill worker threads while "
                          "handling an exception\n"))
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                yield resultqueue.get()
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt): # re-raises
        trykillworkers()
        raise
    # drain results posted after the last thread was reaped
    while not resultqueue.empty():
        yield resultqueue.get()
318
318
# Select the platform-appropriate implementation: Windows lacks fork(),
# so it gets the thread-based worker; everything else forks.
# _exitstatus is only needed (and only defined) on the posix path.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
324
324
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice k receives elements k, k+N, k+2N, ... of the input
    offset = 0
    while offset < nslices:
        yield lst[offset::nslices]
        offset += 1
General Comments 0
You need to be logged in to leave comments. Login now