##// END OF EJS Templates
worker: properly report errors from worker processes (issue3982)
Matt Mackall -
r19408:c7ec39c1 default
parent child Browse files
Show More
@@ -1,160 +1,157
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 import errno, os, signal, sys, threading, util
10 10
11 11 def countcpus():
12 12 '''try to count the number of CPUs on the system'''
13 13
14 14 # posix
15 15 try:
16 16 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
17 17 if n > 0:
18 18 return n
19 19 except (AttributeError, ValueError):
20 20 pass
21 21
22 22 # windows
23 23 try:
24 24 n = int(os.environ['NUMBER_OF_PROCESSORS'])
25 25 if n > 0:
26 26 return n
27 27 except (KeyError, ValueError):
28 28 pass
29 29
30 30 return 1
31 31
32 32 def _numworkers(ui):
33 33 s = ui.config('worker', 'numcpus')
34 34 if s:
35 35 try:
36 36 n = int(s)
37 37 if n >= 1:
38 38 return n
39 39 except ValueError:
40 40 raise util.Abort(_('number of cpus must be an integer'))
41 41 return min(max(countcpus(), 4), 32)
42 42
43 43 if os.name == 'posix':
44 44 _startupcost = 0.01
45 45 else:
46 46 _startupcost = 1e30
47 47
48 48 def worthwhile(ui, costperop, nops):
49 49 '''try to determine whether the benefit of multiple processes can
50 50 outweigh the cost of starting them'''
51 51 linear = costperop * nops
52 52 workers = _numworkers(ui)
53 53 benefit = linear - (_startupcost * workers + linear / workers)
54 54 return benefit >= 0.15
55 55
56 56 def worker(ui, costperarg, func, staticargs, args):
57 57 '''run a function, possibly in parallel in multiple worker
58 58 processes.
59 59
60 60 returns a progress iterator
61 61
62 62 costperarg - cost of a single task
63 63
64 64 func - function to run
65 65
66 66 staticargs - arguments to pass to every invocation of the function
67 67
68 68 args - arguments to split into chunks, to pass to individual
69 69 workers
70 70 '''
71 71 if worthwhile(ui, costperarg, len(args)):
72 72 return _platformworker(ui, func, staticargs, args)
73 73 return func(*staticargs + (args,))
74 74
75 75 def _posixworker(ui, func, staticargs, args):
76 76 rfd, wfd = os.pipe()
77 77 workers = _numworkers(ui)
78 78 oldhandler = signal.getsignal(signal.SIGINT)
79 79 signal.signal(signal.SIGINT, signal.SIG_IGN)
80 80 pids, problem = [], [0]
81 81 for pargs in partition(args, workers):
82 82 pid = os.fork()
83 83 if pid == 0:
84 84 signal.signal(signal.SIGINT, oldhandler)
85 85 try:
86 86 os.close(rfd)
87 87 for i, item in func(*(staticargs + (pargs,))):
88 88 os.write(wfd, '%d %s\n' % (i, item))
89 89 os._exit(0)
90 90 except KeyboardInterrupt:
91 91 os._exit(255)
92 except: # re-raises (close enough for debugging anyway)
93 try:
94 ui.traceback()
95 finally:
96 os._exit(255)
92 # other exceptions are allowed to propagate, we rely
93 # on lock.py's pid checks to avoid release callbacks
97 94 pids.append(pid)
98 95 pids.reverse()
99 96 os.close(wfd)
100 97 fp = os.fdopen(rfd, 'rb', 0)
101 98 def killworkers():
102 99 # if one worker bails, there's no good reason to wait for the rest
103 100 for p in pids:
104 101 try:
105 102 os.kill(p, signal.SIGTERM)
106 103 except OSError, err:
107 104 if err.errno != errno.ESRCH:
108 105 raise
109 106 def waitforworkers():
110 107 for _ in pids:
111 108 st = _exitstatus(os.wait()[1])
112 109 if st and not problem[0]:
113 110 problem[0] = st
114 111 killworkers()
115 112 t = threading.Thread(target=waitforworkers)
116 113 t.start()
117 114 def cleanup():
118 115 signal.signal(signal.SIGINT, oldhandler)
119 116 t.join()
120 117 status = problem[0]
121 118 if status:
122 119 if status < 0:
123 120 os.kill(os.getpid(), -status)
124 121 sys.exit(status)
125 122 try:
126 123 for line in fp:
127 124 l = line.split(' ', 1)
128 125 yield int(l[0]), l[1][:-1]
129 126 except: # re-raises
130 127 killworkers()
131 128 cleanup()
132 129 raise
133 130 cleanup()
134 131
135 132 def _posixexitstatus(code):
136 133 '''convert a posix exit status into the same form returned by
137 134 os.spawnv
138 135
139 136 returns None if the process was stopped instead of exiting'''
140 137 if os.WIFEXITED(code):
141 138 return os.WEXITSTATUS(code)
142 139 elif os.WIFSIGNALED(code):
143 140 return -os.WTERMSIG(code)
144 141
145 142 if os.name != 'nt':
146 143 _platformworker = _posixworker
147 144 _exitstatus = _posixexitstatus
148 145
149 146 def partition(lst, nslices):
150 147 '''partition a list into N slices of equal size'''
151 148 n = len(lst)
152 149 chunk, slop = n / nslices, n % nslices
153 150 end = 0
154 151 for i in xrange(nslices):
155 152 start = end
156 153 end = start + chunk
157 154 if slop:
158 155 end += 1
159 156 slop -= 1
160 157 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now