##// END OF EJS Templates
worker: print traceback for uncaught exception unconditionally...
Yuya Nishihara -
r32043:b844d0d3 default
parent child Browse files
Show More
@@ -1,234 +1,234 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 pycompat,
20 20 scmutil,
21 21 util,
22 22 )
23 23
24 24 def countcpus():
25 25 '''try to count the number of CPUs on the system'''
26 26
27 27 # posix
28 28 try:
29 29 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
30 30 if n > 0:
31 31 return n
32 32 except (AttributeError, ValueError):
33 33 pass
34 34
35 35 # windows
36 36 try:
37 37 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
38 38 if n > 0:
39 39 return n
40 40 except (KeyError, ValueError):
41 41 pass
42 42
43 43 return 1
44 44
45 45 def _numworkers(ui):
46 46 s = ui.config('worker', 'numcpus')
47 47 if s:
48 48 try:
49 49 n = int(s)
50 50 if n >= 1:
51 51 return n
52 52 except ValueError:
53 53 raise error.Abort(_('number of cpus must be an integer'))
54 54 return min(max(countcpus(), 4), 32)
55 55
56 56 if pycompat.osname == 'posix':
57 57 _startupcost = 0.01
58 58 else:
59 59 _startupcost = 1e30
60 60
61 61 def worthwhile(ui, costperop, nops):
62 62 '''try to determine whether the benefit of multiple processes can
63 63 outweigh the cost of starting them'''
64 64 linear = costperop * nops
65 65 workers = _numworkers(ui)
66 66 benefit = linear - (_startupcost * workers + linear / workers)
67 67 return benefit >= 0.15
68 68
69 69 def worker(ui, costperarg, func, staticargs, args):
70 70 '''run a function, possibly in parallel in multiple worker
71 71 processes.
72 72
73 73 returns a progress iterator
74 74
75 75 costperarg - cost of a single task
76 76
77 77 func - function to run
78 78
79 79 staticargs - arguments to pass to every invocation of the function
80 80
81 81 args - arguments to split into chunks, to pass to individual
82 82 workers
83 83 '''
84 84 if worthwhile(ui, costperarg, len(args)):
85 85 return _platformworker(ui, func, staticargs, args)
86 86 return func(*staticargs + (args,))
87 87
88 88 def _posixworker(ui, func, staticargs, args):
89 89 rfd, wfd = os.pipe()
90 90 workers = _numworkers(ui)
91 91 oldhandler = signal.getsignal(signal.SIGINT)
92 92 signal.signal(signal.SIGINT, signal.SIG_IGN)
93 93 pids, problem = set(), [0]
94 94 def killworkers():
95 95 # unregister SIGCHLD handler as all children will be killed. This
96 96 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
97 97 # could be updated while iterating, which would cause inconsistency.
98 98 signal.signal(signal.SIGCHLD, oldchldhandler)
99 99 # if one worker bails, there's no good reason to wait for the rest
100 100 for p in pids:
101 101 try:
102 102 os.kill(p, signal.SIGTERM)
103 103 except OSError as err:
104 104 if err.errno != errno.ESRCH:
105 105 raise
106 106 def waitforworkers(blocking=True):
107 107 for pid in pids.copy():
108 108 p = st = 0
109 109 while True:
110 110 try:
111 111 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
112 112 break
113 113 except OSError as e:
114 114 if e.errno == errno.EINTR:
115 115 continue
116 116 elif e.errno == errno.ECHILD:
117 117 # child would already be reaped, but pids yet been
118 118 # updated (maybe interrupted just after waitpid)
119 119 pids.discard(pid)
120 120 break
121 121 else:
122 122 raise
123 123 if not p:
124 124 # skip subsequent steps, because child process should
125 125 # be still running in this case
126 126 continue
127 127 pids.discard(p)
128 128 st = _exitstatus(st)
129 129 if st and not problem[0]:
130 130 problem[0] = st
131 131 def sigchldhandler(signum, frame):
132 132 waitforworkers(blocking=False)
133 133 if problem[0]:
134 134 killworkers()
135 135 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
136 136 ui.flush()
137 137 for pargs in partition(args, workers):
138 138 pid = os.fork()
139 139 if pid == 0:
140 140 signal.signal(signal.SIGINT, oldhandler)
141 141 signal.signal(signal.SIGCHLD, oldchldhandler)
142 142
143 143 def workerfunc():
144 144 os.close(rfd)
145 145 for i, item in func(*(staticargs + (pargs,))):
146 146 os.write(wfd, '%d %s\n' % (i, item))
147 147 return 0
148 148
149 149 # make sure we use os._exit in all code paths. otherwise the worker
150 150 # may do some clean-ups which could cause surprises like deadlock.
151 151 # see sshpeer.cleanup for example.
152 152 ret = 0
153 153 try:
154 154 try:
155 155 ret = scmutil.callcatch(ui, workerfunc)
156 156 finally:
157 157 ui.flush()
158 158 except KeyboardInterrupt:
159 159 os._exit(255)
160 160 except: # never return, therefore no re-raises
161 161 try:
162 ui.traceback()
162 ui.traceback(force=True)
163 163 ui.flush()
164 164 finally:
165 165 os._exit(255)
166 166 else:
167 167 os._exit(ret & 255)
168 168 pids.add(pid)
169 169 os.close(wfd)
170 170 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
171 171 def cleanup():
172 172 signal.signal(signal.SIGINT, oldhandler)
173 173 waitforworkers()
174 174 signal.signal(signal.SIGCHLD, oldchldhandler)
175 175 status = problem[0]
176 176 if status:
177 177 if status < 0:
178 178 os.kill(os.getpid(), -status)
179 179 sys.exit(status)
180 180 try:
181 181 for line in util.iterfile(fp):
182 182 l = line.split(' ', 1)
183 183 yield int(l[0]), l[1][:-1]
184 184 except: # re-raises
185 185 killworkers()
186 186 cleanup()
187 187 raise
188 188 cleanup()
189 189
190 190 def _posixexitstatus(code):
191 191 '''convert a posix exit status into the same form returned by
192 192 os.spawnv
193 193
194 194 returns None if the process was stopped instead of exiting'''
195 195 if os.WIFEXITED(code):
196 196 return os.WEXITSTATUS(code)
197 197 elif os.WIFSIGNALED(code):
198 198 return -os.WTERMSIG(code)
199 199
200 200 if pycompat.osname != 'nt':
201 201 _platformworker = _posixworker
202 202 _exitstatus = _posixexitstatus
203 203
204 204 def partition(lst, nslices):
205 205 '''partition a list into N slices of roughly equal size
206 206
207 207 The current strategy takes every Nth element from the input. If
208 208 we ever write workers that need to preserve grouping in input
209 209 we should consider allowing callers to specify a partition strategy.
210 210
211 211 mpm is not a fan of this partitioning strategy when files are involved.
212 212 In his words:
213 213
214 214 Single-threaded Mercurial makes a point of creating and visiting
215 215 files in a fixed order (alphabetical). When creating files in order,
216 216 a typical filesystem is likely to allocate them on nearby regions on
217 217 disk. Thus, when revisiting in the same order, locality is maximized
218 218 and various forms of OS and disk-level caching and read-ahead get a
219 219 chance to work.
220 220
221 221 This effect can be quite significant on spinning disks. I discovered it
222 222 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
223 223 Tarring a repo and copying it to another disk effectively randomized
224 224 the revlog ordering on disk by sorting the revlogs by hash and suddenly
225 225 performance of my kernel checkout benchmark dropped by ~10x because the
226 226 "working set" of sectors visited no longer fit in the drive's cache and
227 227 the workload switched from streaming to random I/O.
228 228
229 229 What we should really be doing is have workers read filenames from a
230 230 ordered queue. This preserves locality and also keeps any worker from
231 231 getting more than one file out of balance.
232 232 '''
233 233 for i in range(nslices):
234 234 yield lst[i::nslices]
@@ -1,78 +1,90 b''
1 1 Test UI worker interaction
2 2
3 3 $ cat > t.py <<EOF
4 4 > from __future__ import absolute_import, print_function
5 5 > from mercurial import (
6 6 > cmdutil,
7 7 > error,
8 8 > ui as uimod,
9 9 > worker,
10 10 > )
11 11 > def abort(ui, args):
12 12 > if args[0] == 0:
13 13 > # by first worker for test stability
14 14 > raise error.Abort('known exception')
15 15 > return runme(ui, [])
16 > def exc(ui, args):
17 > if args[0] == 0:
18 > # by first worker for test stability
19 > raise Exception('unknown exception')
20 > return runme(ui, [])
16 21 > def runme(ui, args):
17 22 > for arg in args:
18 23 > ui.status('run\n')
19 24 > yield 1, arg
20 25 > functable = {
21 26 > 'abort': abort,
27 > 'exc': exc,
22 28 > 'runme': runme,
23 29 > }
24 30 > cmdtable = {}
25 31 > command = cmdutil.command(cmdtable)
26 32 > @command('test', [], 'hg test [COST] [FUNC]')
27 33 > def t(ui, repo, cost=1.0, func='runme'):
28 34 > cost = float(cost)
29 35 > func = functable[func]
30 36 > ui.status('start\n')
31 37 > runs = worker.worker(ui, cost, func, (ui,), range(8))
32 38 > for n, i in runs:
33 39 > pass
34 40 > ui.status('done\n')
35 41 > EOF
36 42 $ abspath=`pwd`/t.py
37 43 $ hg init
38 44
39 45 Run tests with worker enable by forcing a heigh cost
40 46
41 47 $ hg --config "extensions.t=$abspath" test 100000.0
42 48 start
43 49 run
44 50 run
45 51 run
46 52 run
47 53 run
48 54 run
49 55 run
50 56 run
51 57 done
52 58
53 59 Run tests without worker by forcing a low cost
54 60
55 61 $ hg --config "extensions.t=$abspath" test 0.0000001
56 62 start
57 63 run
58 64 run
59 65 run
60 66 run
61 67 run
62 68 run
63 69 run
64 70 run
65 71 done
66 72
67 73 Known exception should be caught, but printed if --traceback is enabled
68 74
69 75 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
70 76 > test 100000.0 abort
71 77 start
72 78 abort: known exception
73 79 [255]
74 80
75 81 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
76 82 > test 100000.0 abort --traceback 2>&1 | grep '^Traceback'
77 83 Traceback (most recent call last):
78 84 Traceback (most recent call last):
85
86 Traceback must be printed for unknown exceptions
87
88 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
89 > test 100000.0 exc 2>&1 | grep '^Traceback'
90 Traceback (most recent call last):
General Comments 0
You need to be logged in to leave comments. Login now