##// END OF EJS Templates
worker: use multiprocessing to find cpu count...
Gregory Szorc -
r26063:d29859cf default
parent child Browse files
Show More
@@ -1,165 +1,152 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 import multiprocessing
11 12 import os
12 13 import signal
13 14 import sys
14 15 import threading
15 16
16 17 from .i18n import _
17 18 from . import util
18 19
19 20 def countcpus():
20 21 '''try to count the number of CPUs on the system'''
21
22 # posix
23 22 try:
24 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
25 if n > 0:
26 return n
27 except (AttributeError, ValueError):
28 pass
29
30 # windows
31 try:
32 n = int(os.environ['NUMBER_OF_PROCESSORS'])
33 if n > 0:
34 return n
35 except (KeyError, ValueError):
36 pass
37
38 return 1
23 return multiprocessing.cpu_count()
24 except NotImplementedError:
25 return 1
39 26
40 27 def _numworkers(ui):
41 28 s = ui.config('worker', 'numcpus')
42 29 if s:
43 30 try:
44 31 n = int(s)
45 32 if n >= 1:
46 33 return n
47 34 except ValueError:
48 35 raise util.Abort(_('number of cpus must be an integer'))
49 36 return min(max(countcpus(), 4), 32)
50 37
51 38 if os.name == 'posix':
52 39 _startupcost = 0.01
53 40 else:
54 41 _startupcost = 1e30
55 42
56 43 def worthwhile(ui, costperop, nops):
57 44 '''try to determine whether the benefit of multiple processes can
58 45 outweigh the cost of starting them'''
59 46 linear = costperop * nops
60 47 workers = _numworkers(ui)
61 48 benefit = linear - (_startupcost * workers + linear / workers)
62 49 return benefit >= 0.15
63 50
64 51 def worker(ui, costperarg, func, staticargs, args):
65 52 '''run a function, possibly in parallel in multiple worker
66 53 processes.
67 54
68 55 returns a progress iterator
69 56
70 57 costperarg - cost of a single task
71 58
72 59 func - function to run
73 60
74 61 staticargs - arguments to pass to every invocation of the function
75 62
76 63 args - arguments to split into chunks, to pass to individual
77 64 workers
78 65 '''
79 66 if worthwhile(ui, costperarg, len(args)):
80 67 return _platformworker(ui, func, staticargs, args)
81 68 return func(*staticargs + (args,))
82 69
83 70 def _posixworker(ui, func, staticargs, args):
84 71 rfd, wfd = os.pipe()
85 72 workers = _numworkers(ui)
86 73 oldhandler = signal.getsignal(signal.SIGINT)
87 74 signal.signal(signal.SIGINT, signal.SIG_IGN)
88 75 pids, problem = [], [0]
89 76 for pargs in partition(args, workers):
90 77 pid = os.fork()
91 78 if pid == 0:
92 79 signal.signal(signal.SIGINT, oldhandler)
93 80 try:
94 81 os.close(rfd)
95 82 for i, item in func(*(staticargs + (pargs,))):
96 83 os.write(wfd, '%d %s\n' % (i, item))
97 84 os._exit(0)
98 85 except KeyboardInterrupt:
99 86 os._exit(255)
100 87 # other exceptions are allowed to propagate, we rely
101 88 # on lock.py's pid checks to avoid release callbacks
102 89 pids.append(pid)
103 90 pids.reverse()
104 91 os.close(wfd)
105 92 fp = os.fdopen(rfd, 'rb', 0)
106 93 def killworkers():
107 94 # if one worker bails, there's no good reason to wait for the rest
108 95 for p in pids:
109 96 try:
110 97 os.kill(p, signal.SIGTERM)
111 98 except OSError as err:
112 99 if err.errno != errno.ESRCH:
113 100 raise
114 101 def waitforworkers():
115 102 for _pid in pids:
116 103 st = _exitstatus(os.wait()[1])
117 104 if st and not problem[0]:
118 105 problem[0] = st
119 106 killworkers()
120 107 t = threading.Thread(target=waitforworkers)
121 108 t.start()
122 109 def cleanup():
123 110 signal.signal(signal.SIGINT, oldhandler)
124 111 t.join()
125 112 status = problem[0]
126 113 if status:
127 114 if status < 0:
128 115 os.kill(os.getpid(), -status)
129 116 sys.exit(status)
130 117 try:
131 118 for line in fp:
132 119 l = line.split(' ', 1)
133 120 yield int(l[0]), l[1][:-1]
134 121 except: # re-raises
135 122 killworkers()
136 123 cleanup()
137 124 raise
138 125 cleanup()
139 126
140 127 def _posixexitstatus(code):
141 128 '''convert a posix exit status into the same form returned by
142 129 os.spawnv
143 130
144 131 returns None if the process was stopped instead of exiting'''
145 132 if os.WIFEXITED(code):
146 133 return os.WEXITSTATUS(code)
147 134 elif os.WIFSIGNALED(code):
148 135 return -os.WTERMSIG(code)
149 136
150 137 if os.name != 'nt':
151 138 _platformworker = _posixworker
152 139 _exitstatus = _posixexitstatus
153 140
154 141 def partition(lst, nslices):
155 142 '''partition a list into N slices of equal size'''
156 143 n = len(lst)
157 144 chunk, slop = n / nslices, n % nslices
158 145 end = 0
159 146 for i in xrange(nslices):
160 147 start = end
161 148 end = start + chunk
162 149 if slop:
163 150 end += 1
164 151 slop -= 1
165 152 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now