##// END OF EJS Templates
worker: check problem state correctly (issue3982)...
Matt Mackall -
r19406:3185b347 default
parent child Browse files
Show More
@@ -1,160 +1,160
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 import errno, os, signal, sys, threading, util
9 import errno, os, signal, sys, threading, util
10
10
11 def countcpus():
11 def countcpus():
12 '''try to count the number of CPUs on the system'''
12 '''try to count the number of CPUs on the system'''
13
13
14 # posix
14 # posix
15 try:
15 try:
16 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
16 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
17 if n > 0:
17 if n > 0:
18 return n
18 return n
19 except (AttributeError, ValueError):
19 except (AttributeError, ValueError):
20 pass
20 pass
21
21
22 # windows
22 # windows
23 try:
23 try:
24 n = int(os.environ['NUMBER_OF_PROCESSORS'])
24 n = int(os.environ['NUMBER_OF_PROCESSORS'])
25 if n > 0:
25 if n > 0:
26 return n
26 return n
27 except (KeyError, ValueError):
27 except (KeyError, ValueError):
28 pass
28 pass
29
29
30 return 1
30 return 1
31
31
32 def _numworkers(ui):
32 def _numworkers(ui):
33 s = ui.config('worker', 'numcpus')
33 s = ui.config('worker', 'numcpus')
34 if s:
34 if s:
35 try:
35 try:
36 n = int(s)
36 n = int(s)
37 if n >= 1:
37 if n >= 1:
38 return n
38 return n
39 except ValueError:
39 except ValueError:
40 raise util.Abort(_('number of cpus must be an integer'))
40 raise util.Abort(_('number of cpus must be an integer'))
41 return min(max(countcpus(), 4), 32)
41 return min(max(countcpus(), 4), 32)
42
42
43 if os.name == 'posix':
43 if os.name == 'posix':
44 _startupcost = 0.01
44 _startupcost = 0.01
45 else:
45 else:
46 _startupcost = 1e30
46 _startupcost = 1e30
47
47
48 def worthwhile(ui, costperop, nops):
48 def worthwhile(ui, costperop, nops):
49 '''try to determine whether the benefit of multiple processes can
49 '''try to determine whether the benefit of multiple processes can
50 outweigh the cost of starting them'''
50 outweigh the cost of starting them'''
51 linear = costperop * nops
51 linear = costperop * nops
52 workers = _numworkers(ui)
52 workers = _numworkers(ui)
53 benefit = linear - (_startupcost * workers + linear / workers)
53 benefit = linear - (_startupcost * workers + linear / workers)
54 return benefit >= 0.15
54 return benefit >= 0.15
55
55
56 def worker(ui, costperarg, func, staticargs, args):
56 def worker(ui, costperarg, func, staticargs, args):
57 '''run a function, possibly in parallel in multiple worker
57 '''run a function, possibly in parallel in multiple worker
58 processes.
58 processes.
59
59
60 returns a progress iterator
60 returns a progress iterator
61
61
62 costperarg - cost of a single task
62 costperarg - cost of a single task
63
63
64 func - function to run
64 func - function to run
65
65
66 staticargs - arguments to pass to every invocation of the function
66 staticargs - arguments to pass to every invocation of the function
67
67
68 args - arguments to split into chunks, to pass to individual
68 args - arguments to split into chunks, to pass to individual
69 workers
69 workers
70 '''
70 '''
71 if worthwhile(ui, costperarg, len(args)):
71 if worthwhile(ui, costperarg, len(args)):
72 return _platformworker(ui, func, staticargs, args)
72 return _platformworker(ui, func, staticargs, args)
73 return func(*staticargs + (args,))
73 return func(*staticargs + (args,))
74
74
75 def _posixworker(ui, func, staticargs, args):
75 def _posixworker(ui, func, staticargs, args):
76 rfd, wfd = os.pipe()
76 rfd, wfd = os.pipe()
77 workers = _numworkers(ui)
77 workers = _numworkers(ui)
78 oldhandler = signal.getsignal(signal.SIGINT)
78 oldhandler = signal.getsignal(signal.SIGINT)
79 signal.signal(signal.SIGINT, signal.SIG_IGN)
79 signal.signal(signal.SIGINT, signal.SIG_IGN)
80 pids, problem = [], [0]
80 pids, problem = [], [0]
81 for pargs in partition(args, workers):
81 for pargs in partition(args, workers):
82 pid = os.fork()
82 pid = os.fork()
83 if pid == 0:
83 if pid == 0:
84 signal.signal(signal.SIGINT, oldhandler)
84 signal.signal(signal.SIGINT, oldhandler)
85 try:
85 try:
86 os.close(rfd)
86 os.close(rfd)
87 for i, item in func(*(staticargs + (pargs,))):
87 for i, item in func(*(staticargs + (pargs,))):
88 os.write(wfd, '%d %s\n' % (i, item))
88 os.write(wfd, '%d %s\n' % (i, item))
89 os._exit(0)
89 os._exit(0)
90 except KeyboardInterrupt:
90 except KeyboardInterrupt:
91 os._exit(255)
91 os._exit(255)
92 except: # re-raises (close enough for debugging anyway)
92 except: # re-raises (close enough for debugging anyway)
93 try:
93 try:
94 ui.traceback()
94 ui.traceback()
95 finally:
95 finally:
96 os._exit(255)
96 os._exit(255)
97 pids.append(pid)
97 pids.append(pid)
98 pids.reverse()
98 pids.reverse()
99 os.close(wfd)
99 os.close(wfd)
100 fp = os.fdopen(rfd, 'rb', 0)
100 fp = os.fdopen(rfd, 'rb', 0)
101 def killworkers():
101 def killworkers():
102 # if one worker bails, there's no good reason to wait for the rest
102 # if one worker bails, there's no good reason to wait for the rest
103 for p in pids:
103 for p in pids:
104 try:
104 try:
105 os.kill(p, signal.SIGTERM)
105 os.kill(p, signal.SIGTERM)
106 except OSError, err:
106 except OSError, err:
107 if err.errno != errno.ESRCH:
107 if err.errno != errno.ESRCH:
108 raise
108 raise
109 def waitforworkers():
109 def waitforworkers():
110 for _ in pids:
110 for _ in pids:
111 st = _exitstatus(os.wait()[1])
111 st = _exitstatus(os.wait()[1])
112 if st and not problem:
112 if st and not problem[0]:
113 problem[0] = st
113 problem[0] = st
114 killworkers()
114 killworkers()
115 t = threading.Thread(target=waitforworkers)
115 t = threading.Thread(target=waitforworkers)
116 t.start()
116 t.start()
117 def cleanup():
117 def cleanup():
118 signal.signal(signal.SIGINT, oldhandler)
118 signal.signal(signal.SIGINT, oldhandler)
119 t.join()
119 t.join()
120 status = problem[0]
120 status = problem[0]
121 if status:
121 if status:
122 if status < 0:
122 if status < 0:
123 os.kill(os.getpid(), -status)
123 os.kill(os.getpid(), -status)
124 sys.exit(status)
124 sys.exit(status)
125 try:
125 try:
126 for line in fp:
126 for line in fp:
127 l = line.split(' ', 1)
127 l = line.split(' ', 1)
128 yield int(l[0]), l[1][:-1]
128 yield int(l[0]), l[1][:-1]
129 except: # re-raises
129 except: # re-raises
130 killworkers()
130 killworkers()
131 cleanup()
131 cleanup()
132 raise
132 raise
133 cleanup()
133 cleanup()
134
134
135 def _posixexitstatus(code):
135 def _posixexitstatus(code):
136 '''convert a posix exit status into the same form returned by
136 '''convert a posix exit status into the same form returned by
137 os.spawnv
137 os.spawnv
138
138
139 returns None if the process was stopped instead of exiting'''
139 returns None if the process was stopped instead of exiting'''
140 if os.WIFEXITED(code):
140 if os.WIFEXITED(code):
141 return os.WEXITSTATUS(code)
141 return os.WEXITSTATUS(code)
142 elif os.WIFSIGNALED(code):
142 elif os.WIFSIGNALED(code):
143 return -os.WTERMSIG(code)
143 return -os.WTERMSIG(code)
144
144
145 if os.name != 'nt':
145 if os.name != 'nt':
146 _platformworker = _posixworker
146 _platformworker = _posixworker
147 _exitstatus = _posixexitstatus
147 _exitstatus = _posixexitstatus
148
148
149 def partition(lst, nslices):
149 def partition(lst, nslices):
150 '''partition a list into N slices of equal size'''
150 '''partition a list into N slices of equal size'''
151 n = len(lst)
151 n = len(lst)
152 chunk, slop = n / nslices, n % nslices
152 chunk, slop = n / nslices, n % nslices
153 end = 0
153 end = 0
154 for i in xrange(nslices):
154 for i in xrange(nslices):
155 start = end
155 start = end
156 end = start + chunk
156 end = start + chunk
157 if slop:
157 if slop:
158 end += 1
158 end += 1
159 slop -= 1
159 slop -= 1
160 yield lst[start:end]
160 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now