worker: print traceback for uncaught exception unconditionally...
Yuya Nishihara
r32043:b844d0d3 default
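The change is a single line in the forked child's catch-all exception handler: ui.traceback() becomes ui.traceback(force=True), so a worker that dies on an unexpected exception reports its traceback even when --traceback is not enabled. As a minimal standalone sketch of that pattern (plain Python, not Mercurial's ui API; run_in_child and task are hypothetical names, and fork() makes it POSIX-only like _posixworker):

import os
import sys
import traceback

def run_in_child(task):
    # Hypothetical sketch of the worker-child pattern in worker.py: every exit
    # path goes through os._exit(), and an unexpected exception is reported
    # unconditionally (the analogue of ui.traceback(force=True)).
    pid = os.fork()
    if pid > 0:
        return pid              # parent keeps the child's pid
    code = 0
    try:
        task()
    except KeyboardInterrupt:
        code = 255
    except BaseException:
        # always print, not only when a debug/--traceback mode is active
        traceback.print_exc(file=sys.stderr)
        code = 255
    finally:
        sys.stderr.flush()
        os._exit(code)          # skip normal interpreter cleanup, as the real worker does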
mercurial/worker.py
@@ -1,234 +1,234 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import signal
import sys

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
    util,
)

def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1

def _numworkers(ui):
    s = ui.config('worker', 'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)

if pycompat.osname == 'posix':
    _startupcost = 0.01
else:
    _startupcost = 1e30

def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_startupcost * workers + linear / workers)
    return benefit >= 0.15

def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if worthwhile(ui, costperarg, len(args)):
        return _platformworker(ui, func, staticargs, args)
    return func(*staticargs + (args,))

def _posixworker(ui, func, staticargs, args):
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGINT, oldhandler)
            signal.signal(signal.SIGCHLD, oldchldhandler)

            def workerfunc():
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                return 0

            # make sure we use os._exit in all code paths. otherwise the worker
            # may do some clean-ups which could cause surprises like deadlock.
            # see sshpeer.cleanup for example.
            ret = 0
            try:
                try:
                    ret = scmutil.callcatch(ui, workerfunc)
                finally:
                    ui.flush()
            except KeyboardInterrupt:
                os._exit(255)
            except: # never return, therefore no re-raises
                try:
-                   ui.traceback()
+                   ui.traceback(force=True)
                    ui.flush()
                finally:
                    os._exit(255)
            else:
                os._exit(ret & 255)
        pids.add(pid)
    os.close(wfd)
    fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()

def _posixexitstatus(code):
    '''convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting'''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)

if pycompat.osname != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus

def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    for i in range(nslices):
        yield lst[i::nslices]
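For reference, partition() interleaves rather than chunking: with the eight arguments passed by the test below and the two workers that its abort and exc cases force (worker.numcpus=2 and range(8)), each child receives every other element. A quick standalone check in plain Python:

def partition(lst, nslices):
    # same strategy as worker.py: slice i gets every nslices-th element
    for i in range(nslices):
        yield lst[i::nslices]

print([list(s) for s in partition(list(range(8)), 2)])
# -> [[0, 2, 4, 6], [1, 3, 5, 7]]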
tests/test-worker.t
@@ -1,78 +1,90 @@
Test UI worker interaction

  $ cat > t.py <<EOF
  > from __future__ import absolute_import, print_function
  > from mercurial import (
  >     cmdutil,
  >     error,
  >     ui as uimod,
  >     worker,
  > )
  > def abort(ui, args):
  >     if args[0] == 0:
  >         # by first worker for test stability
  >         raise error.Abort('known exception')
  >     return runme(ui, [])
+ > def exc(ui, args):
+ >     if args[0] == 0:
+ >         # by first worker for test stability
+ >         raise Exception('unknown exception')
+ >     return runme(ui, [])
  > def runme(ui, args):
  >     for arg in args:
  >         ui.status('run\n')
  >         yield 1, arg
  > functable = {
  >     'abort': abort,
+ >     'exc': exc,
  >     'runme': runme,
  > }
  > cmdtable = {}
  > command = cmdutil.command(cmdtable)
  > @command('test', [], 'hg test [COST] [FUNC]')
  > def t(ui, repo, cost=1.0, func='runme'):
  >     cost = float(cost)
  >     func = functable[func]
  >     ui.status('start\n')
  >     runs = worker.worker(ui, cost, func, (ui,), range(8))
  >     for n, i in runs:
  >         pass
  >     ui.status('done\n')
  > EOF
  $ abspath=`pwd`/t.py
  $ hg init

Run tests with workers enabled by forcing a high cost

  $ hg --config "extensions.t=$abspath" test 100000.0
  start
  run
  run
  run
  run
  run
  run
  run
  run
  done

Run tests without workers by forcing a low cost

  $ hg --config "extensions.t=$abspath" test 0.0000001
  start
  run
  run
  run
  run
  run
  run
  run
  run
  done

Known exception should be caught, but printed if --traceback is enabled

  $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
  > test 100000.0 abort
  start
  abort: known exception
  [255]

  $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
  > test 100000.0 abort --traceback 2>&1 | grep '^Traceback'
  Traceback (most recent call last):
  Traceback (most recent call last):
+
+Traceback must be printed for unknown exceptions
+
+  $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=2' \
+  > test 100000.0 exc 2>&1 | grep '^Traceback'
+  Traceback (most recent call last):
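The two cost values in the test are chosen to land on opposite sides of worthwhile(). A back-of-the-envelope check in plain Python (assuming 4 workers, the minimum _numworkers() reports when worker.numcpus is unset, and the POSIX _startupcost of 0.01; actual values depend on the host):

def worthwhile(costperop, nops, workers, startupcost=0.01):
    # mirrors worker.worthwhile(): parallelize only if the saving beats 0.15
    linear = costperop * nops
    benefit = linear - (startupcost * workers + linear / workers)
    return benefit >= 0.15

print(worthwhile(100000.0, 8, 4))   # True  -> workers are forked
print(worthwhile(0.0000001, 8, 4))  # False -> func runs inline in one process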