##// END OF EJS Templates
worker: propagate exit code to main process...
Yuya Nishihara -
r32042:8f8ad013 default
parent child Browse files
Show More
@@ -1,232 +1,234
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 pycompat,
20 20 scmutil,
21 21 util,
22 22 )
23 23
24 24 def countcpus():
25 25 '''try to count the number of CPUs on the system'''
26 26
27 27 # posix
28 28 try:
29 29 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
30 30 if n > 0:
31 31 return n
32 32 except (AttributeError, ValueError):
33 33 pass
34 34
35 35 # windows
36 36 try:
37 37 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
38 38 if n > 0:
39 39 return n
40 40 except (KeyError, ValueError):
41 41 pass
42 42
43 43 return 1
44 44
45 45 def _numworkers(ui):
46 46 s = ui.config('worker', 'numcpus')
47 47 if s:
48 48 try:
49 49 n = int(s)
50 50 if n >= 1:
51 51 return n
52 52 except ValueError:
53 53 raise error.Abort(_('number of cpus must be an integer'))
54 54 return min(max(countcpus(), 4), 32)
55 55
56 56 if pycompat.osname == 'posix':
57 57 _startupcost = 0.01
58 58 else:
59 59 _startupcost = 1e30
60 60
61 61 def worthwhile(ui, costperop, nops):
62 62 '''try to determine whether the benefit of multiple processes can
63 63 outweigh the cost of starting them'''
64 64 linear = costperop * nops
65 65 workers = _numworkers(ui)
66 66 benefit = linear - (_startupcost * workers + linear / workers)
67 67 return benefit >= 0.15
68 68
69 69 def worker(ui, costperarg, func, staticargs, args):
70 70 '''run a function, possibly in parallel in multiple worker
71 71 processes.
72 72
73 73 returns a progress iterator
74 74
75 75 costperarg - cost of a single task
76 76
77 77 func - function to run
78 78
79 79 staticargs - arguments to pass to every invocation of the function
80 80
81 81 args - arguments to split into chunks, to pass to individual
82 82 workers
83 83 '''
84 84 if worthwhile(ui, costperarg, len(args)):
85 85 return _platformworker(ui, func, staticargs, args)
86 86 return func(*staticargs + (args,))
87 87
88 88 def _posixworker(ui, func, staticargs, args):
89 89 rfd, wfd = os.pipe()
90 90 workers = _numworkers(ui)
91 91 oldhandler = signal.getsignal(signal.SIGINT)
92 92 signal.signal(signal.SIGINT, signal.SIG_IGN)
93 93 pids, problem = set(), [0]
94 94 def killworkers():
95 95 # unregister SIGCHLD handler as all children will be killed. This
96 96 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
97 97 # could be updated while iterating, which would cause inconsistency.
98 98 signal.signal(signal.SIGCHLD, oldchldhandler)
99 99 # if one worker bails, there's no good reason to wait for the rest
100 100 for p in pids:
101 101 try:
102 102 os.kill(p, signal.SIGTERM)
103 103 except OSError as err:
104 104 if err.errno != errno.ESRCH:
105 105 raise
106 106 def waitforworkers(blocking=True):
107 107 for pid in pids.copy():
108 108 p = st = 0
109 109 while True:
110 110 try:
111 111 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
112 112 break
113 113 except OSError as e:
114 114 if e.errno == errno.EINTR:
115 115 continue
116 116 elif e.errno == errno.ECHILD:
117 117 # child would already be reaped, but pids have not
118 118 # yet been updated (maybe interrupted just after waitpid)
119 119 pids.discard(pid)
120 120 break
121 121 else:
122 122 raise
123 123 if not p:
124 124 # skip subsequent steps, because child process should
125 125 # be still running in this case
126 126 continue
127 127 pids.discard(p)
128 128 st = _exitstatus(st)
129 129 if st and not problem[0]:
130 130 problem[0] = st
131 131 def sigchldhandler(signum, frame):
132 132 waitforworkers(blocking=False)
133 133 if problem[0]:
134 134 killworkers()
135 135 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
136 136 ui.flush()
137 137 for pargs in partition(args, workers):
138 138 pid = os.fork()
139 139 if pid == 0:
140 140 signal.signal(signal.SIGINT, oldhandler)
141 141 signal.signal(signal.SIGCHLD, oldchldhandler)
142 142
143 143 def workerfunc():
144 144 os.close(rfd)
145 145 for i, item in func(*(staticargs + (pargs,))):
146 146 os.write(wfd, '%d %s\n' % (i, item))
147 return 0
147 148
148 149 # make sure we use os._exit in all code paths. otherwise the worker
149 150 # may do some clean-ups which could cause surprises like deadlock.
150 151 # see sshpeer.cleanup for example.
152 ret = 0
151 153 try:
152 154 try:
153 scmutil.callcatch(ui, workerfunc)
155 ret = scmutil.callcatch(ui, workerfunc)
154 156 finally:
155 157 ui.flush()
156 158 except KeyboardInterrupt:
157 159 os._exit(255)
158 160 except: # never return, therefore no re-raises
159 161 try:
160 162 ui.traceback()
161 163 ui.flush()
162 164 finally:
163 165 os._exit(255)
164 166 else:
165 os._exit(0)
167 os._exit(ret & 255)
166 168 pids.add(pid)
167 169 os.close(wfd)
168 170 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
169 171 def cleanup():
170 172 signal.signal(signal.SIGINT, oldhandler)
171 173 waitforworkers()
172 174 signal.signal(signal.SIGCHLD, oldchldhandler)
173 175 status = problem[0]
174 176 if status:
175 177 if status < 0:
176 178 os.kill(os.getpid(), -status)
177 179 sys.exit(status)
178 180 try:
179 181 for line in util.iterfile(fp):
180 182 l = line.split(' ', 1)
181 183 yield int(l[0]), l[1][:-1]
182 184 except: # re-raises
183 185 killworkers()
184 186 cleanup()
185 187 raise
186 188 cleanup()
187 189
188 190 def _posixexitstatus(code):
189 191 '''convert a posix exit status into the same form returned by
190 192 os.spawnv
191 193
192 194 returns None if the process was stopped instead of exiting'''
193 195 if os.WIFEXITED(code):
194 196 return os.WEXITSTATUS(code)
195 197 elif os.WIFSIGNALED(code):
196 198 return -os.WTERMSIG(code)
197 199
198 200 if pycompat.osname != 'nt':
199 201 _platformworker = _posixworker
200 202 _exitstatus = _posixexitstatus
201 203
202 204 def partition(lst, nslices):
203 205 '''partition a list into N slices of roughly equal size
204 206
205 207 The current strategy takes every Nth element from the input. If
206 208 we ever write workers that need to preserve grouping in input
207 209 we should consider allowing callers to specify a partition strategy.
208 210
209 211 mpm is not a fan of this partitioning strategy when files are involved.
210 212 In his words:
211 213
212 214 Single-threaded Mercurial makes a point of creating and visiting
213 215 files in a fixed order (alphabetical). When creating files in order,
214 216 a typical filesystem is likely to allocate them on nearby regions on
215 217 disk. Thus, when revisiting in the same order, locality is maximized
216 218 and various forms of OS and disk-level caching and read-ahead get a
217 219 chance to work.
218 220
219 221 This effect can be quite significant on spinning disks. I discovered it
220 222 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
221 223 Tarring a repo and copying it to another disk effectively randomized
222 224 the revlog ordering on disk by sorting the revlogs by hash and suddenly
223 225 performance of my kernel checkout benchmark dropped by ~10x because the
224 226 "working set" of sectors visited no longer fit in the drive's cache and
225 227 the workload switched from streaming to random I/O.
226 228
227 229 What we should really be doing is have workers read filenames from a
228 230 ordered queue. This preserves locality and also keeps any worker from
229 231 getting more than one file out of balance.
230 232 '''
231 233 for i in range(nslices):
232 234 yield lst[i::nslices]
@@ -1,77 +1,78
1 1 Test UI worker interaction
2 2
3 3 $ cat > t.py <<EOF
4 4 > from __future__ import absolute_import, print_function
5 5 > from mercurial import (
6 6 > cmdutil,
7 7 > error,
8 8 > ui as uimod,
9 9 > worker,
10 10 > )
11 11 > def abort(ui, args):
12 12 > if args[0] == 0:
13 13 > # by first worker for test stability
14 14 > raise error.Abort('known exception')
15 15 > return runme(ui, [])
16 16 > def runme(ui, args):
17 17 > for arg in args:
18 18 > ui.status('run\n')
19 19 > yield 1, arg
20 20 > functable = {
21 21 > 'abort': abort,
22 22 > 'runme': runme,
23 23 > }
24 24 > cmdtable = {}
25 25 > command = cmdutil.command(cmdtable)
26 26 > @command('test', [], 'hg test [COST] [FUNC]')
27 27 > def t(ui, repo, cost=1.0, func='runme'):
28 28 > cost = float(cost)
29 29 > func = functable[func]
30 30 > ui.status('start\n')
31 31 > runs = worker.worker(ui, cost, func, (ui,), range(8))
32 32 > for n, i in runs:
33 33 > pass
34 34 > ui.status('done\n')
35 35 > EOF
36 36 $ abspath=`pwd`/t.py
37 37 $ hg init
38 38
39 39 Run tests with worker enabled by forcing a high cost
40 40
41 41 $ hg --config "extensions.t=$abspath" test 100000.0
42 42 start
43 43 run
44 44 run
45 45 run
46 46 run
47 47 run
48 48 run
49 49 run
50 50 run
51 51 done
52 52
53 53 Run tests without worker by forcing a low cost
54 54
55 55 $ hg --config "extensions.t=$abspath" test 0.0000001
56 56 start
57 57 run
58 58 run
59 59 run
60 60 run
61 61 run
62 62 run
63 63 run
64 64 run
65 65 done
66 66
67 67 Known exception should be caught, but printed if --traceback is enabled
68 68
69 69 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
70 70 > test 100000.0 abort
71 71 start
72 72 abort: known exception
73 done
73 [255]
74 74
75 75 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
76 76 > test 100000.0 abort --traceback 2>&1 | grep '^Traceback'
77 77 Traceback (most recent call last):
78 Traceback (most recent call last):
General Comments 0
You need to be logged in to leave comments. Login now