##// END OF EJS Templates
worker: on error, exit similarly to the first failing worker...
Bryan O'Sullivan -
r18707:d1a2b086 default
parent child Browse files
Show More
@@ -1,123 +1,139
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 import os, signal, sys, util
9 import os, signal, sys, util
10
10
11 def countcpus():
11 def countcpus():
12 '''try to count the number of CPUs on the system'''
12 '''try to count the number of CPUs on the system'''
13
13
14 # posix
14 # posix
15 try:
15 try:
16 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
16 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
17 if n > 0:
17 if n > 0:
18 return n
18 return n
19 except (AttributeError, ValueError):
19 except (AttributeError, ValueError):
20 pass
20 pass
21
21
22 # windows
22 # windows
23 try:
23 try:
24 n = int(os.environ['NUMBER_OF_PROCESSORS'])
24 n = int(os.environ['NUMBER_OF_PROCESSORS'])
25 if n > 0:
25 if n > 0:
26 return n
26 return n
27 except (KeyError, ValueError):
27 except (KeyError, ValueError):
28 pass
28 pass
29
29
30 return 1
30 return 1
31
31
32 def _numworkers(ui):
32 def _numworkers(ui):
33 s = ui.config('worker', 'numcpus')
33 s = ui.config('worker', 'numcpus')
34 if s:
34 if s:
35 try:
35 try:
36 n = int(s)
36 n = int(s)
37 if n >= 1:
37 if n >= 1:
38 return n
38 return n
39 except ValueError:
39 except ValueError:
40 raise util.Abort(_('number of cpus must be an integer'))
40 raise util.Abort(_('number of cpus must be an integer'))
41 return min(max(countcpus(), 4), 32)
41 return min(max(countcpus(), 4), 32)
42
42
43 if os.name == 'posix':
43 if os.name == 'posix':
44 _startupcost = 0.01
44 _startupcost = 0.01
45 else:
45 else:
46 _startupcost = 1e30
46 _startupcost = 1e30
47
47
48 def worthwhile(ui, costperop, nops):
48 def worthwhile(ui, costperop, nops):
49 '''try to determine whether the benefit of multiple processes can
49 '''try to determine whether the benefit of multiple processes can
50 outweigh the cost of starting them'''
50 outweigh the cost of starting them'''
51 linear = costperop * nops
51 linear = costperop * nops
52 workers = _numworkers(ui)
52 workers = _numworkers(ui)
53 benefit = linear - (_startupcost * workers + linear / workers)
53 benefit = linear - (_startupcost * workers + linear / workers)
54 return benefit >= 0.15
54 return benefit >= 0.15
55
55
56 def worker(ui, costperarg, func, staticargs, args):
56 def worker(ui, costperarg, func, staticargs, args):
57 '''run a function, possibly in parallel in multiple worker
57 '''run a function, possibly in parallel in multiple worker
58 processes.
58 processes.
59
59
60 returns a progress iterator
60 returns a progress iterator
61
61
62 costperarg - cost of a single task
62 costperarg - cost of a single task
63
63
64 func - function to run
64 func - function to run
65
65
66 staticargs - arguments to pass to every invocation of the function
66 staticargs - arguments to pass to every invocation of the function
67
67
68 args - arguments to split into chunks, to pass to individual
68 args - arguments to split into chunks, to pass to individual
69 workers
69 workers
70 '''
70 '''
71 if worthwhile(ui, costperarg, len(args)):
71 if worthwhile(ui, costperarg, len(args)):
72 return _platformworker(ui, func, staticargs, args)
72 return _platformworker(ui, func, staticargs, args)
73 return func(*staticargs + (args,))
73 return func(*staticargs + (args,))
74
74
75 def _posixworker(ui, func, staticargs, args):
75 def _posixworker(ui, func, staticargs, args):
76 rfd, wfd = os.pipe()
76 rfd, wfd = os.pipe()
77 workers = _numworkers(ui)
77 workers = _numworkers(ui)
78 for pargs in partition(args, workers):
78 for pargs in partition(args, workers):
79 pid = os.fork()
79 pid = os.fork()
80 if pid == 0:
80 if pid == 0:
81 try:
81 try:
82 os.close(rfd)
82 os.close(rfd)
83 for i, item in func(*(staticargs + (pargs,))):
83 for i, item in func(*(staticargs + (pargs,))):
84 os.write(wfd, '%d %s\n' % (i, item))
84 os.write(wfd, '%d %s\n' % (i, item))
85 os._exit(0)
85 os._exit(0)
86 except KeyboardInterrupt:
86 except KeyboardInterrupt:
87 os._exit(255)
87 os._exit(255)
88 os.close(wfd)
88 os.close(wfd)
89 fp = os.fdopen(rfd, 'rb', 0)
89 fp = os.fdopen(rfd, 'rb', 0)
90 oldhandler = signal.getsignal(signal.SIGINT)
90 oldhandler = signal.getsignal(signal.SIGINT)
91 signal.signal(signal.SIGINT, signal.SIG_IGN)
91 signal.signal(signal.SIGINT, signal.SIG_IGN)
92 def cleanup():
92 def cleanup():
93 # python 2.4 is too dumb for try/yield/finally
93 # python 2.4 is too dumb for try/yield/finally
94 signal.signal(signal.SIGINT, oldhandler)
94 signal.signal(signal.SIGINT, oldhandler)
95 problems = 0
95 problem = None
96 for i in xrange(workers):
96 for i in xrange(workers):
97 problems |= os.wait()[1]
97 pid, st = os.wait()
98 if problems:
98 st = _exitstatus(st)
99 sys.exit(1)
99 if st and not problem:
100 problem = st
101 if problem:
102 if problem < 0:
103 os.kill(os.getpid(), -problem)
104 sys.exit(problem)
100 try:
105 try:
101 for line in fp:
106 for line in fp:
102 l = line.split(' ', 1)
107 l = line.split(' ', 1)
103 yield int(l[0]), l[1][:-1]
108 yield int(l[0]), l[1][:-1]
104 except: # re-raises
109 except: # re-raises
105 cleanup()
110 cleanup()
106 raise
111 raise
107 cleanup()
112 cleanup()
108
113
114 def _posixexitstatus(code):
115 '''convert a posix exit status into the same form returned by
116 os.spawnv
117
118 returns None if the process was stopped instead of exiting'''
119 if os.WIFEXITED(code):
120 return os.WEXITSTATUS(code)
121 elif os.WIFSIGNALED(code):
122 return -os.WTERMSIG(code)
123
109 if os.name != 'nt':
124 if os.name != 'nt':
110 _platformworker = _posixworker
125 _platformworker = _posixworker
126 _exitstatus = _posixexitstatus
111
127
112 def partition(lst, nslices):
128 def partition(lst, nslices):
113 '''partition a list into N slices of equal size'''
129 '''partition a list into N slices of equal size'''
114 n = len(lst)
130 n = len(lst)
115 chunk, slop = n / nslices, n % nslices
131 chunk, slop = n / nslices, n % nslices
116 end = 0
132 end = 0
117 for i in xrange(nslices):
133 for i in xrange(nslices):
118 start = end
134 start = end
119 end = start + chunk
135 end = start + chunk
120 if slop:
136 if slop:
121 end += 1
137 end += 1
122 slop -= 1
138 slop -= 1
123 yield lst[start:end]
139 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now