diff --git a/IPython/core/async_helpers.py b/IPython/core/async_helpers.py index 6623fae..0e7db0b 100644 --- a/IPython/core/async_helpers.py +++ b/IPython/core/async_helpers.py @@ -14,26 +14,46 @@ Python semantics. import ast import asyncio import inspect +from functools import wraps +_asyncio_event_loop = None -class _AsyncIORunner: - def __init__(self): - self._loop = None - - @property - def loop(self): - """Always returns a non-closed event loop""" - if self._loop is None or self._loop.is_closed(): - policy = asyncio.get_event_loop_policy() - self._loop = policy.new_event_loop() - policy.set_event_loop(self._loop) - return self._loop +def get_asyncio_loop(): + """asyncio has deprecated get_event_loop + + Replicate it here, with our desired semantics: + + - always returns a valid, not-closed loop + - not thread-local like asyncio's, + because we only want one loop for IPython + - if called from inside a coroutine (e.g. in ipykernel), + return the running loop + + .. versionadded:: 8.0 + """ + try: + return asyncio.get_running_loop() + except RuntimeError: + # not inside a coroutine, + # track our own global + pass + + # not thread-local like asyncio's, + # because we only track one event loop to run for IPython itself, + # always in the main thread. + global _asyncio_event_loop + if _asyncio_event_loop is None or _asyncio_event_loop.is_closed(): + _asyncio_event_loop = asyncio.new_event_loop() + return _asyncio_event_loop + + +class _AsyncIORunner: def __call__(self, coro): """ Handler for asyncio autoawait """ - return self.loop.run_until_complete(coro) + return get_asyncio_loop().run_until_complete(coro) def __str__(self): return "asyncio" @@ -42,6 +62,39 @@ class _AsyncIORunner: _asyncio_runner = _AsyncIORunner() +class _AsyncIOProxy: + """Proxy-object for an asyncio + + Any coroutine methods will be wrapped in event_loop.run_ + """ + + def __init__(self, obj, event_loop): + self._obj = obj + self._event_loop = event_loop + + def __repr__(self): + return f"<_AsyncIOProxy({self._obj!r})>" + + def __getattr__(self, key): + attr = getattr(self._obj, key) + if inspect.iscoroutinefunction(attr): + # if it's a coroutine method, + # return a threadsafe wrapper onto the _current_ asyncio loop + @wraps(attr) + def _wrapped(*args, **kwargs): + concurrent_future = asyncio.run_coroutine_threadsafe( + attr(*args, **kwargs), self._event_loop + ) + return asyncio.wrap_future(concurrent_future) + + return _wrapped + else: + return attr + + def __dir__(self): + return dir(self._obj) + + def _curio_runner(coroutine): """ handler for curio autoawait diff --git a/IPython/core/magics/script.py b/IPython/core/magics/script.py index 59f27c2..a0028c2 100644 --- a/IPython/core/magics/script.py +++ b/IPython/core/magics/script.py @@ -6,19 +6,18 @@ import asyncio import atexit import errno -import functools import os import signal import sys import time -from contextlib import contextmanager from subprocess import CalledProcessError +from threading import Thread -from traitlets import Dict, List, default +from traitlets import Any, Dict, List, default from IPython.core import magic_arguments +from IPython.core.async_helpers import _AsyncIOProxy from IPython.core.magic import Magics, cell_magic, line_magic, magics_class -from IPython.lib.backgroundjobs import BackgroundJobManager from IPython.utils.process import arg_split #----------------------------------------------------------------------------- @@ -57,7 +56,7 @@ def script_args(f): ), magic_arguments.argument( '--no-raise-error', action="store_false", dest='raise_error', - help="""Whether you should raise an error message in addition to + help="""Whether you should raise an error message in addition to a stream on stderr if you get a nonzero exit code. """ ) @@ -67,48 +66,6 @@ def script_args(f): return f -@contextmanager -def safe_watcher(): - if sys.platform == "win32": - yield - return - - from asyncio import SafeChildWatcher - - policy = asyncio.get_event_loop_policy() - old_watcher = policy.get_child_watcher() - if isinstance(old_watcher, SafeChildWatcher): - yield - return - - try: - loop = policy.get_event_loop() - if loop.is_closed(): - raise RuntimeError("open a new one") - except RuntimeError: - # closed loop, make a new one - loop = policy.new_event_loop() - policy.set_event_loop(loop) - - try: - watcher = asyncio.SafeChildWatcher() - watcher.attach_loop(loop) - policy.set_child_watcher(watcher) - yield - finally: - watcher.close() - policy.set_child_watcher(old_watcher) - - -def dec_safe_watcher(fun): - @functools.wraps(fun) - def _inner(*args, **kwargs): - with safe_watcher(): - return fun(*args, **kwargs) - - return _inner - - @magics_class class ScriptMagics(Magics): """Magics for talking to scripts @@ -117,6 +74,17 @@ class ScriptMagics(Magics): with a program in a subprocess, and registers a few top-level magics that call %%script with common interpreters. """ + + event_loop = Any( + help=""" + The event loop on which to run subprocesses + + Not the main event loop, + because we want to be able to make blocking calls + and have certain requirements we don't want to impose on the main loop. + """ + ) + script_magics = List( help="""Extra script cell magics to define @@ -158,7 +126,6 @@ class ScriptMagics(Magics): def __init__(self, shell=None): super(ScriptMagics, self).__init__(shell=shell) self._generate_script_magics() - self.job_manager = BackgroundJobManager() self.bg_processes = [] atexit.register(self.kill_bg_processes) @@ -199,7 +166,6 @@ class ScriptMagics(Magics): @magic_arguments.magic_arguments() @script_args @cell_magic("script") - @dec_safe_watcher def shebang(self, line, cell): """Run a cell via a shell command @@ -221,6 +187,27 @@ class ScriptMagics(Magics): 3 """ + # Create the event loop in which to run script magics + # this operates on a background thread + if self.event_loop is None: + if sys.platform == "win32": + # don't override the current policy, + # just create an event loop + event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop() + else: + event_loop = asyncio.new_event_loop() + self.event_loop = event_loop + + # start the loop in a background thread + asyncio_thread = Thread(target=event_loop.run_forever, daemon=True) + asyncio_thread.start() + else: + event_loop = self.event_loop + + def in_thread(coro): + """Call a coroutine on the asyncio thread""" + return asyncio.run_coroutine_threadsafe(coro, event_loop).result() + async def _handle_stream(stream, stream_arg, file_object): while True: line = (await stream.readline()).decode("utf8") @@ -244,23 +231,11 @@ class ScriptMagics(Magics): await asyncio.wait([stdout_task, stderr_task]) await process.wait() - policy = asyncio.get_event_loop_policy() - if sys.platform.startswith("win") and not isinstance( - policy, asyncio.WindowsProactorEventLoopPolicy - ): - # _do not_ overwrite the current policy - policy = asyncio.WindowsProactorEventLoopPolicy() - - try: - loop = policy.get_event_loop() - except RuntimeError: - # closed loop, make a new one - loop = policy.new_event_loop() - policy.set_event_loop(loop) argv = arg_split(line, posix=not sys.platform.startswith("win")) args, cmd = self.shebang.parser.parse_known_args(argv) + try: - p = loop.run_until_complete( + p = in_thread( asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, @@ -274,7 +249,7 @@ class ScriptMagics(Magics): return else: raise - + if not cell.endswith('\n'): cell += '\n' cell = cell.encode('utf8', 'replace') @@ -283,29 +258,34 @@ class ScriptMagics(Magics): self._gc_bg_processes() to_close = [] if args.out: - self.shell.user_ns[args.out] = p.stdout + self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop) else: to_close.append(p.stdout) if args.err: - self.shell.user_ns[args.err] = p.stderr + self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop) else: to_close.append(p.stderr) - self.job_manager.new(self._run_script, p, cell, to_close, daemon=True) + event_loop.call_soon_threadsafe( + lambda: asyncio.Task(self._run_script(p, cell, to_close)) + ) if args.proc: - self.shell.user_ns[args.proc] = p + proc_proxy = _AsyncIOProxy(p, event_loop) + proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop) + proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop) + self.shell.user_ns[args.proc] = proc_proxy return - + try: - loop.run_until_complete(_stream_communicate(p, cell)) + in_thread(_stream_communicate(p, cell)) except KeyboardInterrupt: try: p.send_signal(signal.SIGINT) - time.sleep(0.1) + in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) if p.returncode is not None: print("Process is interrupted.") return p.terminate() - time.sleep(0.1) + in_thread(asyncio.wait_for(p.wait(), timeout=0.1)) if p.returncode is not None: print("Process is terminated.") return @@ -316,7 +296,8 @@ class ScriptMagics(Magics): except Exception as e: print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e)) return - if args.raise_error and p.returncode!=0: + + if args.raise_error and p.returncode != 0: # If we get here and p.returncode is still None, we must have # killed it but not yet seen its return code. We don't wait for it, # in case it's stuck in uninterruptible sleep. -9 = SIGKILL @@ -324,14 +305,20 @@ class ScriptMagics(Magics): raise CalledProcessError(rc, cell) shebang.__skip_doctest__ = os.name != "posix" - - def _run_script(self, p, cell, to_close): + + async def _run_script(self, p, cell, to_close): """callback for running the script in the background""" + p.stdin.write(cell) + await p.stdin.drain() p.stdin.close() + await p.stdin.wait_closed() + await p.wait() + # asyncio read pipes have no close + # but we should drain the data anyway for s in to_close: - s.close() - p.wait() + await s.read() + self._gc_bg_processes() @line_magic("killbgscripts") def killbgscripts(self, _nouse_=''): diff --git a/IPython/core/tests/test_interactiveshell.py b/IPython/core/tests/test_interactiveshell.py index 355f2b0..41f5a35 100644 --- a/IPython/core/tests/test_interactiveshell.py +++ b/IPython/core/tests/test_interactiveshell.py @@ -1049,10 +1049,10 @@ def test_custom_exc_count(): def test_run_cell_async(): - loop = asyncio.get_event_loop_policy().get_event_loop() ip.run_cell("import asyncio") coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5") assert asyncio.iscoroutine(coro) + loop = asyncio.new_event_loop() result = loop.run_until_complete(coro) assert isinstance(result, interactiveshell.ExecutionResult) assert result.result == 5 diff --git a/IPython/core/tests/test_magic.py b/IPython/core/tests/test_magic.py index 40f0ae4..f66136b 100644 --- a/IPython/core/tests/test_magic.py +++ b/IPython/core/tests/test_magic.py @@ -988,7 +988,7 @@ def test_script_out(event_loop): ip = get_ipython() ip.run_cell_magic("script", "--out output sh", "echo 'hi'") - assert event_loop.is_running() is False + assert not event_loop.is_running() assert ip.user_ns["output"] == "hi\n" @@ -998,9 +998,9 @@ def test_script_out(event_loop): ) def test_script_err(event_loop): ip = get_ipython() - assert event_loop.is_running() is False + assert not event_loop.is_running() ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2") - assert event_loop.is_running() is False + assert not event_loop.is_running() assert ip.user_ns["error"] == "hello\n" @@ -1022,13 +1022,11 @@ def test_script_out_err(): @pytest.mark.skipif( sys.platform == "win32", reason="This test does not run under Windows" ) -async def test_script_bg_out(event_loop): +async def test_script_bg_out(): ip = get_ipython() ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'") assert (await ip.user_ns["output"].read()) == b"hi\n" - ip.user_ns["output"].close() - event_loop.stop() - + assert ip.user_ns["output"].at_eof() @dec.skip_win32 @pytest.mark.skipif( @@ -1038,7 +1036,7 @@ async def test_script_bg_err(): ip = get_ipython() ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2") assert (await ip.user_ns["error"].read()) == b"hello\n" - ip.user_ns["error"].close() + assert ip.user_ns["error"].at_eof() @dec.skip_win32 @@ -1052,8 +1050,27 @@ async def test_script_bg_out_err(): ) assert (await ip.user_ns["output"].read()) == b"hi\n" assert (await ip.user_ns["error"].read()) == b"hello\n" - ip.user_ns["output"].close() - ip.user_ns["error"].close() + assert ip.user_ns["output"].at_eof() + assert ip.user_ns["error"].at_eof() + + +@dec.skip_win32 +@pytest.mark.skipif( + sys.platform == "win32", reason="This test does not run under Windows" +) +async def test_script_bg_proc(): + ip = get_ipython() + ip.run_cell_magic( + "script", "--bg --proc p sh --out out", "echo 'hi'\necho 'hello' >&2" + ) + p = ip.user_ns["p"] + await p.wait() + assert p.returncode == 0 + assert (await p.stdout.read()) == b"hi\n" + # not captured, so empty + assert (await p.stderr.read()) == b"" + assert p.stdout.at_eof() + assert p.stderr.at_eof() def test_script_defaults(): diff --git a/IPython/terminal/interactiveshell.py b/IPython/terminal/interactiveshell.py index 98e5eb8..8e61f8c 100644 --- a/IPython/terminal/interactiveshell.py +++ b/IPython/terminal/interactiveshell.py @@ -6,6 +6,7 @@ import sys import warnings from warnings import warn +from IPython.core.async_helpers import get_asyncio_loop from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC from IPython.utils import io from IPython.utils.py3compat import input @@ -521,14 +522,14 @@ class TerminalInteractiveShell(InteractiveShell): # while/true inside which will freeze the prompt. policy = asyncio.get_event_loop_policy() - try: - old_loop = policy.get_event_loop() - except RuntimeError: - # This happens when the the event loop is closed, - # e.g. by calling `asyncio.run()`. - old_loop = None - - policy.set_event_loop(self.pt_loop) + old_loop = get_asyncio_loop() + + # FIXME: prompt_toolkit is using the deprecated `asyncio.get_event_loop` + # to get the current event loop. + # This will probably be replaced by an attribute or input argument, + # at which point we can stop calling the soon-to-be-deprecated `set_event_loop` here. + if old_loop is not self.pt_loop: + policy.set_event_loop(self.pt_loop) try: with patch_stdout(raw=True): text = self.pt_app.prompt( @@ -536,7 +537,7 @@ class TerminalInteractiveShell(InteractiveShell): **self._extra_prompt_options()) finally: # Restore the original event loop. - if old_loop is not None: + if old_loop is not None and old_loop is not self.pt_loop: policy.set_event_loop(old_loop) return text @@ -652,7 +653,7 @@ class TerminalInteractiveShell(InteractiveShell): # When we integrate the asyncio event loop, run the UI in the # same event loop as the rest of the code. don't use an actual # input hook. (Asyncio is not made for nesting event loops.) - self.pt_loop = asyncio.get_event_loop_policy().get_event_loop() + self.pt_loop = get_asyncio_loop() elif self._inputhook: # If an inputhook was set, create a new asyncio event loop with diff --git a/IPython/terminal/pt_inputhooks/asyncio.py b/IPython/terminal/pt_inputhooks/asyncio.py index 72c45ea..2d8c128 100644 --- a/IPython/terminal/pt_inputhooks/asyncio.py +++ b/IPython/terminal/pt_inputhooks/asyncio.py @@ -27,15 +27,12 @@ prompt_toolkit`s `patch_stdout`):: In [4]: asyncio.ensure_future(f()) """ -import asyncio from prompt_toolkit import __version__ as ptk_version -PTK3 = ptk_version.startswith('3.') +from IPython.core.async_helpers import get_asyncio_loop +PTK3 = ptk_version.startswith('3.') -# Keep reference to the original asyncio loop, because getting the event loop -# within the input hook would return the other loop. -loop = asyncio.get_event_loop_policy().get_event_loop() def inputhook(context): @@ -52,6 +49,9 @@ def inputhook(context): # For prompt_toolkit 2.0, we can run the current asyncio event loop, # because prompt_toolkit 2.0 uses a different event loop internally. + # get the persistent asyncio event loop + loop = get_asyncio_loop() + def stop(): loop.stop() @@ -61,4 +61,3 @@ def inputhook(context): loop.run_forever() finally: loop.remove_reader(fileno) -