##// END OF EJS Templates
worker: use multiprocessing to find cpu count...
Gregory Szorc -
r26063:d29859cf default
parent child Browse files
Show More
@@ -1,165 +1,152 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import multiprocessing
11 import os
12 import os
12 import signal
13 import signal
13 import sys
14 import sys
14 import threading
15 import threading
15
16
16 from .i18n import _
17 from .i18n import _
17 from . import util
18 from . import util
18
19
19 def countcpus():
20 def countcpus():
20 '''try to count the number of CPUs on the system'''
21 '''try to count the number of CPUs on the system'''
21
22 # posix
23 try:
22 try:
24 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
23 return multiprocessing.cpu_count()
25 if n > 0:
24 except NotImplementedError:
26 return n
25 return 1
27 except (AttributeError, ValueError):
28 pass
29
30 # windows
31 try:
32 n = int(os.environ['NUMBER_OF_PROCESSORS'])
33 if n > 0:
34 return n
35 except (KeyError, ValueError):
36 pass
37
38 return 1
39
26
40 def _numworkers(ui):
27 def _numworkers(ui):
41 s = ui.config('worker', 'numcpus')
28 s = ui.config('worker', 'numcpus')
42 if s:
29 if s:
43 try:
30 try:
44 n = int(s)
31 n = int(s)
45 if n >= 1:
32 if n >= 1:
46 return n
33 return n
47 except ValueError:
34 except ValueError:
48 raise util.Abort(_('number of cpus must be an integer'))
35 raise util.Abort(_('number of cpus must be an integer'))
49 return min(max(countcpus(), 4), 32)
36 return min(max(countcpus(), 4), 32)
50
37
51 if os.name == 'posix':
38 if os.name == 'posix':
52 _startupcost = 0.01
39 _startupcost = 0.01
53 else:
40 else:
54 _startupcost = 1e30
41 _startupcost = 1e30
55
42
56 def worthwhile(ui, costperop, nops):
43 def worthwhile(ui, costperop, nops):
57 '''try to determine whether the benefit of multiple processes can
44 '''try to determine whether the benefit of multiple processes can
58 outweigh the cost of starting them'''
45 outweigh the cost of starting them'''
59 linear = costperop * nops
46 linear = costperop * nops
60 workers = _numworkers(ui)
47 workers = _numworkers(ui)
61 benefit = linear - (_startupcost * workers + linear / workers)
48 benefit = linear - (_startupcost * workers + linear / workers)
62 return benefit >= 0.15
49 return benefit >= 0.15
63
50
64 def worker(ui, costperarg, func, staticargs, args):
51 def worker(ui, costperarg, func, staticargs, args):
65 '''run a function, possibly in parallel in multiple worker
52 '''run a function, possibly in parallel in multiple worker
66 processes.
53 processes.
67
54
68 returns a progress iterator
55 returns a progress iterator
69
56
70 costperarg - cost of a single task
57 costperarg - cost of a single task
71
58
72 func - function to run
59 func - function to run
73
60
74 staticargs - arguments to pass to every invocation of the function
61 staticargs - arguments to pass to every invocation of the function
75
62
76 args - arguments to split into chunks, to pass to individual
63 args - arguments to split into chunks, to pass to individual
77 workers
64 workers
78 '''
65 '''
79 if worthwhile(ui, costperarg, len(args)):
66 if worthwhile(ui, costperarg, len(args)):
80 return _platformworker(ui, func, staticargs, args)
67 return _platformworker(ui, func, staticargs, args)
81 return func(*staticargs + (args,))
68 return func(*staticargs + (args,))
82
69
83 def _posixworker(ui, func, staticargs, args):
70 def _posixworker(ui, func, staticargs, args):
84 rfd, wfd = os.pipe()
71 rfd, wfd = os.pipe()
85 workers = _numworkers(ui)
72 workers = _numworkers(ui)
86 oldhandler = signal.getsignal(signal.SIGINT)
73 oldhandler = signal.getsignal(signal.SIGINT)
87 signal.signal(signal.SIGINT, signal.SIG_IGN)
74 signal.signal(signal.SIGINT, signal.SIG_IGN)
88 pids, problem = [], [0]
75 pids, problem = [], [0]
89 for pargs in partition(args, workers):
76 for pargs in partition(args, workers):
90 pid = os.fork()
77 pid = os.fork()
91 if pid == 0:
78 if pid == 0:
92 signal.signal(signal.SIGINT, oldhandler)
79 signal.signal(signal.SIGINT, oldhandler)
93 try:
80 try:
94 os.close(rfd)
81 os.close(rfd)
95 for i, item in func(*(staticargs + (pargs,))):
82 for i, item in func(*(staticargs + (pargs,))):
96 os.write(wfd, '%d %s\n' % (i, item))
83 os.write(wfd, '%d %s\n' % (i, item))
97 os._exit(0)
84 os._exit(0)
98 except KeyboardInterrupt:
85 except KeyboardInterrupt:
99 os._exit(255)
86 os._exit(255)
100 # other exceptions are allowed to propagate, we rely
87 # other exceptions are allowed to propagate, we rely
101 # on lock.py's pid checks to avoid release callbacks
88 # on lock.py's pid checks to avoid release callbacks
102 pids.append(pid)
89 pids.append(pid)
103 pids.reverse()
90 pids.reverse()
104 os.close(wfd)
91 os.close(wfd)
105 fp = os.fdopen(rfd, 'rb', 0)
92 fp = os.fdopen(rfd, 'rb', 0)
106 def killworkers():
93 def killworkers():
107 # if one worker bails, there's no good reason to wait for the rest
94 # if one worker bails, there's no good reason to wait for the rest
108 for p in pids:
95 for p in pids:
109 try:
96 try:
110 os.kill(p, signal.SIGTERM)
97 os.kill(p, signal.SIGTERM)
111 except OSError as err:
98 except OSError as err:
112 if err.errno != errno.ESRCH:
99 if err.errno != errno.ESRCH:
113 raise
100 raise
114 def waitforworkers():
101 def waitforworkers():
115 for _pid in pids:
102 for _pid in pids:
116 st = _exitstatus(os.wait()[1])
103 st = _exitstatus(os.wait()[1])
117 if st and not problem[0]:
104 if st and not problem[0]:
118 problem[0] = st
105 problem[0] = st
119 killworkers()
106 killworkers()
120 t = threading.Thread(target=waitforworkers)
107 t = threading.Thread(target=waitforworkers)
121 t.start()
108 t.start()
122 def cleanup():
109 def cleanup():
123 signal.signal(signal.SIGINT, oldhandler)
110 signal.signal(signal.SIGINT, oldhandler)
124 t.join()
111 t.join()
125 status = problem[0]
112 status = problem[0]
126 if status:
113 if status:
127 if status < 0:
114 if status < 0:
128 os.kill(os.getpid(), -status)
115 os.kill(os.getpid(), -status)
129 sys.exit(status)
116 sys.exit(status)
130 try:
117 try:
131 for line in fp:
118 for line in fp:
132 l = line.split(' ', 1)
119 l = line.split(' ', 1)
133 yield int(l[0]), l[1][:-1]
120 yield int(l[0]), l[1][:-1]
134 except: # re-raises
121 except: # re-raises
135 killworkers()
122 killworkers()
136 cleanup()
123 cleanup()
137 raise
124 raise
138 cleanup()
125 cleanup()
139
126
140 def _posixexitstatus(code):
127 def _posixexitstatus(code):
141 '''convert a posix exit status into the same form returned by
128 '''convert a posix exit status into the same form returned by
142 os.spawnv
129 os.spawnv
143
130
144 returns None if the process was stopped instead of exiting'''
131 returns None if the process was stopped instead of exiting'''
145 if os.WIFEXITED(code):
132 if os.WIFEXITED(code):
146 return os.WEXITSTATUS(code)
133 return os.WEXITSTATUS(code)
147 elif os.WIFSIGNALED(code):
134 elif os.WIFSIGNALED(code):
148 return -os.WTERMSIG(code)
135 return -os.WTERMSIG(code)
149
136
150 if os.name != 'nt':
137 if os.name != 'nt':
151 _platformworker = _posixworker
138 _platformworker = _posixworker
152 _exitstatus = _posixexitstatus
139 _exitstatus = _posixexitstatus
153
140
154 def partition(lst, nslices):
141 def partition(lst, nslices):
155 '''partition a list into N slices of equal size'''
142 '''partition a list into N slices of equal size'''
156 n = len(lst)
143 n = len(lst)
157 chunk, slop = n / nslices, n % nslices
144 chunk, slop = n / nslices, n % nslices
158 end = 0
145 end = 0
159 for i in xrange(nslices):
146 for i in xrange(nslices):
160 start = end
147 start = end
161 end = start + chunk
148 end = start + chunk
162 if slop:
149 if slop:
163 end += 1
150 end += 1
164 slop -= 1
151 slop -= 1
165 yield lst[start:end]
152 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now