##// END OF EJS Templates
worker: properly report errors from worker processes (issue3982)
Matt Mackall -
r19408:c7ec39c1 default
parent child Browse files
Show More
@@ -1,160 +1,157
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 import errno, os, signal, sys, threading, util
9 import errno, os, signal, sys, threading, util
10
10
11 def countcpus():
11 def countcpus():
12 '''try to count the number of CPUs on the system'''
12 '''try to count the number of CPUs on the system'''
13
13
14 # posix
14 # posix
15 try:
15 try:
16 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
16 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
17 if n > 0:
17 if n > 0:
18 return n
18 return n
19 except (AttributeError, ValueError):
19 except (AttributeError, ValueError):
20 pass
20 pass
21
21
22 # windows
22 # windows
23 try:
23 try:
24 n = int(os.environ['NUMBER_OF_PROCESSORS'])
24 n = int(os.environ['NUMBER_OF_PROCESSORS'])
25 if n > 0:
25 if n > 0:
26 return n
26 return n
27 except (KeyError, ValueError):
27 except (KeyError, ValueError):
28 pass
28 pass
29
29
30 return 1
30 return 1
31
31
32 def _numworkers(ui):
32 def _numworkers(ui):
33 s = ui.config('worker', 'numcpus')
33 s = ui.config('worker', 'numcpus')
34 if s:
34 if s:
35 try:
35 try:
36 n = int(s)
36 n = int(s)
37 if n >= 1:
37 if n >= 1:
38 return n
38 return n
39 except ValueError:
39 except ValueError:
40 raise util.Abort(_('number of cpus must be an integer'))
40 raise util.Abort(_('number of cpus must be an integer'))
41 return min(max(countcpus(), 4), 32)
41 return min(max(countcpus(), 4), 32)
42
42
43 if os.name == 'posix':
43 if os.name == 'posix':
44 _startupcost = 0.01
44 _startupcost = 0.01
45 else:
45 else:
46 _startupcost = 1e30
46 _startupcost = 1e30
47
47
48 def worthwhile(ui, costperop, nops):
48 def worthwhile(ui, costperop, nops):
49 '''try to determine whether the benefit of multiple processes can
49 '''try to determine whether the benefit of multiple processes can
50 outweigh the cost of starting them'''
50 outweigh the cost of starting them'''
51 linear = costperop * nops
51 linear = costperop * nops
52 workers = _numworkers(ui)
52 workers = _numworkers(ui)
53 benefit = linear - (_startupcost * workers + linear / workers)
53 benefit = linear - (_startupcost * workers + linear / workers)
54 return benefit >= 0.15
54 return benefit >= 0.15
55
55
56 def worker(ui, costperarg, func, staticargs, args):
56 def worker(ui, costperarg, func, staticargs, args):
57 '''run a function, possibly in parallel in multiple worker
57 '''run a function, possibly in parallel in multiple worker
58 processes.
58 processes.
59
59
60 returns a progress iterator
60 returns a progress iterator
61
61
62 costperarg - cost of a single task
62 costperarg - cost of a single task
63
63
64 func - function to run
64 func - function to run
65
65
66 staticargs - arguments to pass to every invocation of the function
66 staticargs - arguments to pass to every invocation of the function
67
67
68 args - arguments to split into chunks, to pass to individual
68 args - arguments to split into chunks, to pass to individual
69 workers
69 workers
70 '''
70 '''
71 if worthwhile(ui, costperarg, len(args)):
71 if worthwhile(ui, costperarg, len(args)):
72 return _platformworker(ui, func, staticargs, args)
72 return _platformworker(ui, func, staticargs, args)
73 return func(*staticargs + (args,))
73 return func(*staticargs + (args,))
74
74
75 def _posixworker(ui, func, staticargs, args):
75 def _posixworker(ui, func, staticargs, args):
76 rfd, wfd = os.pipe()
76 rfd, wfd = os.pipe()
77 workers = _numworkers(ui)
77 workers = _numworkers(ui)
78 oldhandler = signal.getsignal(signal.SIGINT)
78 oldhandler = signal.getsignal(signal.SIGINT)
79 signal.signal(signal.SIGINT, signal.SIG_IGN)
79 signal.signal(signal.SIGINT, signal.SIG_IGN)
80 pids, problem = [], [0]
80 pids, problem = [], [0]
81 for pargs in partition(args, workers):
81 for pargs in partition(args, workers):
82 pid = os.fork()
82 pid = os.fork()
83 if pid == 0:
83 if pid == 0:
84 signal.signal(signal.SIGINT, oldhandler)
84 signal.signal(signal.SIGINT, oldhandler)
85 try:
85 try:
86 os.close(rfd)
86 os.close(rfd)
87 for i, item in func(*(staticargs + (pargs,))):
87 for i, item in func(*(staticargs + (pargs,))):
88 os.write(wfd, '%d %s\n' % (i, item))
88 os.write(wfd, '%d %s\n' % (i, item))
89 os._exit(0)
89 os._exit(0)
90 except KeyboardInterrupt:
90 except KeyboardInterrupt:
91 os._exit(255)
91 os._exit(255)
92 except: # re-raises (close enough for debugging anyway)
92 # other exceptions are allowed to propagate, we rely
93 try:
93 # on lock.py's pid checks to avoid release callbacks
94 ui.traceback()
95 finally:
96 os._exit(255)
97 pids.append(pid)
94 pids.append(pid)
98 pids.reverse()
95 pids.reverse()
99 os.close(wfd)
96 os.close(wfd)
100 fp = os.fdopen(rfd, 'rb', 0)
97 fp = os.fdopen(rfd, 'rb', 0)
101 def killworkers():
98 def killworkers():
102 # if one worker bails, there's no good reason to wait for the rest
99 # if one worker bails, there's no good reason to wait for the rest
103 for p in pids:
100 for p in pids:
104 try:
101 try:
105 os.kill(p, signal.SIGTERM)
102 os.kill(p, signal.SIGTERM)
106 except OSError, err:
103 except OSError, err:
107 if err.errno != errno.ESRCH:
104 if err.errno != errno.ESRCH:
108 raise
105 raise
109 def waitforworkers():
106 def waitforworkers():
110 for _ in pids:
107 for _ in pids:
111 st = _exitstatus(os.wait()[1])
108 st = _exitstatus(os.wait()[1])
112 if st and not problem[0]:
109 if st and not problem[0]:
113 problem[0] = st
110 problem[0] = st
114 killworkers()
111 killworkers()
115 t = threading.Thread(target=waitforworkers)
112 t = threading.Thread(target=waitforworkers)
116 t.start()
113 t.start()
117 def cleanup():
114 def cleanup():
118 signal.signal(signal.SIGINT, oldhandler)
115 signal.signal(signal.SIGINT, oldhandler)
119 t.join()
116 t.join()
120 status = problem[0]
117 status = problem[0]
121 if status:
118 if status:
122 if status < 0:
119 if status < 0:
123 os.kill(os.getpid(), -status)
120 os.kill(os.getpid(), -status)
124 sys.exit(status)
121 sys.exit(status)
125 try:
122 try:
126 for line in fp:
123 for line in fp:
127 l = line.split(' ', 1)
124 l = line.split(' ', 1)
128 yield int(l[0]), l[1][:-1]
125 yield int(l[0]), l[1][:-1]
129 except: # re-raises
126 except: # re-raises
130 killworkers()
127 killworkers()
131 cleanup()
128 cleanup()
132 raise
129 raise
133 cleanup()
130 cleanup()
134
131
135 def _posixexitstatus(code):
132 def _posixexitstatus(code):
136 '''convert a posix exit status into the same form returned by
133 '''convert a posix exit status into the same form returned by
137 os.spawnv
134 os.spawnv
138
135
139 returns None if the process was stopped instead of exiting'''
136 returns None if the process was stopped instead of exiting'''
140 if os.WIFEXITED(code):
137 if os.WIFEXITED(code):
141 return os.WEXITSTATUS(code)
138 return os.WEXITSTATUS(code)
142 elif os.WIFSIGNALED(code):
139 elif os.WIFSIGNALED(code):
143 return -os.WTERMSIG(code)
140 return -os.WTERMSIG(code)
144
141
145 if os.name != 'nt':
142 if os.name != 'nt':
146 _platformworker = _posixworker
143 _platformworker = _posixworker
147 _exitstatus = _posixexitstatus
144 _exitstatus = _posixexitstatus
148
145
149 def partition(lst, nslices):
146 def partition(lst, nslices):
150 '''partition a list into N slices of equal size'''
147 '''partition a list into N slices of equal size'''
151 n = len(lst)
148 n = len(lst)
152 chunk, slop = n / nslices, n % nslices
149 chunk, slop = n / nslices, n % nslices
153 end = 0
150 end = 0
154 for i in xrange(nslices):
151 for i in xrange(nslices):
155 start = end
152 start = end
156 end = start + chunk
153 end = start + chunk
157 if slop:
154 if slop:
158 end += 1
155 end += 1
159 slop -= 1
156 slop -= 1
160 yield lst[start:end]
157 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now