##// END OF EJS Templates
workers: implemented worker on windows...
Wojciech Lis -
r35427:02b36e86 default
parent child Browse files
Show More
@@ -1,240 +1,285 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 import threading
14 15
15 16 from .i18n import _
16 17 from . import (
17 18 encoding,
18 19 error,
19 20 pycompat,
20 21 scmutil,
21 22 util,
22 23 )
23 24
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf is the authoritative source where available
    try:
        count = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        count = 0
    if count > 0:
        return count

    # windows: fall back to the environment variable set by the OS
    try:
        count = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        count = 0
    if count > 0:
        return count

    # no reliable information: assume a single CPU
    return 1
44 45
45 46 def _numworkers(ui):
46 47 s = ui.config('worker', 'numcpus')
47 48 if s:
48 49 try:
49 50 n = int(s)
50 51 if n >= 1:
51 52 return n
52 53 except ValueError:
53 54 raise error.Abort(_('number of cpus must be an integer'))
54 55 return min(max(countcpus(), 4), 32)
55 56
# estimated cost of spawning one worker; effectively infinite on
# platforms with neither fork (posix) nor the thread-based windows
# implementation, so worthwhile() never chooses parallelism there
_startupcost = 0.01 if (pycompat.isposix or pycompat.iswindows) else 1e30
60 61
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    # cost of doing all the work serially
    linear = costperop * nops
    nworkers = _numworkers(ui)
    # cost of spawning the workers plus the parallelized work itself
    parallel = _startupcost * nworkers + linear / nworkers
    return linear - parallel >= 0.15
68 69
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # parallelism would not pay for itself: do the work in-process
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
87 88
def _posixworker(ui, func, staticargs, args):
    '''fork-based implementation of worker() for posix systems

    Forks one child per chunk of args.  Each child runs func over its
    chunk and writes one "<index> <item>" line per result into a shared
    pipe; this generator (running in the parent) yields the parsed
    (index, item) pairs as they arrive.  If any child exits with a
    non-zero status, the remaining children are killed and the failure
    is propagated via sys.exit/os.kill in cleanup().
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # the parent ignores SIGINT while children run; the original handler
    # is restored in cleanup()
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a 1-element list so nested functions can mutate it;
    # problem[0] holds the first non-zero child exit status seen
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: child already gone; anything else is unexpected
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap children, recording the first failing exit status in
        # problem[0]; iterate over a copy because pids is mutated below
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        # non-blocking reap from the signal handler; on first failure,
        # take down the remaining workers immediately
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # flush buffered output before forking so children don't duplicate it
    ui.flush()
    parentpid = os.getpid()
    for pargs in partition(args, workers):
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore the handlers the parent overrode above
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # child only writes; close the read end of the pipe
                    os.close(rfd)
                    for i, item in func(*(staticargs + (pargs,))):
                        os.write(wfd, '%d %s\n' % (i, item))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    # parent only reads; close the write end so EOF arrives when the
    # last child exits
    os.close(wfd)
    fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
    def cleanup():
        # restore handlers, reap all children, and propagate any failure
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # child died from a signal: re-raise it in the parent
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            # strip the trailing newline added by the child's os.write
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
195 196
196 197 def _posixexitstatus(code):
197 198 '''convert a posix exit status into the same form returned by
198 199 os.spawnv
199 200
200 201 returns None if the process was stopped instead of exiting'''
201 202 if os.WIFEXITED(code):
202 203 return os.WEXITSTATUS(code)
203 204 elif os.WIFSIGNALED(code):
204 205 return -os.WTERMSIG(code)
205 206
206 if not pycompat.iswindows:
def _windowsworker(ui, func, staticargs, args):
    '''thread-based implementation of worker(), used on windows

    Chunks of args are placed on a task queue; each Worker thread pulls
    chunks and puts every result func yields onto a result queue, which
    this generator drains while any thread remains alive.
    '''
    class Worker(threading.Thread):
        def __init__(self, taskqueue, resultqueue, func, staticargs,
                     group=None, target=None, name=None, verbose=None):
            # NOTE(review): 'verbose' is a Python 2-only Thread.__init__
            # argument — confirm before running under Python 3
            threading.Thread.__init__(self, group=group, target=target,
                                      name=name, verbose=verbose)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs

        def run(self):
            # empty() is only a hint; get_nowait may still raise if
            # another thread won the race, hence the util.empty guard
            while not self._taskqueue.empty():
                try:
                    args = self._taskqueue.get_nowait()
                    for res in self._func(*self._staticargs + (args,)):
                        self._resultqueue.put(res)
                except util.empty:
                    break

    workers = _numworkers(ui)
    threads = []
    resultqueue = util.queue()
    taskqueue = util.queue()
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    # drain results while workers run; join with a short timeout so the
    # loop keeps yielding instead of blocking on one thread
    while any(t.is_alive() for t in threads):
        while not resultqueue.empty():
            yield resultqueue.get()
        t = threads[0]
        t.join(0.05)
        if not t.is_alive():
            threads.remove(t)
    # all threads done: flush whatever results remain queued
    while not resultqueue.empty():
        yield resultqueue.get()
248
# select the platform-appropriate implementation: thread-based on
# windows, fork-based elsewhere (_exitstatus is only needed by the
# posix implementation)
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
209 254
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice i gets elements i, i+nslices, i+2*nslices, ...
    return (lst[offset::nslices] for offset in range(nslices))
General Comments 0
You need to be logged in to leave comments. Login now