##// END OF EJS Templates
worker: restore old countcpus code (issue4869)...
Gregory Szorc -
r26568:c0501c26 default
parent child Browse files
Show More
@@ -1,152 +1,165 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import multiprocessing
12 import os
11 import os
13 import signal
12 import signal
14 import sys
13 import sys
15 import threading
14 import threading
16
15
17 from .i18n import _
16 from .i18n import _
18 from . import util
17 from . import util
19
18
20 def countcpus():
19 def countcpus():
21 '''try to count the number of CPUs on the system'''
20 '''try to count the number of CPUs on the system'''
21
22 # posix
22 try:
23 try:
23 return multiprocessing.cpu_count()
24 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
24 except NotImplementedError:
25 if n > 0:
25 return 1
26 return n
27 except (AttributeError, ValueError):
28 pass
29
30 # windows
31 try:
32 n = int(os.environ['NUMBER_OF_PROCESSORS'])
33 if n > 0:
34 return n
35 except (KeyError, ValueError):
36 pass
37
38 return 1
26
39
27 def _numworkers(ui):
40 def _numworkers(ui):
28 s = ui.config('worker', 'numcpus')
41 s = ui.config('worker', 'numcpus')
29 if s:
42 if s:
30 try:
43 try:
31 n = int(s)
44 n = int(s)
32 if n >= 1:
45 if n >= 1:
33 return n
46 return n
34 except ValueError:
47 except ValueError:
35 raise util.Abort(_('number of cpus must be an integer'))
48 raise util.Abort(_('number of cpus must be an integer'))
36 return min(max(countcpus(), 4), 32)
49 return min(max(countcpus(), 4), 32)
37
50
38 if os.name == 'posix':
51 if os.name == 'posix':
39 _startupcost = 0.01
52 _startupcost = 0.01
40 else:
53 else:
41 _startupcost = 1e30
54 _startupcost = 1e30
42
55
43 def worthwhile(ui, costperop, nops):
56 def worthwhile(ui, costperop, nops):
44 '''try to determine whether the benefit of multiple processes can
57 '''try to determine whether the benefit of multiple processes can
45 outweigh the cost of starting them'''
58 outweigh the cost of starting them'''
46 linear = costperop * nops
59 linear = costperop * nops
47 workers = _numworkers(ui)
60 workers = _numworkers(ui)
48 benefit = linear - (_startupcost * workers + linear / workers)
61 benefit = linear - (_startupcost * workers + linear / workers)
49 return benefit >= 0.15
62 return benefit >= 0.15
50
63
51 def worker(ui, costperarg, func, staticargs, args):
64 def worker(ui, costperarg, func, staticargs, args):
52 '''run a function, possibly in parallel in multiple worker
65 '''run a function, possibly in parallel in multiple worker
53 processes.
66 processes.
54
67
55 returns a progress iterator
68 returns a progress iterator
56
69
57 costperarg - cost of a single task
70 costperarg - cost of a single task
58
71
59 func - function to run
72 func - function to run
60
73
61 staticargs - arguments to pass to every invocation of the function
74 staticargs - arguments to pass to every invocation of the function
62
75
63 args - arguments to split into chunks, to pass to individual
76 args - arguments to split into chunks, to pass to individual
64 workers
77 workers
65 '''
78 '''
66 if worthwhile(ui, costperarg, len(args)):
79 if worthwhile(ui, costperarg, len(args)):
67 return _platformworker(ui, func, staticargs, args)
80 return _platformworker(ui, func, staticargs, args)
68 return func(*staticargs + (args,))
81 return func(*staticargs + (args,))
69
82
70 def _posixworker(ui, func, staticargs, args):
83 def _posixworker(ui, func, staticargs, args):
71 rfd, wfd = os.pipe()
84 rfd, wfd = os.pipe()
72 workers = _numworkers(ui)
85 workers = _numworkers(ui)
73 oldhandler = signal.getsignal(signal.SIGINT)
86 oldhandler = signal.getsignal(signal.SIGINT)
74 signal.signal(signal.SIGINT, signal.SIG_IGN)
87 signal.signal(signal.SIGINT, signal.SIG_IGN)
75 pids, problem = [], [0]
88 pids, problem = [], [0]
76 for pargs in partition(args, workers):
89 for pargs in partition(args, workers):
77 pid = os.fork()
90 pid = os.fork()
78 if pid == 0:
91 if pid == 0:
79 signal.signal(signal.SIGINT, oldhandler)
92 signal.signal(signal.SIGINT, oldhandler)
80 try:
93 try:
81 os.close(rfd)
94 os.close(rfd)
82 for i, item in func(*(staticargs + (pargs,))):
95 for i, item in func(*(staticargs + (pargs,))):
83 os.write(wfd, '%d %s\n' % (i, item))
96 os.write(wfd, '%d %s\n' % (i, item))
84 os._exit(0)
97 os._exit(0)
85 except KeyboardInterrupt:
98 except KeyboardInterrupt:
86 os._exit(255)
99 os._exit(255)
87 # other exceptions are allowed to propagate, we rely
100 # other exceptions are allowed to propagate, we rely
88 # on lock.py's pid checks to avoid release callbacks
101 # on lock.py's pid checks to avoid release callbacks
89 pids.append(pid)
102 pids.append(pid)
90 pids.reverse()
103 pids.reverse()
91 os.close(wfd)
104 os.close(wfd)
92 fp = os.fdopen(rfd, 'rb', 0)
105 fp = os.fdopen(rfd, 'rb', 0)
93 def killworkers():
106 def killworkers():
94 # if one worker bails, there's no good reason to wait for the rest
107 # if one worker bails, there's no good reason to wait for the rest
95 for p in pids:
108 for p in pids:
96 try:
109 try:
97 os.kill(p, signal.SIGTERM)
110 os.kill(p, signal.SIGTERM)
98 except OSError as err:
111 except OSError as err:
99 if err.errno != errno.ESRCH:
112 if err.errno != errno.ESRCH:
100 raise
113 raise
101 def waitforworkers():
114 def waitforworkers():
102 for _pid in pids:
115 for _pid in pids:
103 st = _exitstatus(os.wait()[1])
116 st = _exitstatus(os.wait()[1])
104 if st and not problem[0]:
117 if st and not problem[0]:
105 problem[0] = st
118 problem[0] = st
106 killworkers()
119 killworkers()
107 t = threading.Thread(target=waitforworkers)
120 t = threading.Thread(target=waitforworkers)
108 t.start()
121 t.start()
109 def cleanup():
122 def cleanup():
110 signal.signal(signal.SIGINT, oldhandler)
123 signal.signal(signal.SIGINT, oldhandler)
111 t.join()
124 t.join()
112 status = problem[0]
125 status = problem[0]
113 if status:
126 if status:
114 if status < 0:
127 if status < 0:
115 os.kill(os.getpid(), -status)
128 os.kill(os.getpid(), -status)
116 sys.exit(status)
129 sys.exit(status)
117 try:
130 try:
118 for line in fp:
131 for line in fp:
119 l = line.split(' ', 1)
132 l = line.split(' ', 1)
120 yield int(l[0]), l[1][:-1]
133 yield int(l[0]), l[1][:-1]
121 except: # re-raises
134 except: # re-raises
122 killworkers()
135 killworkers()
123 cleanup()
136 cleanup()
124 raise
137 raise
125 cleanup()
138 cleanup()
126
139
127 def _posixexitstatus(code):
140 def _posixexitstatus(code):
128 '''convert a posix exit status into the same form returned by
141 '''convert a posix exit status into the same form returned by
129 os.spawnv
142 os.spawnv
130
143
131 returns None if the process was stopped instead of exiting'''
144 returns None if the process was stopped instead of exiting'''
132 if os.WIFEXITED(code):
145 if os.WIFEXITED(code):
133 return os.WEXITSTATUS(code)
146 return os.WEXITSTATUS(code)
134 elif os.WIFSIGNALED(code):
147 elif os.WIFSIGNALED(code):
135 return -os.WTERMSIG(code)
148 return -os.WTERMSIG(code)
136
149
137 if os.name != 'nt':
150 if os.name != 'nt':
138 _platformworker = _posixworker
151 _platformworker = _posixworker
139 _exitstatus = _posixexitstatus
152 _exitstatus = _posixexitstatus
140
153
141 def partition(lst, nslices):
154 def partition(lst, nslices):
142 '''partition a list into N slices of equal size'''
155 '''partition a list into N slices of equal size'''
143 n = len(lst)
156 n = len(lst)
144 chunk, slop = n / nslices, n % nslices
157 chunk, slop = n / nslices, n % nslices
145 end = 0
158 end = 0
146 for i in xrange(nslices):
159 for i in xrange(nslices):
147 start = end
160 start = end
148 end = start + chunk
161 end = start + chunk
149 if slop:
162 if slop:
150 end += 1
163 end += 1
151 slop -= 1
164 slop -= 1
152 yield lst[start:end]
165 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now