##// END OF EJS Templates
avoid deprecated get_event_loop...
Min RK -
Show More
@@ -14,26 +14,46 b' Python semantics.'
14 14 import ast
15 15 import asyncio
16 16 import inspect
17 from functools import wraps
17 18
19 _asyncio_event_loop = None
18 20
19 class _AsyncIORunner:
20 def __init__(self):
21 self._loop = None
22
23 @property
24 def loop(self):
25 """Always returns a non-closed event loop"""
26 if self._loop is None or self._loop.is_closed():
27 policy = asyncio.get_event_loop_policy()
28 self._loop = policy.new_event_loop()
29 policy.set_event_loop(self._loop)
30 return self._loop
31 21
22 def get_asyncio_loop():
23 """asyncio has deprecated get_event_loop
24
25 Replicate it here, with our desired semantics:
26
27 - always returns a valid, not-closed loop
28 - not thread-local like asyncio's,
29 because we only want one loop for IPython
30 - if called from inside a coroutine (e.g. in ipykernel),
31 return the running loop
32
33 .. versionadded:: 8.0
34 """
35 try:
36 return asyncio.get_running_loop()
37 except RuntimeError:
38 # not inside a coroutine,
39 # track our own global
40 pass
41
42 # not thread-local like asyncio's,
43 # because we only track one event loop to run for IPython itself,
44 # always in the main thread.
45 global _asyncio_event_loop
46 if _asyncio_event_loop is None or _asyncio_event_loop.is_closed():
47 _asyncio_event_loop = asyncio.new_event_loop()
48 return _asyncio_event_loop
49
50
51 class _AsyncIORunner:
32 52 def __call__(self, coro):
33 53 """
34 54 Handler for asyncio autoawait
35 55 """
36 return self.loop.run_until_complete(coro)
56 return get_asyncio_loop().run_until_complete(coro)
37 57
38 58 def __str__(self):
39 59 return "asyncio"
@@ -42,6 +62,39 b' class _AsyncIORunner:'
42 62 _asyncio_runner = _AsyncIORunner()
43 63
44 64
65 class _AsyncIOProxy:
66 """Proxy-object for an asyncio
67
68 Any coroutine methods will be wrapped in event_loop.run_
69 """
70
71 def __init__(self, obj, event_loop):
72 self._obj = obj
73 self._event_loop = event_loop
74
75 def __repr__(self):
76 return f"<_AsyncIOProxy({self._obj!r})>"
77
78 def __getattr__(self, key):
79 attr = getattr(self._obj, key)
80 if inspect.iscoroutinefunction(attr):
81 # if it's a coroutine method,
82 # return a threadsafe wrapper onto the _current_ asyncio loop
83 @wraps(attr)
84 def _wrapped(*args, **kwargs):
85 concurrent_future = asyncio.run_coroutine_threadsafe(
86 attr(*args, **kwargs), self._event_loop
87 )
88 return asyncio.wrap_future(concurrent_future)
89
90 return _wrapped
91 else:
92 return attr
93
94 def __dir__(self):
95 return dir(self._obj)
96
97
45 98 def _curio_runner(coroutine):
46 99 """
47 100 handler for curio autoawait
@@ -6,19 +6,18 b''
6 6 import asyncio
7 7 import atexit
8 8 import errno
9 import functools
10 9 import os
11 10 import signal
12 11 import sys
13 12 import time
14 from contextlib import contextmanager
15 13 from subprocess import CalledProcessError
14 from threading import Thread
16 15
17 from traitlets import Dict, List, default
16 from traitlets import Any, Dict, List, default
18 17
19 18 from IPython.core import magic_arguments
19 from IPython.core.async_helpers import _AsyncIOProxy
20 20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.lib.backgroundjobs import BackgroundJobManager
22 21 from IPython.utils.process import arg_split
23 22
24 23 #-----------------------------------------------------------------------------
@@ -67,48 +66,6 b' def script_args(f):'
67 66 return f
68 67
69 68
70 @contextmanager
71 def safe_watcher():
72 if sys.platform == "win32":
73 yield
74 return
75
76 from asyncio import SafeChildWatcher
77
78 policy = asyncio.get_event_loop_policy()
79 old_watcher = policy.get_child_watcher()
80 if isinstance(old_watcher, SafeChildWatcher):
81 yield
82 return
83
84 try:
85 loop = policy.get_event_loop()
86 if loop.is_closed():
87 raise RuntimeError("open a new one")
88 except RuntimeError:
89 # closed loop, make a new one
90 loop = policy.new_event_loop()
91 policy.set_event_loop(loop)
92
93 try:
94 watcher = asyncio.SafeChildWatcher()
95 watcher.attach_loop(loop)
96 policy.set_child_watcher(watcher)
97 yield
98 finally:
99 watcher.close()
100 policy.set_child_watcher(old_watcher)
101
102
103 def dec_safe_watcher(fun):
104 @functools.wraps(fun)
105 def _inner(*args, **kwargs):
106 with safe_watcher():
107 return fun(*args, **kwargs)
108
109 return _inner
110
111
112 69 @magics_class
113 70 class ScriptMagics(Magics):
114 71 """Magics for talking to scripts
@@ -117,6 +74,17 b' class ScriptMagics(Magics):'
117 74 with a program in a subprocess, and registers a few top-level
118 75 magics that call %%script with common interpreters.
119 76 """
77
78 event_loop = Any(
79 help="""
80 The event loop on which to run subprocesses
81
82 Not the main event loop,
83 because we want to be able to make blocking calls
84 and have certain requirements we don't want to impose on the main loop.
85 """
86 )
87
120 88 script_magics = List(
121 89 help="""Extra script cell magics to define
122 90
@@ -158,7 +126,6 b' class ScriptMagics(Magics):'
158 126 def __init__(self, shell=None):
159 127 super(ScriptMagics, self).__init__(shell=shell)
160 128 self._generate_script_magics()
161 self.job_manager = BackgroundJobManager()
162 129 self.bg_processes = []
163 130 atexit.register(self.kill_bg_processes)
164 131
@@ -199,7 +166,6 b' class ScriptMagics(Magics):'
199 166 @magic_arguments.magic_arguments()
200 167 @script_args
201 168 @cell_magic("script")
202 @dec_safe_watcher
203 169 def shebang(self, line, cell):
204 170 """Run a cell via a shell command
205 171
@@ -221,6 +187,27 b' class ScriptMagics(Magics):'
221 187 3
222 188 """
223 189
190 # Create the event loop in which to run script magics
191 # this operates on a background thread
192 if self.event_loop is None:
193 if sys.platform == "win32":
194 # don't override the current policy,
195 # just create an event loop
196 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
197 else:
198 event_loop = asyncio.new_event_loop()
199 self.event_loop = event_loop
200
201 # start the loop in a background thread
202 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
203 asyncio_thread.start()
204 else:
205 event_loop = self.event_loop
206
207 def in_thread(coro):
208 """Call a coroutine on the asyncio thread"""
209 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
210
224 211 async def _handle_stream(stream, stream_arg, file_object):
225 212 while True:
226 213 line = (await stream.readline()).decode("utf8")
@@ -244,23 +231,11 b' class ScriptMagics(Magics):'
244 231 await asyncio.wait([stdout_task, stderr_task])
245 232 await process.wait()
246 233
247 policy = asyncio.get_event_loop_policy()
248 if sys.platform.startswith("win") and not isinstance(
249 policy, asyncio.WindowsProactorEventLoopPolicy
250 ):
251 # _do not_ overwrite the current policy
252 policy = asyncio.WindowsProactorEventLoopPolicy()
253
254 try:
255 loop = policy.get_event_loop()
256 except RuntimeError:
257 # closed loop, make a new one
258 loop = policy.new_event_loop()
259 policy.set_event_loop(loop)
260 234 argv = arg_split(line, posix=not sys.platform.startswith("win"))
261 235 args, cmd = self.shebang.parser.parse_known_args(argv)
236
262 237 try:
263 p = loop.run_until_complete(
238 p = in_thread(
264 239 asyncio.create_subprocess_exec(
265 240 *cmd,
266 241 stdout=asyncio.subprocess.PIPE,
@@ -283,29 +258,34 b' class ScriptMagics(Magics):'
283 258 self._gc_bg_processes()
284 259 to_close = []
285 260 if args.out:
286 self.shell.user_ns[args.out] = p.stdout
261 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
287 262 else:
288 263 to_close.append(p.stdout)
289 264 if args.err:
290 self.shell.user_ns[args.err] = p.stderr
265 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
291 266 else:
292 267 to_close.append(p.stderr)
293 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
268 event_loop.call_soon_threadsafe(
269 lambda: asyncio.Task(self._run_script(p, cell, to_close))
270 )
294 271 if args.proc:
295 self.shell.user_ns[args.proc] = p
272 proc_proxy = _AsyncIOProxy(p, event_loop)
273 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
274 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
275 self.shell.user_ns[args.proc] = proc_proxy
296 276 return
297 277
298 278 try:
299 loop.run_until_complete(_stream_communicate(p, cell))
279 in_thread(_stream_communicate(p, cell))
300 280 except KeyboardInterrupt:
301 281 try:
302 282 p.send_signal(signal.SIGINT)
303 time.sleep(0.1)
283 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
304 284 if p.returncode is not None:
305 285 print("Process is interrupted.")
306 286 return
307 287 p.terminate()
308 time.sleep(0.1)
288 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
309 289 if p.returncode is not None:
310 290 print("Process is terminated.")
311 291 return
@@ -316,6 +296,7 b' class ScriptMagics(Magics):'
316 296 except Exception as e:
317 297 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
318 298 return
299
319 300 if args.raise_error and p.returncode!=0:
320 301 # If we get here and p.returncode is still None, we must have
321 302 # killed it but not yet seen its return code. We don't wait for it,
@@ -325,13 +306,19 b' class ScriptMagics(Magics):'
325 306
326 307 shebang.__skip_doctest__ = os.name != "posix"
327 308
328 def _run_script(self, p, cell, to_close):
309 async def _run_script(self, p, cell, to_close):
329 310 """callback for running the script in the background"""
311
330 312 p.stdin.write(cell)
313 await p.stdin.drain()
331 314 p.stdin.close()
315 await p.stdin.wait_closed()
316 await p.wait()
317 # asyncio read pipes have no close
318 # but we should drain the data anyway
332 319 for s in to_close:
333 s.close()
334 p.wait()
320 await s.read()
321 self._gc_bg_processes()
335 322
336 323 @line_magic("killbgscripts")
337 324 def killbgscripts(self, _nouse_=''):
@@ -1049,10 +1049,10 b' def test_custom_exc_count():'
1049 1049
1050 1050
1051 1051 def test_run_cell_async():
1052 loop = asyncio.get_event_loop_policy().get_event_loop()
1053 1052 ip.run_cell("import asyncio")
1054 1053 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1055 1054 assert asyncio.iscoroutine(coro)
1055 loop = asyncio.new_event_loop()
1056 1056 result = loop.run_until_complete(coro)
1057 1057 assert isinstance(result, interactiveshell.ExecutionResult)
1058 1058 assert result.result == 5
@@ -988,7 +988,7 b' def test_script_out(event_loop):'
988 988
989 989 ip = get_ipython()
990 990 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
991 assert event_loop.is_running() is False
991 assert not event_loop.is_running()
992 992 assert ip.user_ns["output"] == "hi\n"
993 993
994 994
@@ -998,9 +998,9 b' def test_script_out(event_loop):'
998 998 )
999 999 def test_script_err(event_loop):
1000 1000 ip = get_ipython()
1001 assert event_loop.is_running() is False
1001 assert not event_loop.is_running()
1002 1002 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
1003 assert event_loop.is_running() is False
1003 assert not event_loop.is_running()
1004 1004 assert ip.user_ns["error"] == "hello\n"
1005 1005
1006 1006
@@ -1022,13 +1022,11 b' def test_script_out_err():'
1022 1022 @pytest.mark.skipif(
1023 1023 sys.platform == "win32", reason="This test does not run under Windows"
1024 1024 )
1025 async def test_script_bg_out(event_loop):
1025 async def test_script_bg_out():
1026 1026 ip = get_ipython()
1027 1027 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
1028 1028 assert (await ip.user_ns["output"].read()) == b"hi\n"
1029 ip.user_ns["output"].close()
1030 event_loop.stop()
1031
1029 assert ip.user_ns["output"].at_eof()
1032 1030
1033 1031 @dec.skip_win32
1034 1032 @pytest.mark.skipif(
@@ -1038,7 +1036,7 b' async def test_script_bg_err():'
1038 1036 ip = get_ipython()
1039 1037 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
1040 1038 assert (await ip.user_ns["error"].read()) == b"hello\n"
1041 ip.user_ns["error"].close()
1039 assert ip.user_ns["error"].at_eof()
1042 1040
1043 1041
1044 1042 @dec.skip_win32
@@ -1052,8 +1050,27 b' async def test_script_bg_out_err():'
1052 1050 )
1053 1051 assert (await ip.user_ns["output"].read()) == b"hi\n"
1054 1052 assert (await ip.user_ns["error"].read()) == b"hello\n"
1055 ip.user_ns["output"].close()
1056 ip.user_ns["error"].close()
1053 assert ip.user_ns["output"].at_eof()
1054 assert ip.user_ns["error"].at_eof()
1055
1056
1057 @dec.skip_win32
1058 @pytest.mark.skipif(
1059 sys.platform == "win32", reason="This test does not run under Windows"
1060 )
1061 async def test_script_bg_proc():
1062 ip = get_ipython()
1063 ip.run_cell_magic(
1064 "script", "--bg --proc p sh --out out", "echo 'hi'\necho 'hello' >&2"
1065 )
1066 p = ip.user_ns["p"]
1067 await p.wait()
1068 assert p.returncode == 0
1069 assert (await p.stdout.read()) == b"hi\n"
1070 # not captured, so empty
1071 assert (await p.stderr.read()) == b""
1072 assert p.stdout.at_eof()
1073 assert p.stderr.at_eof()
1057 1074
1058 1075
1059 1076 def test_script_defaults():
@@ -6,6 +6,7 b' import sys'
6 6 import warnings
7 7 from warnings import warn
8 8
9 from IPython.core.async_helpers import get_asyncio_loop
9 10 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
10 11 from IPython.utils import io
11 12 from IPython.utils.py3compat import input
@@ -521,13 +522,13 b' class TerminalInteractiveShell(InteractiveShell):'
521 522 # while/true inside which will freeze the prompt.
522 523
523 524 policy = asyncio.get_event_loop_policy()
524 try:
525 old_loop = policy.get_event_loop()
526 except RuntimeError:
527 # This happens when the the event loop is closed,
528 # e.g. by calling `asyncio.run()`.
529 old_loop = None
525 old_loop = get_asyncio_loop()
530 526
527 # FIXME: prompt_toolkit is using the deprecated `asyncio.get_event_loop`
528 # to get the current event loop.
529 # This will probably be replaced by an attribute or input argument,
530 # at which point we can stop calling the soon-to-be-deprecated `set_event_loop` here.
531 if old_loop is not self.pt_loop:
531 532 policy.set_event_loop(self.pt_loop)
532 533 try:
533 534 with patch_stdout(raw=True):
@@ -536,7 +537,7 b' class TerminalInteractiveShell(InteractiveShell):'
536 537 **self._extra_prompt_options())
537 538 finally:
538 539 # Restore the original event loop.
539 if old_loop is not None:
540 if old_loop is not None and old_loop is not self.pt_loop:
540 541 policy.set_event_loop(old_loop)
541 542
542 543 return text
@@ -652,7 +653,7 b' class TerminalInteractiveShell(InteractiveShell):'
652 653 # When we integrate the asyncio event loop, run the UI in the
653 654 # same event loop as the rest of the code. don't use an actual
654 655 # input hook. (Asyncio is not made for nesting event loops.)
655 self.pt_loop = asyncio.get_event_loop_policy().get_event_loop()
656 self.pt_loop = get_asyncio_loop()
656 657
657 658 elif self._inputhook:
658 659 # If an inputhook was set, create a new asyncio event loop with
@@ -27,15 +27,12 b' prompt_toolkit`s `patch_stdout`)::'
27 27 In [4]: asyncio.ensure_future(f())
28 28
29 29 """
30 import asyncio
31 30 from prompt_toolkit import __version__ as ptk_version
32 31
33 PTK3 = ptk_version.startswith('3.')
32 from IPython.core.async_helpers import get_asyncio_loop
34 33
34 PTK3 = ptk_version.startswith('3.')
35 35
36 # Keep reference to the original asyncio loop, because getting the event loop
37 # within the input hook would return the other loop.
38 loop = asyncio.get_event_loop_policy().get_event_loop()
39 36
40 37
41 38 def inputhook(context):
@@ -52,6 +49,9 b' def inputhook(context):'
52 49 # For prompt_toolkit 2.0, we can run the current asyncio event loop,
53 50 # because prompt_toolkit 2.0 uses a different event loop internally.
54 51
52 # get the persistent asyncio event loop
53 loop = get_asyncio_loop()
54
55 55 def stop():
56 56 loop.stop()
57 57
@@ -61,4 +61,3 b' def inputhook(context):'
61 61 loop.run_forever()
62 62 finally:
63 63 loop.remove_reader(fileno)
64
General Comments 0
You need to be logged in to leave comments. Login now