# HG changeset patch # User Jun Wu # Date 2017-04-22 23:50:08 # Node ID 31763785094bb1bf604245fa22a7d9960662477c # Parent 1208b74841ffee824163a22724cee2edc99aa030 worker: rewrite error handling so os._exit covers all cases Previously, the worker error handling looked like this: pid = os.fork() --+ if pid == 0: | .... | problematic .... --+ try: --+ .... | worker error handling --+ If a signal arrives when Python is executing the "problematic" lines, an external error handler (dispatch.py) will take over the control flow and it's no longer guaranteed that "os._exit" is called (see 86cd09bc13ba for why it is necessary). This patch rewrites the error handling so it covers all possible code paths for a worker even during fork. Note: "os.getpid() == parentpid" is intentionally used to test whether the process is the parent, instead of checking "pid", because "pid = os.fork()" may not be atomic - it's possible that a signal hits the worker before the assignment completes [1]. The newly added test replaces "os.fork" to exercise that extreme case. [1]: CPython compiles "pid = os.fork()" to 2 bytecodes: "CALL_FUNCTION" and "STORE_FAST", so it's probably not atomic: def f(): pid = os.fork() dis.dis(f) 2 0 LOAD_GLOBAL 0 (os) 3 LOAD_ATTR 1 (fork) 6 CALL_FUNCTION 0 9 STORE_FAST 0 (pid) 12 LOAD_CONST 0 (None) 15 RETURN_VALUE diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -134,37 +134,43 @@ def _posixworker(ui, func, staticargs, a killworkers() oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() + parentpid = os.getpid() for pargs in partition(args, workers): - pid = os.fork() - if pid == 0: - signal.signal(signal.SIGINT, oldhandler) - signal.signal(signal.SIGCHLD, oldchldhandler) - - def workerfunc(): - os.close(rfd) - for i, item in func(*(staticargs + (pargs,))): - os.write(wfd, '%d %s\n' % (i, item)) - return 0 + # make sure we use os._exit in all worker code paths. 
otherwise the + # worker may do some clean-ups which could cause surprises like + # deadlock. see sshpeer.cleanup for example. + # override error handling *before* fork. this is necessary because + # exception (signal) may arrive after fork, before "pid =" assignment + # completes, and other exception handler (dispatch.py) can lead to + # unexpected code path without os._exit. + ret = -1 + try: + pid = os.fork() + if pid == 0: + signal.signal(signal.SIGINT, oldhandler) + signal.signal(signal.SIGCHLD, oldchldhandler) - # make sure we use os._exit in all code paths. otherwise the worker - # may do some clean-ups which could cause surprises like deadlock. - # see sshpeer.cleanup for example. - ret = 0 - try: + def workerfunc(): + os.close(rfd) + for i, item in func(*(staticargs + (pargs,))): + os.write(wfd, '%d %s\n' % (i, item)) + return 0 + + ret = scmutil.callcatch(ui, workerfunc) + except: # parent re-raises, child never returns + if os.getpid() == parentpid: + raise + exctype = sys.exc_info()[0] + force = not issubclass(exctype, KeyboardInterrupt) + ui.traceback(force=force) + finally: + if os.getpid() != parentpid: try: - ret = scmutil.callcatch(ui, workerfunc) - finally: ui.flush() - except KeyboardInterrupt: - os._exit(255) - except: # never return, therefore no re-raises - try: - ui.traceback(force=True) - ui.flush() + except: # never returns, no re-raises + pass finally: - os._exit(255) - else: - os._exit(ret & 255) + os._exit(ret & 255) pids.add(pid) os.close(wfd) fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0) diff --git a/tests/test-worker.t b/tests/test-worker.t --- a/tests/test-worker.t +++ b/tests/test-worker.t @@ -91,4 +91,36 @@ Traceback must be printed for unknown ex > test 100000.0 exc 2>&1 | grep '^Traceback' Traceback (most recent call last): +Workers should not do cleanups in all cases + + $ cat > $TESTTMP/detectcleanup.py << EOF + > from __future__ import absolute_import + > import atexit + > import os + > import time + > oldfork = os.fork + > count = 0 + 
> parentpid = os.getpid() + > def delayedfork(): + > global count + > count += 1 + > pid = oldfork() + > # make it easier to test SIGTERM hitting other workers when they have + > # not set up error handling yet. + > if count > 1 and pid == 0: + > time.sleep(0.1) + > return pid + > os.fork = delayedfork + > def cleanup(): + > if os.getpid() != parentpid: + > os.write(1, 'should never happen\n') + > atexit.register(cleanup) + > EOF + + $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \ + > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort + start + abort: known exception + [255] + #endif