worker: manually buffer reads from pickle stream

My previous fix (D8051, cb52e619c99e, which added Python's built-in buffering to the pickle stream) has the problem that the selector will ignore the buffer. When multiple pickled objects are read from the pipe into the buffer at once, only one object will be loaded.

This can repeat until the buffer is full and delays the processing of completed items until the worker exits, at which point the pipe is always considered readable and all remaining items are processed.

This changeset reverts D8051, removing the buffer again. Instead, on Python 3 only, we use a wrapper to modify the "read" provided to the Unpickler to behave more like a buffered read. We never read more bytes from the pipe than the Unpickler requests, so the selector behaves as expected.

Also add a test case for "pickle data was truncated" issue.

https://phab.mercurial-scm.org/D8051#119193

Differential Revision: https://phab.mercurial-scm.org/D8076
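The wrapper idea described in the commit message can be sketched roughly as follows. This is a minimal illustration, not the code from the changeset: the class name `ExactReader` and the helper `demo` are hypothetical, and the sketch assumes a POSIX system where `select` works on pipes. The wrapper's `read` loops until the requested number of bytes has arrived (or EOF), but never asks the pipe for more than the Unpickler requested, so any further pickled object stays in the pipe where a selector can still see it.

```python
import os
import pickle
import select
import threading
import time


class ExactReader:
    """Hypothetical wrapper around an unbuffered binary file object."""

    def __init__(self, raw):
        self._raw = raw

    def readline(self):
        return self._raw.readline()

    def read(self, size=-1):
        if size < 0:
            return self._raw.readall()
        chunks = []
        remaining = size
        while remaining > 0:
            # A raw pipe read may return fewer bytes than requested.
            chunk = self._raw.read(remaining)
            if not chunk:  # EOF
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)


def demo():
    rfd, wfd = os.pipe()
    first = pickle.dumps("first result")
    second = pickle.dumps("second result")

    def writer():
        os.write(wfd, first[:9])  # deliberately truncated pickle
        time.sleep(0.1)
        os.write(wfd, first[9:] + second)

    thread = threading.Thread(target=writer)
    thread.start()
    raw = os.fdopen(rfd, "rb", 0)  # buffering=0: no hidden read-ahead
    unpickler = pickle.Unpickler(ExactReader(raw))
    a = unpickler.load()
    thread.join()
    # The second object was not consumed, so the pipe is still readable.
    pending_after_first = bool(select.select([rfd], [], [], 0)[0])
    b = unpickler.load()
    pending_after_second = bool(select.select([rfd], [], [], 0)[0])
    os.close(wfd)
    raw.close()
    return a, pending_after_first, b, pending_after_second
```

Because the wrapper exposes no `peek` or `readinto`, the Unpickler in this sketch goes through `read` and `readline` only, which keeps every byte it has not yet asked for visible to `select`.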

File last commit:

r44751:12491abf stable
test-worker.t
165 lines | 3.9 KiB
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 Test UI worker interaction
$ cat > t.py <<EOF
> from __future__ import absolute_import, print_function
Gregory Szorc
py3: suppress unraisable exceptions in test-worker.t...
r44575 > import sys
Jun Wu
test-worker: exercise more about "killworkers" situation...
r32114 > import time
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 > from mercurial import (
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 >     error,
Yuya Nishihara
registrar: move cmdutil.command to registrar module (API)...
r32337 >     registrar,
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 >     ui as uimod,
>     worker,
> )
Gregory Szorc
py3: suppress unraisable exceptions in test-worker.t...
r44575 > sys.unraisablehook = lambda x: None
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 > def abort(ui, args):
>     if args[0] == 0:
>         # by first worker for test stability
Pulkit Goyal
py3: add b'' prefixes in test-worker.t...
r36200 >         raise error.Abort(b'known exception')
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 >     return runme(ui, [])
Yuya Nishihara
worker: print traceback for uncaught exception unconditionally...
r32043 > def exc(ui, args):
>     if args[0] == 0:
>         # by first worker for test stability
>         raise Exception('unknown exception')
>     return runme(ui, [])
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 > def runme(ui, args):
>     for arg in args:
Pulkit Goyal
py3: add b'' prefixes in test-worker.t...
r36200 >         ui.status(b'run\n')
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 >         yield 1, arg
Jun Wu
test-worker: exercise more about "killworkers" situation...
r32114 >     time.sleep(0.1) # easier to trigger killworkers code path
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 > functable = {
Pulkit Goyal
py3: add b'' prefixes in test-worker.t...
r36200 >     b'abort': abort,
>     b'exc': exc,
>     b'runme': runme,
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 > }
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 > cmdtable = {}
Yuya Nishihara
registrar: move cmdutil.command to registrar module (API)...
r32337 > command = registrar.command(cmdtable)
Pulkit Goyal
py3: add b'' prefixes in test-worker.t...
r36200 > @command(b'test', [], b'hg test [COST] [FUNC]')
> def t(ui, repo, cost=1.0, func=b'runme'):
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 >     cost = float(cost)
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 >     func = functable[func]
Pulkit Goyal
py3: add b'' prefixes in test-worker.t...
r36200 >     ui.status(b'start\n')
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 >     runs = worker.worker(ui, cost, func, (ui,), range(8))
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 >     for n, i in runs:
>         pass
Pulkit Goyal
py3: add b'' prefixes in test-worker.t...
r36200 >     ui.status(b'done\n')
David Soria Parra
worker: flush ui buffers before running the worker...
r31696 > EOF
$ abspath=`pwd`/t.py
$ hg init
Run tests with worker enabled by forcing a high cost
$ hg --config "extensions.t=$abspath" test 100000.0
start
run
run
run
run
run
run
run
run
done
Run tests without worker by forcing a low cost
$ hg --config "extensions.t=$abspath" test 0.0000001
start
run
run
run
run
run
run
run
run
done
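The two runs above differ only in the COST argument, which decides whether forking workers is considered worth the overhead. A rough sketch of such a cost model is shown below. The function name `worthwhile`, the constants `STARTUP_COST` and `MIN_BENEFIT`, and their values are assumptions made for illustration and are not taken from this test file.

```python
STARTUP_COST = 0.01  # assumed per-worker startup cost, in seconds
MIN_BENEFIT = 0.15   # assumed minimum saving that justifies forking


def worthwhile(cost_per_op, nops, workers):
    """Return True if splitting nops operations across workers pays off."""
    linear = cost_per_op * nops  # estimated time when run in-process
    parallel = STARTUP_COST * workers + linear / workers
    return linear - parallel >= MIN_BENEFIT
```

Under these assumptions, eight items at a cost of 100000.0 each clearly justify workers, while eight items at 0.0000001 each do not, which matches the intent of the two test runs.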
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041
Yuya Nishihara
test-worker: disable tests of forked workers on Windows...
r32061 #if no-windows
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 Known exception should be caught, but printed if --traceback is enabled
Jun Wu
test-worker: exercise more about "killworkers" situation...
r32114 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
> test 100000.0 abort 2>&1
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041 start
abort: known exception
Yuya Nishihara
worker: propagate exit code to main process...
r32042 [255]
Yuya Nishihara
dispatch: print traceback in scmutil.callcatch() if --traceback specified...
r32041
Jun Wu
test-worker: exercise more about "killworkers" situation...
r32114 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
Gregory Szorc
py3: add alternate output on Python 3...
r41662 > test 100000.0 abort --traceback 2>&1 | egrep '(SystemExit|Abort)'
raise error.Abort(b'known exception')
Denis Laxalde
py3: add a __str__ method to Abort...
r43720 mercurial.error.Abort: known exception (py3 !)
Gregory Szorc
py3: add alternate output on Python 3...
r41662 Abort: known exception (no-py3 !)
Jun Wu
test-worker: capture tracebacks more reliably...
r32113 SystemExit: 255
Yuya Nishihara
worker: print traceback for uncaught exception unconditionally...
r32043
Traceback must be printed for unknown exceptions
Jun Wu
test-worker: exercise more about "killworkers" situation...
r32114 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
Jun Wu
test-worker: capture tracebacks more reliably...
r32113 > test 100000.0 exc 2>&1 | grep '^Exception'
Exception: unknown exception
Yuya Nishihara
test-worker: disable tests of forked workers on Windows...
r32061
Jun Wu
worker: rewrite error handling so os._exit covers all cases...
r32112 Workers should not do cleanups in all cases
$ cat > $TESTTMP/detectcleanup.py <<EOF
> from __future__ import absolute_import
> import atexit
> import os
Gregory Szorc
py3: suppress unraisable exceptions in test-worker.t...
r44575 > import sys
Jun Wu
worker: rewrite error handling so os._exit covers all cases...
r32112 > import time
Gregory Szorc
py3: suppress unraisable exceptions in test-worker.t...
r44575 > sys.unraisablehook = lambda x: None
Jun Wu
worker: rewrite error handling so os._exit covers all cases...
r32112 > oldfork = os.fork
> count = 0
> parentpid = os.getpid()
> def delayedfork():
>     global count
>     count += 1
>     pid = oldfork()
>     # make it easier to test SIGTERM hitting other workers when they have
>     # not set up error handling yet.
>     if count > 1 and pid == 0:
>         time.sleep(0.1)
>     return pid
> os.fork = delayedfork
> def cleanup():
>     if os.getpid() != parentpid:
>         os.write(1, b'should never happen\n')
> atexit.register(cleanup)
> EOF
$ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
> "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort
start
abort: known exception
[255]
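The property this test checks, that a forked worker leaves without running `atexit` handlers, can be illustrated in isolation. The sketch below is an assumption-laden stand-in and not the worker implementation: `run_child_with_os_exit` is a hypothetical helper, and it requires a platform with `os.fork`. The child terminates via `os._exit`, which skips `atexit` handlers, so nothing is written to the pipe.

```python
import atexit
import os


def run_child_with_os_exit():
    rfd, wfd = os.pipe()
    parentpid = os.getpid()

    def cleanup():
        # Would only write if an atexit handler ran inside the child.
        if os.getpid() != parentpid:
            os.write(wfd, b"cleanup ran in child\n")

    atexit.register(cleanup)
    pid = os.fork()
    if pid == 0:
        os.close(rfd)
        os._exit(0)  # exits immediately, skipping atexit handlers
    os.close(wfd)
    os.waitpid(pid, 0)
    with os.fdopen(rfd, "rb") as f:
        out = f.read()  # EOF once the child has exited
    atexit.unregister(cleanup)
    return out
```

If the child called `sys.exit(0)` instead and the resulting `SystemExit` propagated normally, the handler would run in the child and the returned bytes would be non-empty.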
Jan Alexander Steffens (heftig)
worker: manually buffer reads from pickle stream...
r44751 Do not crash on partially read result
$ cat > $TESTTMP/detecttruncated.py <<EOF
> from __future__ import absolute_import
> import os
> import sys
> import time
> sys.unraisablehook = lambda x: None
> oldwrite = os.write
> def splitwrite(fd, string):
>     ret = oldwrite(fd, string[:9])
>     if ret == 9:
>         time.sleep(0.1)
>         ret += oldwrite(fd, string[9:])
>     return ret
> os.write = splitwrite
> EOF
$ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
> "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
start
run
run
run
run
run
run
run
run
done
Yuya Nishihara
test-worker: disable tests of forked workers on Windows...
r32061 #endif