##// END OF EJS Templates
Merge pull request #13348 from minrk/run-async-tests...
Matthias Bussonnier -
r27389:f162b26d merge
parent child Browse files
Show More
@@ -1,10 +1,12 b''
1 import types
2 import sys
3 1 import builtins
2 import inspect
4 3 import os
5 import pytest
6 4 import pathlib
7 5 import shutil
6 import sys
7 import types
8
9 import pytest
8 10
9 11 # Must register before it gets imported
10 12 pytest.register_assert_rewrite("IPython.testing.tools")
@@ -12,6 +14,19 b' pytest.register_assert_rewrite("IPython.testing.tools")'
12 14 from .testing import tools
13 15
14 16
17 def pytest_collection_modifyitems(items):
18 """This function is automatically run by pytest passing all collected test
19 functions.
20
21 We use it to add asyncio marker to all async tests and assert we don't use
22 test functions that are async generators which wouldn't make sense.
23 """
24 for item in items:
25 if inspect.iscoroutinefunction(item.obj):
26 item.add_marker("asyncio")
27 assert not inspect.isasyncgenfunction(item.obj)
28
29
15 30 def get_ipython():
16 31 from .terminal.interactiveshell import TerminalInteractiveShell
17 32 if TerminalInteractiveShell._instance:
@@ -32,7 +47,7 b' def work_path():'
32 47 if path.exists():
33 48 raise ValueError('IPython dir temporary path already exists ! Did previous test run exit successfully ?')
34 49 path.mkdir()
35 yield
50 yield
36 51 shutil.rmtree(str(path.resolve()))
37 52
38 53
@@ -14,26 +14,46 b' Python semantics.'
14 14 import ast
15 15 import asyncio
16 16 import inspect
17 from functools import wraps
17 18
19 _asyncio_event_loop = None
18 20
19 class _AsyncIORunner:
20 def __init__(self):
21 self._loop = None
22
23 @property
24 def loop(self):
25 """Always returns a non-closed event loop"""
26 if self._loop is None or self._loop.is_closed():
27 policy = asyncio.get_event_loop_policy()
28 self._loop = policy.new_event_loop()
29 policy.set_event_loop(self._loop)
30 return self._loop
31 21
22 def get_asyncio_loop():
23 """asyncio has deprecated get_event_loop
24
25 Replicate it here, with our desired semantics:
26
27 - always returns a valid, not-closed loop
28 - not thread-local like asyncio's,
29 because we only want one loop for IPython
30 - if called from inside a coroutine (e.g. in ipykernel),
31 return the running loop
32
33 .. versionadded:: 8.0
34 """
35 try:
36 return asyncio.get_running_loop()
37 except RuntimeError:
38 # not inside a coroutine,
39 # track our own global
40 pass
41
42 # not thread-local like asyncio's,
43 # because we only track one event loop to run for IPython itself,
44 # always in the main thread.
45 global _asyncio_event_loop
46 if _asyncio_event_loop is None or _asyncio_event_loop.is_closed():
47 _asyncio_event_loop = asyncio.new_event_loop()
48 return _asyncio_event_loop
49
50
51 class _AsyncIORunner:
32 52 def __call__(self, coro):
33 53 """
34 54 Handler for asyncio autoawait
35 55 """
36 return self.loop.run_until_complete(coro)
56 return get_asyncio_loop().run_until_complete(coro)
37 57
38 58 def __str__(self):
39 59 return "asyncio"
@@ -42,6 +62,39 b' class _AsyncIORunner:'
42 62 _asyncio_runner = _AsyncIORunner()
43 63
44 64
65 class _AsyncIOProxy:
66 """Proxy-object for an asyncio
67
68 Any coroutine methods will be wrapped in event_loop.run_
69 """
70
71 def __init__(self, obj, event_loop):
72 self._obj = obj
73 self._event_loop = event_loop
74
75 def __repr__(self):
76 return f"<_AsyncIOProxy({self._obj!r})>"
77
78 def __getattr__(self, key):
79 attr = getattr(self._obj, key)
80 if inspect.iscoroutinefunction(attr):
81 # if it's a coroutine method,
82 # return a threadsafe wrapper onto the _current_ asyncio loop
83 @wraps(attr)
84 def _wrapped(*args, **kwargs):
85 concurrent_future = asyncio.run_coroutine_threadsafe(
86 attr(*args, **kwargs), self._event_loop
87 )
88 return asyncio.wrap_future(concurrent_future)
89
90 return _wrapped
91 else:
92 return attr
93
94 def __dir__(self):
95 return dir(self._obj)
96
97
45 98 def _curio_runner(coroutine):
46 99 """
47 100 handler for curio autoawait
@@ -6,19 +6,18 b''
6 6 import asyncio
7 7 import atexit
8 8 import errno
9 import functools
10 9 import os
11 10 import signal
12 11 import sys
13 12 import time
14 from contextlib import contextmanager
15 13 from subprocess import CalledProcessError
14 from threading import Thread
16 15
17 from traitlets import Dict, List, default
16 from traitlets import Any, Dict, List, default
18 17
19 18 from IPython.core import magic_arguments
19 from IPython.core.async_helpers import _AsyncIOProxy
20 20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.lib.backgroundjobs import BackgroundJobManager
22 21 from IPython.utils.process import arg_split
23 22
24 23 #-----------------------------------------------------------------------------
@@ -57,7 +56,7 b' def script_args(f):'
57 56 ),
58 57 magic_arguments.argument(
59 58 '--no-raise-error', action="store_false", dest='raise_error',
60 help="""Whether you should raise an error message in addition to
59 help="""Whether you should raise an error message in addition to
61 60 a stream on stderr if you get a nonzero exit code.
62 61 """
63 62 )
@@ -67,48 +66,6 b' def script_args(f):'
67 66 return f
68 67
69 68
70 @contextmanager
71 def safe_watcher():
72 if sys.platform == "win32":
73 yield
74 return
75
76 from asyncio import SafeChildWatcher
77
78 policy = asyncio.get_event_loop_policy()
79 old_watcher = policy.get_child_watcher()
80 if isinstance(old_watcher, SafeChildWatcher):
81 yield
82 return
83
84 try:
85 loop = policy.get_event_loop()
86 if loop.is_closed():
87 raise RuntimeError("open a new one")
88 except RuntimeError:
89 # closed loop, make a new one
90 loop = policy.new_event_loop()
91 policy.set_event_loop(loop)
92
93 try:
94 watcher = asyncio.SafeChildWatcher()
95 watcher.attach_loop(loop)
96 policy.set_child_watcher(watcher)
97 yield
98 finally:
99 watcher.close()
100 policy.set_child_watcher(old_watcher)
101
102
103 def dec_safe_watcher(fun):
104 @functools.wraps(fun)
105 def _inner(*args, **kwargs):
106 with safe_watcher():
107 return fun(*args, **kwargs)
108
109 return _inner
110
111
112 69 @magics_class
113 70 class ScriptMagics(Magics):
114 71 """Magics for talking to scripts
@@ -117,6 +74,17 b' class ScriptMagics(Magics):'
117 74 with a program in a subprocess, and registers a few top-level
118 75 magics that call %%script with common interpreters.
119 76 """
77
78 event_loop = Any(
79 help="""
80 The event loop on which to run subprocesses
81
82 Not the main event loop,
83 because we want to be able to make blocking calls
84 and have certain requirements we don't want to impose on the main loop.
85 """
86 )
87
120 88 script_magics = List(
121 89 help="""Extra script cell magics to define
122 90
@@ -158,7 +126,6 b' class ScriptMagics(Magics):'
158 126 def __init__(self, shell=None):
159 127 super(ScriptMagics, self).__init__(shell=shell)
160 128 self._generate_script_magics()
161 self.job_manager = BackgroundJobManager()
162 129 self.bg_processes = []
163 130 atexit.register(self.kill_bg_processes)
164 131
@@ -199,7 +166,6 b' class ScriptMagics(Magics):'
199 166 @magic_arguments.magic_arguments()
200 167 @script_args
201 168 @cell_magic("script")
202 @dec_safe_watcher
203 169 def shebang(self, line, cell):
204 170 """Run a cell via a shell command
205 171
@@ -221,6 +187,27 b' class ScriptMagics(Magics):'
221 187 3
222 188 """
223 189
190 # Create the event loop in which to run script magics
191 # this operates on a background thread
192 if self.event_loop is None:
193 if sys.platform == "win32":
194 # don't override the current policy,
195 # just create an event loop
196 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
197 else:
198 event_loop = asyncio.new_event_loop()
199 self.event_loop = event_loop
200
201 # start the loop in a background thread
202 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
203 asyncio_thread.start()
204 else:
205 event_loop = self.event_loop
206
207 def in_thread(coro):
208 """Call a coroutine on the asyncio thread"""
209 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
210
224 211 async def _handle_stream(stream, stream_arg, file_object):
225 212 while True:
226 213 line = (await stream.readline()).decode("utf8")
@@ -244,23 +231,11 b' class ScriptMagics(Magics):'
244 231 await asyncio.wait([stdout_task, stderr_task])
245 232 await process.wait()
246 233
247 policy = asyncio.get_event_loop_policy()
248 if sys.platform.startswith("win") and not isinstance(
249 policy, asyncio.WindowsProactorEventLoopPolicy
250 ):
251 # _do not_ overwrite the current policy
252 policy = asyncio.WindowsProactorEventLoopPolicy()
253
254 try:
255 loop = policy.get_event_loop()
256 except RuntimeError:
257 # closed loop, make a new one
258 loop = policy.new_event_loop()
259 policy.set_event_loop(loop)
260 234 argv = arg_split(line, posix=not sys.platform.startswith("win"))
261 235 args, cmd = self.shebang.parser.parse_known_args(argv)
236
262 237 try:
263 p = loop.run_until_complete(
238 p = in_thread(
264 239 asyncio.create_subprocess_exec(
265 240 *cmd,
266 241 stdout=asyncio.subprocess.PIPE,
@@ -274,7 +249,7 b' class ScriptMagics(Magics):'
274 249 return
275 250 else:
276 251 raise
277
252
278 253 if not cell.endswith('\n'):
279 254 cell += '\n'
280 255 cell = cell.encode('utf8', 'replace')
@@ -283,29 +258,34 b' class ScriptMagics(Magics):'
283 258 self._gc_bg_processes()
284 259 to_close = []
285 260 if args.out:
286 self.shell.user_ns[args.out] = p.stdout
261 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
287 262 else:
288 263 to_close.append(p.stdout)
289 264 if args.err:
290 self.shell.user_ns[args.err] = p.stderr
265 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
291 266 else:
292 267 to_close.append(p.stderr)
293 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
268 event_loop.call_soon_threadsafe(
269 lambda: asyncio.Task(self._run_script(p, cell, to_close))
270 )
294 271 if args.proc:
295 self.shell.user_ns[args.proc] = p
272 proc_proxy = _AsyncIOProxy(p, event_loop)
273 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
274 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
275 self.shell.user_ns[args.proc] = proc_proxy
296 276 return
297
277
298 278 try:
299 loop.run_until_complete(_stream_communicate(p, cell))
279 in_thread(_stream_communicate(p, cell))
300 280 except KeyboardInterrupt:
301 281 try:
302 282 p.send_signal(signal.SIGINT)
303 time.sleep(0.1)
283 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
304 284 if p.returncode is not None:
305 285 print("Process is interrupted.")
306 286 return
307 287 p.terminate()
308 time.sleep(0.1)
288 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
309 289 if p.returncode is not None:
310 290 print("Process is terminated.")
311 291 return
@@ -316,7 +296,8 b' class ScriptMagics(Magics):'
316 296 except Exception as e:
317 297 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
318 298 return
319 if args.raise_error and p.returncode!=0:
299
300 if args.raise_error and p.returncode != 0:
320 301 # If we get here and p.returncode is still None, we must have
321 302 # killed it but not yet seen its return code. We don't wait for it,
322 303 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
@@ -324,14 +305,20 b' class ScriptMagics(Magics):'
324 305 raise CalledProcessError(rc, cell)
325 306
326 307 shebang.__skip_doctest__ = os.name != "posix"
327
328 def _run_script(self, p, cell, to_close):
308
309 async def _run_script(self, p, cell, to_close):
329 310 """callback for running the script in the background"""
311
330 312 p.stdin.write(cell)
313 await p.stdin.drain()
331 314 p.stdin.close()
315 await p.stdin.wait_closed()
316 await p.wait()
317 # asyncio read pipes have no close
318 # but we should drain the data anyway
332 319 for s in to_close:
333 s.close()
334 p.wait()
320 await s.read()
321 self._gc_bg_processes()
335 322
336 323 @line_magic("killbgscripts")
337 324 def killbgscripts(self, _nouse_=''):
@@ -1049,10 +1049,10 b' def test_custom_exc_count():'
1049 1049
1050 1050
1051 1051 def test_run_cell_async():
1052 loop = asyncio.get_event_loop_policy().get_event_loop()
1053 1052 ip.run_cell("import asyncio")
1054 1053 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1055 1054 assert asyncio.iscoroutine(coro)
1055 loop = asyncio.new_event_loop()
1056 1056 result = loop.run_until_complete(coro)
1057 1057 assert isinstance(result, interactiveshell.ExecutionResult)
1058 1058 assert result.result == 5
@@ -970,90 +970,98 b' def test_script_config():'
970 970 assert "whoda" in sm.magics["cell"]
971 971
972 972
973 @pytest.fixture
974 def event_loop():
975 policy = asyncio.get_event_loop_policy()
976 loop = policy.new_event_loop()
977 policy.set_event_loop(loop)
978 yield loop
979 loop.close()
980
981
982 @dec.skip_win32
983 @pytest.mark.skipif(
984 sys.platform == "win32", reason="This test does not run under Windows"
985 )
986 def test_script_out(event_loop):
987 assert event_loop.is_running() is False
988
973 def test_script_out():
989 974 ip = get_ipython()
990 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
991 assert event_loop.is_running() is False
992 assert ip.user_ns["output"] == "hi\n"
975 ip.run_cell_magic("script", f"--out output {sys.executable}", "print('hi')")
976 assert ip.user_ns["output"].strip() == "hi"
993 977
994 978
995 @dec.skip_win32
996 @pytest.mark.skipif(
997 sys.platform == "win32", reason="This test does not run under Windows"
998 )
999 def test_script_err(event_loop):
979 def test_script_err():
1000 980 ip = get_ipython()
1001 assert event_loop.is_running() is False
1002 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
1003 assert event_loop.is_running() is False
1004 assert ip.user_ns["error"] == "hello\n"
981 ip.run_cell_magic(
982 "script",
983 f"--err error {sys.executable}",
984 "import sys; print('hello', file=sys.stderr)",
985 )
986 assert ip.user_ns["error"].strip() == "hello"
1005 987
1006 988
1007 @dec.skip_win32
1008 @pytest.mark.skipif(
1009 sys.platform == "win32", reason="This test does not run under Windows"
1010 )
1011 989 def test_script_out_err():
1012 990
1013 991 ip = get_ipython()
1014 992 ip.run_cell_magic(
1015 "script", "--out output --err error sh", "echo 'hi'\necho 'hello' >&2"
993 "script",
994 f"--out output --err error {sys.executable}",
995 "\n".join(
996 [
997 "import sys",
998 "print('hi')",
999 "print('hello', file=sys.stderr)",
1000 ]
1001 ),
1016 1002 )
1017 assert ip.user_ns["output"] == "hi\n"
1018 assert ip.user_ns["error"] == "hello\n"
1003 assert ip.user_ns["output"].strip() == "hi"
1004 assert ip.user_ns["error"].strip() == "hello"
1019 1005
1020 1006
1021 @dec.skip_win32
1022 @pytest.mark.skipif(
1023 sys.platform == "win32", reason="This test does not run under Windows"
1024 )
1025 async def test_script_bg_out(event_loop):
1007 async def test_script_bg_out():
1026 1008 ip = get_ipython()
1027 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
1028 assert (await ip.user_ns["output"].read()) == b"hi\n"
1029 ip.user_ns["output"].close()
1030 event_loop.stop()
1009 ip.run_cell_magic("script", f"--bg --out output {sys.executable}", "print('hi')")
1010 assert (await ip.user_ns["output"].read()).strip() == b"hi"
1011 assert ip.user_ns["output"].at_eof()
1031 1012
1032 1013
1033 @dec.skip_win32
1034 @pytest.mark.skipif(
1035 sys.platform == "win32", reason="This test does not run under Windows"
1036 )
1037 1014 async def test_script_bg_err():
1038 1015 ip = get_ipython()
1039 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
1040 assert (await ip.user_ns["error"].read()) == b"hello\n"
1041 ip.user_ns["error"].close()
1016 ip.run_cell_magic(
1017 "script",
1018 f"--bg --err error {sys.executable}",
1019 "import sys; print('hello', file=sys.stderr)",
1020 )
1021 assert (await ip.user_ns["error"].read()).strip() == b"hello"
1022 assert ip.user_ns["error"].at_eof()
1042 1023
1043 1024
1044 @dec.skip_win32
1045 @pytest.mark.skipif(
1046 sys.platform == "win32", reason="This test does not run under Windows"
1047 )
1048 1025 async def test_script_bg_out_err():
1049 1026 ip = get_ipython()
1050 1027 ip.run_cell_magic(
1051 "script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2"
1028 "script",
1029 f"--bg --out output --err error {sys.executable}",
1030 "\n".join(
1031 [
1032 "import sys",
1033 "print('hi')",
1034 "print('hello', file=sys.stderr)",
1035 ]
1036 ),
1037 )
1038 assert (await ip.user_ns["output"].read()).strip() == b"hi"
1039 assert (await ip.user_ns["error"].read()).strip() == b"hello"
1040 assert ip.user_ns["output"].at_eof()
1041 assert ip.user_ns["error"].at_eof()
1042
1043
1044 async def test_script_bg_proc():
1045 ip = get_ipython()
1046 ip.run_cell_magic(
1047 "script",
1048 f"--bg --out output --proc p {sys.executable}",
1049 "\n".join(
1050 [
1051 "import sys",
1052 "print('hi')",
1053 "print('hello', file=sys.stderr)",
1054 ]
1055 ),
1052 1056 )
1053 assert (await ip.user_ns["output"].read()) == b"hi\n"
1054 assert (await ip.user_ns["error"].read()) == b"hello\n"
1055 ip.user_ns["output"].close()
1056 ip.user_ns["error"].close()
1057 p = ip.user_ns["p"]
1058 await p.wait()
1059 assert p.returncode == 0
1060 assert (await p.stdout.read()).strip() == b"hi"
1061 # not captured, so empty
1062 assert (await p.stderr.read()) == b""
1063 assert p.stdout.at_eof()
1064 assert p.stderr.at_eof()
1057 1065
1058 1066
1059 1067 def test_script_defaults():
@@ -6,6 +6,7 b' import sys'
6 6 import warnings
7 7 from warnings import warn
8 8
9 from IPython.core.async_helpers import get_asyncio_loop
9 10 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
10 11 from IPython.utils import io
11 12 from IPython.utils.py3compat import input
@@ -521,14 +522,14 b' class TerminalInteractiveShell(InteractiveShell):'
521 522 # while/true inside which will freeze the prompt.
522 523
523 524 policy = asyncio.get_event_loop_policy()
524 try:
525 old_loop = policy.get_event_loop()
526 except RuntimeError:
527 # This happens when the the event loop is closed,
528 # e.g. by calling `asyncio.run()`.
529 old_loop = None
530
531 policy.set_event_loop(self.pt_loop)
525 old_loop = get_asyncio_loop()
526
527 # FIXME: prompt_toolkit is using the deprecated `asyncio.get_event_loop`
528 # to get the current event loop.
529 # This will probably be replaced by an attribute or input argument,
530 # at which point we can stop calling the soon-to-be-deprecated `set_event_loop` here.
531 if old_loop is not self.pt_loop:
532 policy.set_event_loop(self.pt_loop)
532 533 try:
533 534 with patch_stdout(raw=True):
534 535 text = self.pt_app.prompt(
@@ -536,7 +537,7 b' class TerminalInteractiveShell(InteractiveShell):'
536 537 **self._extra_prompt_options())
537 538 finally:
538 539 # Restore the original event loop.
539 if old_loop is not None:
540 if old_loop is not None and old_loop is not self.pt_loop:
540 541 policy.set_event_loop(old_loop)
541 542
542 543 return text
@@ -652,7 +653,7 b' class TerminalInteractiveShell(InteractiveShell):'
652 653 # When we integrate the asyncio event loop, run the UI in the
653 654 # same event loop as the rest of the code. don't use an actual
654 655 # input hook. (Asyncio is not made for nesting event loops.)
655 self.pt_loop = asyncio.get_event_loop_policy().get_event_loop()
656 self.pt_loop = get_asyncio_loop()
656 657
657 658 elif self._inputhook:
658 659 # If an inputhook was set, create a new asyncio event loop with
@@ -27,15 +27,12 b' prompt_toolkit`s `patch_stdout`)::'
27 27 In [4]: asyncio.ensure_future(f())
28 28
29 29 """
30 import asyncio
31 30 from prompt_toolkit import __version__ as ptk_version
32 31
33 PTK3 = ptk_version.startswith('3.')
32 from IPython.core.async_helpers import get_asyncio_loop
34 33
34 PTK3 = ptk_version.startswith('3.')
35 35
36 # Keep reference to the original asyncio loop, because getting the event loop
37 # within the input hook would return the other loop.
38 loop = asyncio.get_event_loop_policy().get_event_loop()
39 36
40 37
41 38 def inputhook(context):
@@ -52,6 +49,9 b' def inputhook(context):'
52 49 # For prompt_toolkit 2.0, we can run the current asyncio event loop,
53 50 # because prompt_toolkit 2.0 uses a different event loop internally.
54 51
52 # get the persistent asyncio event loop
53 loop = get_asyncio_loop()
54
55 55 def stop():
56 56 loop.stop()
57 57
@@ -61,4 +61,3 b' def inputhook(context):'
61 61 loop.run_forever()
62 62 finally:
63 63 loop.remove_reader(fileno)
64
@@ -150,6 +150,7 b' extras_require = dict('
150 150 doc=["Sphinx>=1.3"],
151 151 test=[
152 152 "pytest",
153 "pytest-asyncio",
153 154 "testpath",
154 155 "pygments",
155 156 ],
General Comments 0
You need to be logged in to leave comments. Login now