worker: handle interrupt on windows...
Wojciech Lis
r35469:44fd4cfc @6 default
@@ -1,327 +1,327 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 from .i18n import _
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 pycompat,
22 22 scmutil,
23 23 util,
24 24 )
25 25
26 26 def countcpus():
27 27 '''try to count the number of CPUs on the system'''
28 28
29 29 # posix
30 30 try:
31 31 n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
32 32 if n > 0:
33 33 return n
34 34 except (AttributeError, ValueError):
35 35 pass
36 36
37 37 # windows
38 38 try:
39 39 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
40 40 if n > 0:
41 41 return n
42 42 except (KeyError, ValueError):
43 43 pass
44 44
45 45 return 1
46 46
47 47 def _numworkers(ui):
48 48 s = ui.config('worker', 'numcpus')
49 49 if s:
50 50 try:
51 51 n = int(s)
52 52 if n >= 1:
53 53 return n
54 54 except ValueError:
55 55 raise error.Abort(_('number of cpus must be an integer'))
56 56 return min(max(countcpus(), 4), 32)
57 57
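When worker.numcpus is unset, the fallback above clamps the detected CPU count to the range [4, 32]; two illustrative cases (hypothetical machines, not part of this change):

    >>> min(max(2, 4), 32)    # 2 CPUs detected: still use 4 workers
    4
    >>> min(max(64, 4), 32)   # 64 CPUs detected: capped at 32
    32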
58 58 if pycompat.isposix or pycompat.iswindows:
59 59 _startupcost = 0.01
60 60 else:
61 61 _startupcost = 1e30
62 62
63 63 def worthwhile(ui, costperop, nops):
64 64 '''try to determine whether the benefit of multiple processes can
65 65 outweigh the cost of starting them'''
66 66 linear = costperop * nops
67 67 workers = _numworkers(ui)
68 68 benefit = linear - (_startupcost * workers + linear / workers)
69 69 return benefit >= 0.15
70 70
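A worked example of the heuristic above, with purely hypothetical numbers:

    # costperop = 0.01s, nops = 200, 4 workers, _startupcost = 0.01
    #   linear   = 0.01 * 200         = 2.00s  (estimated serial cost)
    #   parallel = 0.01 * 4 + 2.0 / 4 = 0.54s  (startup + parallel run)
    #   benefit  = 2.00 - 0.54        = 1.46s  >= 0.15 -> use workers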
71 71 def worker(ui, costperarg, func, staticargs, args):
72 72 '''run a function, possibly in parallel in multiple worker
73 73 processes.
74 74
75 75 returns a progress iterator
76 76
77 77 costperarg - cost of a single task
78 78
79 79 func - function to run
80 80
81 81 staticargs - arguments to pass to every invocation of the function
82 82
83 83 args - arguments to split into chunks, to pass to individual
84 84 workers
85 85 '''
86 86 enabled = ui.configbool('worker', 'enabled')
87 87 if enabled and worthwhile(ui, costperarg, len(args)):
88 88 return _platformworker(ui, func, staticargs, args)
89 89 return func(*staticargs + (args,))
90 90
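A minimal usage sketch of the API above (the callback, its arguments, and the commented-out call site are hypothetical; real callers pass their own). Note that on the fork-based backend each yielded pair crosses a pipe as '%d %s\n', so the second element must be a newline-free string:

    # hypothetical per-chunk callback: it is invoked as
    # func(*(staticargs + (chunk,))) and yields (count, item) pairs
    def updatefiles(ui, repo, files):
        for f in files:
            # ... do the per-file work here ...
            yield 1, f

    # consume results as they arrive, whichever backend was picked:
    # for done, item in worker(ui, 0.001, updatefiles, (ui, repo), allfiles):
    #     ui.progress('updating', done, item=item)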
91 91 def _posixworker(ui, func, staticargs, args):
92 92 rfd, wfd = os.pipe()
93 93 workers = _numworkers(ui)
94 94 oldhandler = signal.getsignal(signal.SIGINT)
95 95 signal.signal(signal.SIGINT, signal.SIG_IGN)
96 96 pids, problem = set(), [0]
97 97 def killworkers():
98 98 # unregister SIGCHLD handler as all children will be killed. This
99 99 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
100 100 # could be updated while iterating, which would cause inconsistency.
101 101 signal.signal(signal.SIGCHLD, oldchldhandler)
102 102 # if one worker bails, there's no good reason to wait for the rest
103 103 for p in pids:
104 104 try:
105 105 os.kill(p, signal.SIGTERM)
106 106 except OSError as err:
107 107 if err.errno != errno.ESRCH:
108 108 raise
109 109 def waitforworkers(blocking=True):
110 110 for pid in pids.copy():
111 111 p = st = 0
112 112 while True:
113 113 try:
114 114 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
115 115 break
116 116 except OSError as e:
117 117 if e.errno == errno.EINTR:
118 118 continue
119 119 elif e.errno == errno.ECHILD:
120 120 # child has already been reaped, but pids hasn't yet been
121 121 # updated (maybe interrupted just after waitpid)
122 122 pids.discard(pid)
123 123 break
124 124 else:
125 125 raise
126 126 if not p:
127 127 # skip subsequent steps, because the child process should
128 128 # still be running in this case
129 129 continue
130 130 pids.discard(p)
131 131 st = _exitstatus(st)
132 132 if st and not problem[0]:
133 133 problem[0] = st
134 134 def sigchldhandler(signum, frame):
135 135 waitforworkers(blocking=False)
136 136 if problem[0]:
137 137 killworkers()
138 138 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
139 139 ui.flush()
140 140 parentpid = os.getpid()
141 141 for pargs in partition(args, workers):
142 142 # make sure we use os._exit in all worker code paths. otherwise the
143 143 # worker may do some clean-ups which could cause surprises like
144 144 # deadlock. see sshpeer.cleanup for example.
145 145 # override error handling *before* fork. this is necessary because
146 146 # an exception (signal) may arrive after fork, before the "pid ="
147 147 # assignment completes, and the other exception handler (dispatch.py)
148 148 # could take an unexpected code path without calling os._exit.
149 149 ret = -1
150 150 try:
151 151 pid = os.fork()
152 152 if pid == 0:
153 153 signal.signal(signal.SIGINT, oldhandler)
154 154 signal.signal(signal.SIGCHLD, oldchldhandler)
155 155
156 156 def workerfunc():
157 157 os.close(rfd)
158 158 for i, item in func(*(staticargs + (pargs,))):
159 159 os.write(wfd, '%d %s\n' % (i, item))
160 160 return 0
161 161
162 162 ret = scmutil.callcatch(ui, workerfunc)
163 163 except: # parent re-raises, child never returns
164 164 if os.getpid() == parentpid:
165 165 raise
166 166 exctype = sys.exc_info()[0]
167 167 force = not issubclass(exctype, KeyboardInterrupt)
168 168 ui.traceback(force=force)
169 169 finally:
170 170 if os.getpid() != parentpid:
171 171 try:
172 172 ui.flush()
173 173 except: # never returns, no re-raises
174 174 pass
175 175 finally:
176 176 os._exit(ret & 255)
177 177 pids.add(pid)
178 178 os.close(wfd)
179 179 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
180 180 def cleanup():
181 181 signal.signal(signal.SIGINT, oldhandler)
182 182 waitforworkers()
183 183 signal.signal(signal.SIGCHLD, oldchldhandler)
184 184 status = problem[0]
185 185 if status:
186 186 if status < 0:
187 187 os.kill(os.getpid(), -status)
188 188 sys.exit(status)
189 189 try:
190 190 for line in util.iterfile(fp):
191 191 l = line.split(' ', 1)
192 192 yield int(l[0]), l[1][:-1]
193 193 except: # re-raises
194 194 killworkers()
195 195 cleanup()
196 196 raise
197 197 cleanup()
198 198
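The parent and its children speak a one-line-per-result protocol over the pipe created above; a standalone sketch of a single round trip (illustration only, not code from this change):

    msg = '%d %s\n' % (3, 'path/to/file')   # child side: os.write(wfd, msg)
    l = msg.split(' ', 1)                   # parent side: parse one line
    assert (int(l[0]), l[1][:-1]) == (3, 'path/to/file')

A consequence of the format is that items may contain spaces but never newlines.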
199 199 def _posixexitstatus(code):
200 200 '''convert a posix exit status into the same form returned by
201 201 os.spawnv
202 202
203 203 returns None if the process was stopped instead of exiting'''
204 204 if os.WIFEXITED(code):
205 205 return os.WEXITSTATUS(code)
206 206 elif os.WIFSIGNALED(code):
207 207 return -os.WTERMSIG(code)
208 208
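Two illustrative decodes, using raw status words in the Linux/glibc encoding (exit code in bits 8-15, signal number in the low bits; other platforms may encode differently):

    >>> import os
    >>> os.WIFEXITED(3 << 8), os.WEXITSTATUS(3 << 8)   # exited with code 3
    (True, 3)
    >>> os.WIFSIGNALED(15), -os.WTERMSIG(15)           # killed by SIGTERM
    (True, -15)

A stopped child matches neither branch and falls through to the implicit None.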
209 209 def _windowsworker(ui, func, staticargs, args):
210 210 class Worker(threading.Thread):
211 211 def __init__(self, taskqueue, resultqueue, func, staticargs,
212 212 group=None, target=None, name=None, verbose=None):
213 213 threading.Thread.__init__(self, group=group, target=target,
214 214 name=name, verbose=verbose)
215 215 self._taskqueue = taskqueue
216 216 self._resultqueue = resultqueue
217 217 self._func = func
218 218 self._staticargs = staticargs
219 219 self._interrupted = False
220 220 self.daemon = True
221 221 self.exception = None
222 222
223 223 def interrupt(self):
224 224 self._interrupted = True
225 225
226 226 def run(self):
227 227 try:
228 228 while not self._taskqueue.empty():
229 229 try:
230 230 args = self._taskqueue.get_nowait()
231 231 for res in self._func(*self._staticargs + (args,)):
232 232 self._resultqueue.put(res)
233 233 # threading doesn't provide a native way to
234 234 # interrupt execution. handle it manually at every
235 235 # iteration.
236 236 if self._interrupted:
237 237 return
238 238 except util.empty:
239 239 break
240 240 except Exception as e:
241 241 # store the exception so that the main thread can resurface
242 242 # it as if func had been running without workers.
243 243 self.exception = e
244 244 raise
245 245
246 246 threads = []
247 247 def trykillworkers():
248 248 # Allow up to 1 second to clean worker threads nicely
249 249 cleanupend = time.time() + 1
250 250 for t in threads:
251 251 t.interrupt()
252 252 for t in threads:
253 253 remainingtime = cleanupend - time.time()
254 254 t.join(remainingtime)
255 255 if t.is_alive():
256 256 # pass over the failure to join the workers. it is more
257 257 # important to surface the initial exception than the
258 258 # fact that one of the workers may be processing a large
259 259 # task and does not get to handle the interruption.
260 260 ui.warn(_("failed to kill worker threads while "
261 261 "handling an exception\n"))
262 262 return
263 263
264 264 workers = _numworkers(ui)
265 265 resultqueue = util.queue()
266 266 taskqueue = util.queue()
267 267 # partition the work into more pieces than there are workers to
268 268 # minimize the chance of uneven distribution of large tasks among them
269 269 for pargs in partition(args, workers * 20):
270 270 taskqueue.put(pargs)
271 271 for _i in range(workers):
272 272 t = Worker(taskqueue, resultqueue, func, staticargs)
273 273 threads.append(t)
274 274 t.start()
275 275 try:
276 276 while len(threads) > 0:
277 277 while not resultqueue.empty():
278 278 yield resultqueue.get()
279 279 threads[0].join(0.05)
280 280 finishedthreads = [_t for _t in threads if not _t.is_alive()]
281 281 for t in finishedthreads:
282 282 if t.exception is not None:
283 283 raise t.exception
284 284 threads.remove(t)
285 - except Exception: # re-raises
285 + except (Exception, KeyboardInterrupt): # re-raises
286 286 trykillworkers()
287 287 raise
288 288 while not resultqueue.empty():
289 289 yield resultqueue.get()
290 290
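Since Python threads cannot be killed from the outside, the interrupt handling added in this change is cooperative: trykillworkers() sets a flag on every worker, each worker polls the flag between results, and the join loop spends at most a shared one-second deadline before warning and giving up. The broadened except clause at line 285 is what makes this reachable on Ctrl-C, because KeyboardInterrupt inherits from BaseException rather than Exception. The same polling pattern in a self-contained sketch (all names here are hypothetical, not Mercurial API):

    import threading

    class cooperativeworker(threading.Thread):
        def __init__(self, tasks):
            threading.Thread.__init__(self)
            self._tasks = tasks           # hypothetical list of callables
            self._interrupted = False
        def interrupt(self):
            self._interrupted = True      # request a stop; never forced
        def run(self):
            for task in self._tasks:
                task()                    # one unit of work
                if self._interrupted:     # poll between units
                    return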
291 291 if pycompat.iswindows:
292 292 _platformworker = _windowsworker
293 293 else:
294 294 _platformworker = _posixworker
295 295 _exitstatus = _posixexitstatus
296 296
297 297 def partition(lst, nslices):
298 298 '''partition a list into N slices of roughly equal size
299 299
300 300 The current strategy takes every Nth element from the input. If
301 301 we ever write workers that need to preserve grouping in input
302 302 we should consider allowing callers to specify a partition strategy.
303 303
304 304 mpm is not a fan of this partitioning strategy when files are involved.
305 305 In his words:
306 306
307 307 Single-threaded Mercurial makes a point of creating and visiting
308 308 files in a fixed order (alphabetical). When creating files in order,
309 309 a typical filesystem is likely to allocate them on nearby regions on
310 310 disk. Thus, when revisiting in the same order, locality is maximized
311 311 and various forms of OS and disk-level caching and read-ahead get a
312 312 chance to work.
313 313
314 314 This effect can be quite significant on spinning disks. I discovered it
315 315 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
316 316 Tarring a repo and copying it to another disk effectively randomized
317 317 the revlog ordering on disk by sorting the revlogs by hash and suddenly
318 318 performance of my kernel checkout benchmark dropped by ~10x because the
319 319 "working set" of sectors visited no longer fit in the drive's cache and
320 320 the workload switched from streaming to random I/O.
321 321
322 322 What we should really be doing is have workers read filenames from an
323 323 ordered queue. This preserves locality and also keeps any worker from
324 324 getting more than one file out of balance.
325 325 '''
326 326 for i in range(nslices):
327 327 yield lst[i::nslices]
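For example, splitting a ten-element list into three slices interleaves like this:

    >>> list(partition(list(range(10)), 3))
    [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]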