worker: implement _blockingreader.readinto() (issue6444)...
Manuel Jacob
r50128:52072252 default
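This changeset gives _blockingreader a general readinto() implementation and rebuilds read() on top of it, replacing the narrower workaround that only defined readinto() for Python 3.8.0 through 3.8.1 (issue6444). The snippet below is a minimal, standalone sketch of the retry loop that the new readinto() uses; the helper name readinto_full and the Trickle test stream are illustrative assumptions, not part of the change itself.

def readinto_full(raw, buf):
    # Keep calling the unbuffered stream's readinto() on the remaining
    # slice of the buffer until the buffer is full or EOF is reached.
    pos = 0
    with memoryview(buf) as view:
        while pos < len(buf):
            with view[pos:] as subview:
                n = raw.readinto(subview)  # may be a short read
            if not n:  # EOF
                break
            pos += n
    return pos

# Illustrative stream that returns at most 3 bytes per readinto() call,
# the kind of short read an unbuffered pipe can produce.
class Trickle:
    def __init__(self, data):
        self._data = data

    def readinto(self, b):
        chunk, self._data = self._data[:3], self._data[3:]
        b[:len(chunk)] = chunk
        return len(chunk)

buf = bytearray(10)
n = readinto_full(Trickle(b'0123456789'), buf)
assert n == 10 and bytes(buf) == b'0123456789'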
@@ -1,485 +1,471 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import os
11 11 import pickle
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 )
31 31
32 32
33 33 def countcpus():
34 34 '''try to count the number of CPUs on the system'''
35 35
36 36 # posix
37 37 try:
38 38 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
39 39 if n > 0:
40 40 return n
41 41 except (AttributeError, ValueError):
42 42 pass
43 43
44 44 # windows
45 45 try:
46 46 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
47 47 if n > 0:
48 48 return n
49 49 except (KeyError, ValueError):
50 50 pass
51 51
52 52 return 1
53 53
54 54
55 55 def _numworkers(ui):
56 56 s = ui.config(b'worker', b'numcpus')
57 57 if s:
58 58 try:
59 59 n = int(s)
60 60 if n >= 1:
61 61 return n
62 62 except ValueError:
63 63 raise error.Abort(_(b'number of cpus must be an integer'))
64 64 return min(max(countcpus(), 4), 32)
65 65
66 66
67 67 def ismainthread():
68 68 return threading.current_thread() == threading.main_thread()
69 69
70 70
71 71 class _blockingreader:
72 72 """Wrap unbuffered stream such that pickle.load() works with it.
73 73
74 74 pickle.load() expects that calls to read() and readinto() read as many
75 75 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
76 76 pickle.load() raises an EOFError.
77 77 """
78 78
79 79 def __init__(self, wrapped):
80 80 self._wrapped = wrapped
81 81
82 # Do NOT implement readinto() by making it delegate to
83 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
84 # with just read() and readline(), so we don't need to implement it.
85
86 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
87
88 # This is required for python 3.8, prior to 3.8.2. See issue6444.
89 def readinto(self, b):
90 pos = 0
91 size = len(b)
92
93 while pos < size:
94 ret = self._wrapped.readinto(b[pos:])
95 if not ret:
96 break
97 pos += ret
98
99 return pos
100
101 82 def readline(self):
102 83 return self._wrapped.readline()
103 84
104 # issue multiple reads until size is fulfilled (or EOF is encountered)
105 def read(self, size=-1):
106 if size < 0:
107 return self._wrapped.readall()
108
109 buf = bytearray(size)
85 def readinto(self, buf):
110 86 pos = 0
87 size = len(buf)
111 88
112 89 with memoryview(buf) as view:
113 90 while pos < size:
114 91 with view[pos:] as subview:
115 92 ret = self._wrapped.readinto(subview)
116 93 if not ret:
117 94 break
118 95 pos += ret
119 96
120 del buf[pos:]
97 return pos
98
99 # issue multiple reads until size is fulfilled (or EOF is encountered)
100 def read(self, size=-1):
101 if size < 0:
102 return self._wrapped.readall()
103
104 buf = bytearray(size)
105 n_read = self.readinto(buf)
106 del buf[n_read:]
121 107 return bytes(buf)
122 108
123 109
124 110 if pycompat.isposix or pycompat.iswindows:
125 111 _STARTUP_COST = 0.01
126 112 # The Windows worker is thread based. If tasks are CPU bound, threads
127 113 # in the presence of the GIL result in excessive context switching and
128 114 # this overhead can slow down execution.
129 115 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
130 116 else:
131 117 _STARTUP_COST = 1e30
132 118 _DISALLOW_THREAD_UNSAFE = False
133 119
134 120
135 121 def worthwhile(ui, costperop, nops, threadsafe=True):
136 122 """try to determine whether the benefit of multiple processes can
137 123 outweigh the cost of starting them"""
138 124
139 125 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
140 126 return False
141 127
142 128 linear = costperop * nops
143 129 workers = _numworkers(ui)
144 130 benefit = linear - (_STARTUP_COST * workers + linear / workers)
145 131 return benefit >= 0.15
146 132
147 133
148 134 def worker(
149 135 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
150 136 ):
151 137 """run a function, possibly in parallel in multiple worker
152 138 processes.
153 139
154 140 returns a progress iterator
155 141
156 142 costperarg - cost of a single task
157 143
158 144 func - function to run. It is expected to return a progress iterator.
159 145
160 146 staticargs - arguments to pass to every invocation of the function
161 147
162 148 args - arguments to split into chunks, to pass to individual
163 149 workers
164 150
165 151 hasretval - when True, func and the current function return a progress
166 152 iterator then a dict (encoded as an iterator that yields many (False, ..)
167 153 then a (True, dict)). The dicts are joined in some arbitrary order, so
168 154 overlapping keys are a bad idea.
169 155
170 156 threadsafe - whether work items are thread safe and can be executed using
171 157 a thread-based worker. Should be disabled for CPU heavy tasks that don't
172 158 release the GIL.
173 159 """
174 160 enabled = ui.configbool(b'worker', b'enabled')
175 161 if enabled and _platformworker is _posixworker and not ismainthread():
176 162 # The POSIX worker has to install a handler for SIGCHLD.
177 163 # Python up to 3.9 only allows this in the main thread.
178 164 enabled = False
179 165
180 166 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
181 167 return _platformworker(ui, func, staticargs, args, hasretval)
182 168 return func(*staticargs + (args,))
183 169
184 170
185 171 def _posixworker(ui, func, staticargs, args, hasretval):
186 172 workers = _numworkers(ui)
187 173 oldhandler = signal.getsignal(signal.SIGINT)
188 174 signal.signal(signal.SIGINT, signal.SIG_IGN)
189 175 pids, problem = set(), [0]
190 176
191 177 def killworkers():
192 178 # unregister SIGCHLD handler as all children will be killed. This
193 179 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
194 180 # could be updated while iterating, which would cause inconsistency.
195 181 signal.signal(signal.SIGCHLD, oldchldhandler)
196 182 # if one worker bails, there's no good reason to wait for the rest
197 183 for p in pids:
198 184 try:
199 185 os.kill(p, signal.SIGTERM)
200 186 except OSError as err:
201 187 if err.errno != errno.ESRCH:
202 188 raise
203 189
204 190 def waitforworkers(blocking=True):
205 191 for pid in pids.copy():
206 192 p = st = 0
207 193 while True:
208 194 try:
209 195 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
210 196 break
211 197 except OSError as e:
212 198 if e.errno == errno.EINTR:
213 199 continue
214 200 elif e.errno == errno.ECHILD:
215 201 # child would already be reaped, but pids not yet
216 202 # updated (maybe interrupted just after waitpid)
217 203 pids.discard(pid)
218 204 break
219 205 else:
220 206 raise
221 207 if not p:
222 208 # skip subsequent steps, because the child process should
223 209 # still be running in this case
224 210 continue
225 211 pids.discard(p)
226 212 st = _exitstatus(st)
227 213 if st and not problem[0]:
228 214 problem[0] = st
229 215
230 216 def sigchldhandler(signum, frame):
231 217 waitforworkers(blocking=False)
232 218 if problem[0]:
233 219 killworkers()
234 220
235 221 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
236 222 ui.flush()
237 223 parentpid = os.getpid()
238 224 pipes = []
239 225 retval = {}
240 226 for pargs in partition(args, min(workers, len(args))):
241 227 # Every worker gets its own pipe to send results on, so we don't have to
242 228 # implement atomic writes larger than PIPE_BUF. Each forked process has
243 229 # its own pipe's descriptors in the local variables, and the parent
244 230 # process has the full list of pipe descriptors (and it doesn't really
245 231 # care what order they're in).
246 232 rfd, wfd = os.pipe()
247 233 pipes.append((rfd, wfd))
248 234 # make sure we use os._exit in all worker code paths. otherwise the
249 235 # worker may do some clean-ups which could cause surprises like
250 236 # deadlock. see sshpeer.cleanup for example.
251 237 # override error handling *before* fork. this is necessary because
252 238 # exception (signal) may arrive after fork, before "pid =" assignment
253 239 # completes, and other exception handler (dispatch.py) can lead to
254 240 # unexpected code path without os._exit.
255 241 ret = -1
256 242 try:
257 243 pid = os.fork()
258 244 if pid == 0:
259 245 signal.signal(signal.SIGINT, oldhandler)
260 246 signal.signal(signal.SIGCHLD, oldchldhandler)
261 247
262 248 def workerfunc():
263 249 for r, w in pipes[:-1]:
264 250 os.close(r)
265 251 os.close(w)
266 252 os.close(rfd)
267 253 for result in func(*(staticargs + (pargs,))):
268 254 os.write(wfd, pickle.dumps(result))
269 255 return 0
270 256
271 257 ret = scmutil.callcatch(ui, workerfunc)
272 258 except: # parent re-raises, child never returns
273 259 if os.getpid() == parentpid:
274 260 raise
275 261 exctype = sys.exc_info()[0]
276 262 force = not issubclass(exctype, KeyboardInterrupt)
277 263 ui.traceback(force=force)
278 264 finally:
279 265 if os.getpid() != parentpid:
280 266 try:
281 267 ui.flush()
282 268 except: # never returns, no re-raises
283 269 pass
284 270 finally:
285 271 os._exit(ret & 255)
286 272 pids.add(pid)
287 273 selector = selectors.DefaultSelector()
288 274 for rfd, wfd in pipes:
289 275 os.close(wfd)
290 276 # The stream has to be unbuffered. Otherwise, if all data is read from
291 277 # the raw file into the buffer, the selector thinks that the FD is not
292 278 # ready to read while pickle.load() could read from the buffer. This
293 279 # would delay the processing of readable items.
294 280 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
295 281
296 282 def cleanup():
297 283 signal.signal(signal.SIGINT, oldhandler)
298 284 waitforworkers()
299 285 signal.signal(signal.SIGCHLD, oldchldhandler)
300 286 selector.close()
301 287 return problem[0]
302 288
303 289 try:
304 290 openpipes = len(pipes)
305 291 while openpipes > 0:
306 292 for key, events in selector.select():
307 293 try:
308 294 # The pytype error likely goes away on a modern version of
309 295 # pytype having a modern typeshed snapshot.
310 296 # pytype: disable=wrong-arg-types
311 297 res = pickle.load(_blockingreader(key.fileobj))
312 298 # pytype: enable=wrong-arg-types
313 299 if hasretval and res[0]:
314 300 retval.update(res[1])
315 301 else:
316 302 yield res
317 303 except EOFError:
318 304 selector.unregister(key.fileobj)
319 305 key.fileobj.close()
320 306 openpipes -= 1
321 307 except IOError as e:
322 308 if e.errno == errno.EINTR:
323 309 continue
324 310 raise
325 311 except: # re-raises
326 312 killworkers()
327 313 cleanup()
328 314 raise
329 315 status = cleanup()
330 316 if status:
331 317 if status < 0:
332 318 os.kill(os.getpid(), -status)
333 319 raise error.WorkerError(status)
334 320 if hasretval:
335 321 yield True, retval
336 322
337 323
338 324 def _posixexitstatus(code):
339 325 """convert a posix exit status into the same form returned by
340 326 os.spawnv
341 327
342 328 returns None if the process was stopped instead of exiting"""
343 329 if os.WIFEXITED(code):
344 330 return os.WEXITSTATUS(code)
345 331 elif os.WIFSIGNALED(code):
346 332 return -(os.WTERMSIG(code))
347 333
348 334
349 335 def _windowsworker(ui, func, staticargs, args, hasretval):
350 336 class Worker(threading.Thread):
351 337 def __init__(
352 338 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
353 339 ):
354 340 threading.Thread.__init__(self, *args, **kwargs)
355 341 self._taskqueue = taskqueue
356 342 self._resultqueue = resultqueue
357 343 self._func = func
358 344 self._staticargs = staticargs
359 345 self._interrupted = False
360 346 self.daemon = True
361 347 self.exception = None
362 348
363 349 def interrupt(self):
364 350 self._interrupted = True
365 351
366 352 def run(self):
367 353 try:
368 354 while not self._taskqueue.empty():
369 355 try:
370 356 args = self._taskqueue.get_nowait()
371 357 for res in self._func(*self._staticargs + (args,)):
372 358 self._resultqueue.put(res)
373 359 # threading doesn't provide a native way to
374 360 # interrupt execution. handle it manually at every
375 361 # iteration.
376 362 if self._interrupted:
377 363 return
378 364 except pycompat.queue.Empty:
379 365 break
380 366 except Exception as e:
381 367 # store the exception such that the main thread can resurface
382 368 # it as if the func was running without workers.
383 369 self.exception = e
384 370 raise
385 371
386 372 threads = []
387 373
388 374 def trykillworkers():
389 375 # Allow up to 1 second to clean worker threads nicely
390 376 cleanupend = time.time() + 1
391 377 for t in threads:
392 378 t.interrupt()
393 379 for t in threads:
394 380 remainingtime = cleanupend - time.time()
395 381 t.join(remainingtime)
396 382 if t.is_alive():
397 383 # pass over the workers joining failure. it is more
398 384 # important to surface the initial exception than the
399 385 # fact that one of the workers may be processing a large
400 386 # task and does not get to handle the interruption.
401 387 ui.warn(
402 388 _(
403 389 b"failed to kill worker threads while "
404 390 b"handling an exception\n"
405 391 )
406 392 )
407 393 return
408 394
409 395 workers = _numworkers(ui)
410 396 resultqueue = pycompat.queue.Queue()
411 397 taskqueue = pycompat.queue.Queue()
412 398 retval = {}
413 399 # partition work into more pieces than workers to minimize the chance
414 400 # of uneven distribution of large tasks between the workers
415 401 for pargs in partition(args, workers * 20):
416 402 taskqueue.put(pargs)
417 403 for _i in range(workers):
418 404 t = Worker(taskqueue, resultqueue, func, staticargs)
419 405 threads.append(t)
420 406 t.start()
421 407 try:
422 408 while len(threads) > 0:
423 409 while not resultqueue.empty():
424 410 res = resultqueue.get()
425 411 if hasretval and res[0]:
426 412 retval.update(res[1])
427 413 else:
428 414 yield res
429 415 threads[0].join(0.05)
430 416 finishedthreads = [_t for _t in threads if not _t.is_alive()]
431 417 for t in finishedthreads:
432 418 if t.exception is not None:
433 419 raise t.exception
434 420 threads.remove(t)
435 421 except (Exception, KeyboardInterrupt): # re-raises
436 422 trykillworkers()
437 423 raise
438 424 while not resultqueue.empty():
439 425 res = resultqueue.get()
440 426 if hasretval and res[0]:
441 427 retval.update(res[1])
442 428 else:
443 429 yield res
444 430 if hasretval:
445 431 yield True, retval
446 432
447 433
448 434 if pycompat.iswindows:
449 435 _platformworker = _windowsworker
450 436 else:
451 437 _platformworker = _posixworker
452 438 _exitstatus = _posixexitstatus
453 439
454 440
455 441 def partition(lst, nslices):
456 442 """partition a list into N slices of roughly equal size
457 443
458 444 The current strategy takes every Nth element from the input. If
459 445 we ever write workers that need to preserve grouping in input
460 446 we should consider allowing callers to specify a partition strategy.
461 447
462 448 olivia is not a fan of this partitioning strategy when files are involved.
463 449 In his words:
464 450
465 451 Single-threaded Mercurial makes a point of creating and visiting
466 452 files in a fixed order (alphabetical). When creating files in order,
467 453 a typical filesystem is likely to allocate them on nearby regions on
468 454 disk. Thus, when revisiting in the same order, locality is maximized
469 455 and various forms of OS and disk-level caching and read-ahead get a
470 456 chance to work.
471 457
472 458 This effect can be quite significant on spinning disks. I discovered it
473 459 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
474 460 Tarring a repo and copying it to another disk effectively randomized
475 461 the revlog ordering on disk by sorting the revlogs by hash and suddenly
476 462 performance of my kernel checkout benchmark dropped by ~10x because the
477 463 "working set" of sectors visited no longer fit in the drive's cache and
478 464 the workload switched from streaming to random I/O.
479 465
480 466 What we should really be doing is have workers read filenames from an
481 467 ordered queue. This preserves locality and also keeps any worker from
482 468 getting more than one file out of balance.
483 469 """
484 470 for i in range(nslices):
485 471 yield lst[i::nslices]
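For context, the parent side of _posixworker opens each worker's pipe unbuffered and wraps the read end in _blockingreader so that pickle.load() always sees complete reads. The following POSIX-only sketch mirrors that consumption pattern under the assumption that the _blockingreader class from the diff above is in scope; the surrounding script scaffolding (the fork, the sample items, the print loop) is illustrative and not part of worker.py.

import os
import pickle

rfd, wfd = os.pipe()
pid = os.fork()
if pid == 0:
    # Child: write pickled results to the pipe and exit via os._exit,
    # mirroring the "no clean-ups after fork" rule in _posixworker.
    os.close(rfd)
    for item in (b'alpha', b'beta', b'gamma'):
        os.write(wfd, pickle.dumps(item))
    os._exit(0)

os.close(wfd)
# The stream must be unbuffered (buffering=0); otherwise a selector could
# report the fd as not ready while unread data sits in a Python-level buffer.
stream = os.fdopen(rfd, 'rb', 0)
reader = _blockingreader(stream)  # class defined in the change above
try:
    while True:
        print(pickle.load(reader))  # raises EOFError once the child closes its end
except EOFError:
    pass
finally:
    stream.close()
    os.waitpid(pid, 0)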