worker: rewrite error handling so os._exit covers all cases...
Jun Wu
r32112:31763785 default
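In short, the commit arranges for every child code path, including exceptions raised between fork() and the "pid =" assignment, to end in os._exit(), so a worker never unwinds into parent-side cleanup such as atexit handlers or sshpeer.cleanup. A minimal sketch of the pattern, with hypothetical names (forkworker and workerfunc are illustrative, not Mercurial's API):

    import os

    def forkworker(workerfunc):
        # record the parent pid *before* fork so the finally clause can
        # tell which process it is running in, even if an exception
        # arrives before the "pid =" assignment completes
        parentpid = os.getpid()
        ret = -1
        try:
            pid = os.fork()
            if pid:
                return pid        # parent: hand the child pid back
            ret = workerfunc()    # child: run the task
        except:  # parent re-raises; child falls through to os._exit
            if os.getpid() == parentpid:
                raise
        finally:
            if os.getpid() != parentpid:
                os._exit(ret & 255)  # child: bypass all normal cleanup

Because the try block is entered before fork(), a signal landing in the child at any point still reaches the finally clause, and only the parent can ever return or re-raise.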
@@ -1,234 +1,240 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 pycompat,
20 20 scmutil,
21 21 util,
22 22 )
23 23
24 24 def countcpus():
25 25 '''try to count the number of CPUs on the system'''
26 26
27 27 # posix
28 28 try:
29 29 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
30 30 if n > 0:
31 31 return n
32 32 except (AttributeError, ValueError):
33 33 pass
34 34
35 35 # windows
36 36 try:
37 37 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
38 38 if n > 0:
39 39 return n
40 40 except (KeyError, ValueError):
41 41 pass
42 42
43 43 return 1
44 44
45 45 def _numworkers(ui):
46 46 s = ui.config('worker', 'numcpus')
47 47 if s:
48 48 try:
49 49 n = int(s)
50 50 if n >= 1:
51 51 return n
52 52 except ValueError:
53 53 raise error.Abort(_('number of cpus must be an integer'))
54 54 return min(max(countcpus(), 4), 32)
55 55
56 56 if pycompat.osname == 'posix':
57 57 _startupcost = 0.01
58 58 else:
59 59 _startupcost = 1e30
60 60
61 61 def worthwhile(ui, costperop, nops):
62 62 '''try to determine whether the benefit of multiple processes can
63 63 outweigh the cost of starting them'''
64 64 linear = costperop * nops
65 65 workers = _numworkers(ui)
66 66 benefit = linear - (_startupcost * workers + linear / workers)
67 67 return benefit >= 0.15
68 68
69 69 def worker(ui, costperarg, func, staticargs, args):
70 70 '''run a function, possibly in parallel in multiple worker
71 71 processes.
72 72
73 73 returns a progress iterator
74 74
75 75 costperarg - cost of a single task
76 76
77 77 func - function to run
78 78
79 79 staticargs - arguments to pass to every invocation of the function
80 80
81 81 args - arguments to split into chunks, to pass to individual
82 82 workers
83 83 '''
84 84 if worthwhile(ui, costperarg, len(args)):
85 85 return _platformworker(ui, func, staticargs, args)
86 86 return func(*staticargs + (args,))
87 87
88 88 def _posixworker(ui, func, staticargs, args):
89 89 rfd, wfd = os.pipe()
90 90 workers = _numworkers(ui)
91 91 oldhandler = signal.getsignal(signal.SIGINT)
92 92 signal.signal(signal.SIGINT, signal.SIG_IGN)
93 93 pids, problem = set(), [0]
94 94 def killworkers():
95 95 # unregister SIGCHLD handler as all children will be killed. This
96 96 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
97 97 # could be updated while iterating, which would cause inconsistency.
98 98 signal.signal(signal.SIGCHLD, oldchldhandler)
99 99 # if one worker bails, there's no good reason to wait for the rest
100 100 for p in pids:
101 101 try:
102 102 os.kill(p, signal.SIGTERM)
103 103 except OSError as err:
104 104 if err.errno != errno.ESRCH:
105 105 raise
106 106 def waitforworkers(blocking=True):
107 107 for pid in pids.copy():
108 108 p = st = 0
109 109 while True:
110 110 try:
111 111 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
112 112 break
113 113 except OSError as e:
114 114 if e.errno == errno.EINTR:
115 115 continue
116 116 elif e.errno == errno.ECHILD:
117 117 # child was already reaped, but pids not yet updated
118 118 # (maybe interrupted just after waitpid)
119 119 pids.discard(pid)
120 120 break
121 121 else:
122 122 raise
123 123 if not p:
124 124 # skip subsequent steps, because the child process
125 125 # should still be running in this case
126 126 continue
127 127 pids.discard(p)
128 128 st = _exitstatus(st)
129 129 if st and not problem[0]:
130 130 problem[0] = st
131 131 def sigchldhandler(signum, frame):
132 132 waitforworkers(blocking=False)
133 133 if problem[0]:
134 134 killworkers()
135 135 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
136 136 ui.flush()
137 parentpid = os.getpid()
137 138 for pargs in partition(args, workers):
138 pid = os.fork()
139 if pid == 0:
140 signal.signal(signal.SIGINT, oldhandler)
141 signal.signal(signal.SIGCHLD, oldchldhandler)
142
143 def workerfunc():
144 os.close(rfd)
145 for i, item in func(*(staticargs + (pargs,))):
146 os.write(wfd, '%d %s\n' % (i, item))
147 return 0
139 # make sure we use os._exit in all worker code paths. otherwise the
140 # worker may do some clean-ups which could cause surprises like
141 # deadlock. see sshpeer.cleanup for example.
142 # override error handling *before* fork. this is necessary because
143 # an exception (signal) may arrive after fork, before the "pid ="
144 # assignment completes, and another exception handler (dispatch.py)
145 # could take an unexpected code path without os._exit.
146 ret = -1
147 try:
148 pid = os.fork()
149 if pid == 0:
150 signal.signal(signal.SIGINT, oldhandler)
151 signal.signal(signal.SIGCHLD, oldchldhandler)
148 152
149 # make sure we use os._exit in all code paths. otherwise the worker
150 # may do some clean-ups which could cause surprises like deadlock.
151 # see sshpeer.cleanup for example.
152 ret = 0
153 try:
153 def workerfunc():
154 os.close(rfd)
155 for i, item in func(*(staticargs + (pargs,))):
156 os.write(wfd, '%d %s\n' % (i, item))
157 return 0
158
159 ret = scmutil.callcatch(ui, workerfunc)
160 except: # parent re-raises, child never returns
161 if os.getpid() == parentpid:
162 raise
163 exctype = sys.exc_info()[0]
164 force = not issubclass(exctype, KeyboardInterrupt)
165 ui.traceback(force=force)
166 finally:
167 if os.getpid() != parentpid:
154 168 try:
155 ret = scmutil.callcatch(ui, workerfunc)
156 finally:
157 169 ui.flush()
158 except KeyboardInterrupt:
159 os._exit(255)
160 except: # never return, therefore no re-raises
161 try:
162 ui.traceback(force=True)
163 ui.flush()
170 except: # never returns, no re-raises
171 pass
164 172 finally:
165 os._exit(255)
166 else:
167 os._exit(ret & 255)
173 os._exit(ret & 255)
168 174 pids.add(pid)
169 175 os.close(wfd)
170 176 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
171 177 def cleanup():
172 178 signal.signal(signal.SIGINT, oldhandler)
173 179 waitforworkers()
174 180 signal.signal(signal.SIGCHLD, oldchldhandler)
175 181 status = problem[0]
176 182 if status:
177 183 if status < 0:
178 184 os.kill(os.getpid(), -status)
179 185 sys.exit(status)
180 186 try:
181 187 for line in util.iterfile(fp):
182 188 l = line.split(' ', 1)
183 189 yield int(l[0]), l[1][:-1]
184 190 except: # re-raises
185 191 killworkers()
186 192 cleanup()
187 193 raise
188 194 cleanup()
189 195
190 196 def _posixexitstatus(code):
191 197 '''convert a posix exit status into the same form returned by
192 198 os.spawnv
193 199
194 200 returns None if the process was stopped instead of exiting'''
195 201 if os.WIFEXITED(code):
196 202 return os.WEXITSTATUS(code)
197 203 elif os.WIFSIGNALED(code):
198 204 return -os.WTERMSIG(code)
199 205
200 206 if pycompat.osname != 'nt':
201 207 _platformworker = _posixworker
202 208 _exitstatus = _posixexitstatus
203 209
204 210 def partition(lst, nslices):
205 211 '''partition a list into N slices of roughly equal size
206 212
207 213 The current strategy takes every Nth element from the input. If
208 214 we ever write workers that need to preserve grouping in input
209 215 we should consider allowing callers to specify a partition strategy.
210 216
211 217 mpm is not a fan of this partitioning strategy when files are involved.
212 218 In his words:
213 219
214 220 Single-threaded Mercurial makes a point of creating and visiting
215 221 files in a fixed order (alphabetical). When creating files in order,
216 222 a typical filesystem is likely to allocate them on nearby regions on
217 223 disk. Thus, when revisiting in the same order, locality is maximized
218 224 and various forms of OS and disk-level caching and read-ahead get a
219 225 chance to work.
220 226
221 227 This effect can be quite significant on spinning disks. I discovered it
222 228 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
223 229 Tarring a repo and copying it to another disk effectively randomized
224 230 the revlog ordering on disk by sorting the revlogs by hash and suddenly
225 231 performance of my kernel checkout benchmark dropped by ~10x because the
226 232 "working set" of sectors visited no longer fit in the drive's cache and
227 233 the workload switched from streaming to random I/O.
228 234
229 235 What we should really be doing is to have workers read filenames from an
230 236 ordered queue. This preserves locality and also keeps any worker from
231 237 getting more than one file out of balance.
232 238 '''
233 239 for i in range(nslices):
234 240 yield lst[i::nslices]
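For reference, partition() interleaves rather than chunking contiguously: slice i gets elements i, i+nslices, i+2*nslices, and so on. A quick doctest-style illustration, reproducing the function exactly as defined above:

    >>> def partition(lst, nslices):
    ...     for i in range(nslices):
    ...         yield lst[i::nslices]
    >>> list(partition(list(range(10)), 3))
    [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]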
@@ -1,94 +1,126 @@
1 1 Test UI worker interaction
2 2
3 3 $ cat > t.py <<EOF
4 4 > from __future__ import absolute_import, print_function
5 5 > from mercurial import (
6 6 > cmdutil,
7 7 > error,
8 8 > ui as uimod,
9 9 > worker,
10 10 > )
11 11 > def abort(ui, args):
12 12 > if args[0] == 0:
13 13 > # by first worker for test stability
14 14 > raise error.Abort('known exception')
15 15 > return runme(ui, [])
16 16 > def exc(ui, args):
17 17 > if args[0] == 0:
18 18 > # by first worker for test stability
19 19 > raise Exception('unknown exception')
20 20 > return runme(ui, [])
21 21 > def runme(ui, args):
22 22 > for arg in args:
23 23 > ui.status('run\n')
24 24 > yield 1, arg
25 25 > functable = {
26 26 > 'abort': abort,
27 27 > 'exc': exc,
28 28 > 'runme': runme,
29 29 > }
30 30 > cmdtable = {}
31 31 > command = cmdutil.command(cmdtable)
32 32 > @command('test', [], 'hg test [COST] [FUNC]')
33 33 > def t(ui, repo, cost=1.0, func='runme'):
34 34 > cost = float(cost)
35 35 > func = functable[func]
36 36 > ui.status('start\n')
37 37 > runs = worker.worker(ui, cost, func, (ui,), range(8))
38 38 > for n, i in runs:
39 39 > pass
40 40 > ui.status('done\n')
41 41 > EOF
42 42 $ abspath=`pwd`/t.py
43 43 $ hg init
44 44
45 45 Run tests with the worker enabled by forcing a high cost
46 46
47 47 $ hg --config "extensions.t=$abspath" test 100000.0
48 48 start
49 49 run
50 50 run
51 51 run
52 52 run
53 53 run
54 54 run
55 55 run
56 56 run
57 57 done
58 58
59 59 Run tests without worker by forcing a low cost
60 60
61 61 $ hg --config "extensions.t=$abspath" test 0.0000001
62 62 start
63 63 run
64 64 run
65 65 run
66 66 run
67 67 run
68 68 run
69 69 run
70 70 run
71 71 done
72 72
73 73 #if no-windows
74 74
75 75 A known exception should be caught, but its traceback printed if --traceback is enabled
76 76
77 77 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
78 78 > test 100000.0 abort
79 79 start
80 80 abort: known exception
81 81 [255]
82 82
83 83 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
84 84 > test 100000.0 abort --traceback 2>&1 | grep '^Traceback'
85 85 Traceback (most recent call last):
86 86 Traceback (most recent call last):
87 87
88 88 Traceback must be printed for unknown exceptions
89 89
90 90 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
91 91 > test 100000.0 exc 2>&1 | grep '^Traceback'
92 92 Traceback (most recent call last):
93 93
94 Workers should not run cleanups in any case
95
96 $ cat > $TESTTMP/detectcleanup.py <<EOF
97 > from __future__ import absolute_import
98 > import atexit
99 > import os
100 > import time
101 > oldfork = os.fork
102 > count = 0
103 > parentpid = os.getpid()
104 > def delayedfork():
105 > global count
106 > count += 1
107 > pid = oldfork()
108 > # make it easier to test SIGTERM hitting other workers when they have
109 > # not set up error handling yet.
110 > if count > 1 and pid == 0:
111 > time.sleep(0.1)
112 > return pid
113 > os.fork = delayedfork
114 > def cleanup():
115 > if os.getpid() != parentpid:
116 > os.write(1, 'should never happen\n')
117 > atexit.register(cleanup)
118 > EOF
119
120 $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
121 > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort
122 start
123 abort: known exception
124 [255]
125
94 126 #endif
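The detectcleanup.py check above relies on a property worth spelling out: os._exit() terminates the process immediately, skipping atexit handlers, so a forked worker that always leaves via os._exit can never run the parent's registered cleanups. A standalone, POSIX-only illustration (not part of the test suite):

    import atexit
    import os
    import sys

    atexit.register(
        lambda: os.write(1, b'cleanup ran in pid %d\n' % os.getpid()))

    pid = os.fork()
    if pid == 0:
        os._exit(0)        # child: atexit handlers are skipped
    os.waitpid(pid, 0)
    sys.exit(0)            # parent: the handler runs exactly once, here

Running this prints a single 'cleanup ran' line, from the parent pid only.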