##// END OF EJS Templates
avoid deprecated get_event_loop...
Min RK -
Show More
@@ -14,26 +14,46 b' Python semantics.'
14 import ast
14 import ast
15 import asyncio
15 import asyncio
16 import inspect
16 import inspect
17 from functools import wraps
17
18
19 _asyncio_event_loop = None
18
20
19 class _AsyncIORunner:
20 def __init__(self):
21 self._loop = None
22
23 @property
24 def loop(self):
25 """Always returns a non-closed event loop"""
26 if self._loop is None or self._loop.is_closed():
27 policy = asyncio.get_event_loop_policy()
28 self._loop = policy.new_event_loop()
29 policy.set_event_loop(self._loop)
30 return self._loop
31
21
22 def get_asyncio_loop():
23 """asyncio has deprecated get_event_loop
24
25 Replicate it here, with our desired semantics:
26
27 - always returns a valid, not-closed loop
28 - not thread-local like asyncio's,
29 because we only want one loop for IPython
30 - if called from inside a coroutine (e.g. in ipykernel),
31 return the running loop
32
33 .. versionadded:: 8.0
34 """
35 try:
36 return asyncio.get_running_loop()
37 except RuntimeError:
38 # not inside a coroutine,
39 # track our own global
40 pass
41
42 # not thread-local like asyncio's,
43 # because we only track one event loop to run for IPython itself,
44 # always in the main thread.
45 global _asyncio_event_loop
46 if _asyncio_event_loop is None or _asyncio_event_loop.is_closed():
47 _asyncio_event_loop = asyncio.new_event_loop()
48 return _asyncio_event_loop
49
50
51 class _AsyncIORunner:
32 def __call__(self, coro):
52 def __call__(self, coro):
33 """
53 """
34 Handler for asyncio autoawait
54 Handler for asyncio autoawait
35 """
55 """
36 return self.loop.run_until_complete(coro)
56 return get_asyncio_loop().run_until_complete(coro)
37
57
38 def __str__(self):
58 def __str__(self):
39 return "asyncio"
59 return "asyncio"
@@ -42,6 +62,39 b' class _AsyncIORunner:'
42 _asyncio_runner = _AsyncIORunner()
62 _asyncio_runner = _AsyncIORunner()
43
63
44
64
65 class _AsyncIOProxy:
66 """Proxy-object for an asyncio
67
68 Any coroutine methods will be wrapped in event_loop.run_
69 """
70
71 def __init__(self, obj, event_loop):
72 self._obj = obj
73 self._event_loop = event_loop
74
75 def __repr__(self):
76 return f"<_AsyncIOProxy({self._obj!r})>"
77
78 def __getattr__(self, key):
79 attr = getattr(self._obj, key)
80 if inspect.iscoroutinefunction(attr):
81 # if it's a coroutine method,
82 # return a threadsafe wrapper onto the _current_ asyncio loop
83 @wraps(attr)
84 def _wrapped(*args, **kwargs):
85 concurrent_future = asyncio.run_coroutine_threadsafe(
86 attr(*args, **kwargs), self._event_loop
87 )
88 return asyncio.wrap_future(concurrent_future)
89
90 return _wrapped
91 else:
92 return attr
93
94 def __dir__(self):
95 return dir(self._obj)
96
97
45 def _curio_runner(coroutine):
98 def _curio_runner(coroutine):
46 """
99 """
47 handler for curio autoawait
100 handler for curio autoawait
@@ -6,19 +6,18 b''
6 import asyncio
6 import asyncio
7 import atexit
7 import atexit
8 import errno
8 import errno
9 import functools
10 import os
9 import os
11 import signal
10 import signal
12 import sys
11 import sys
13 import time
12 import time
14 from contextlib import contextmanager
15 from subprocess import CalledProcessError
13 from subprocess import CalledProcessError
14 from threading import Thread
16
15
17 from traitlets import Dict, List, default
16 from traitlets import Any, Dict, List, default
18
17
19 from IPython.core import magic_arguments
18 from IPython.core import magic_arguments
19 from IPython.core.async_helpers import _AsyncIOProxy
20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.lib.backgroundjobs import BackgroundJobManager
22 from IPython.utils.process import arg_split
21 from IPython.utils.process import arg_split
23
22
24 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
@@ -57,7 +56,7 b' def script_args(f):'
57 ),
56 ),
58 magic_arguments.argument(
57 magic_arguments.argument(
59 '--no-raise-error', action="store_false", dest='raise_error',
58 '--no-raise-error', action="store_false", dest='raise_error',
60 help="""Whether you should raise an error message in addition to
59 help="""Whether you should raise an error message in addition to
61 a stream on stderr if you get a nonzero exit code.
60 a stream on stderr if you get a nonzero exit code.
62 """
61 """
63 )
62 )
@@ -67,48 +66,6 b' def script_args(f):'
67 return f
66 return f
68
67
69
68
70 @contextmanager
71 def safe_watcher():
72 if sys.platform == "win32":
73 yield
74 return
75
76 from asyncio import SafeChildWatcher
77
78 policy = asyncio.get_event_loop_policy()
79 old_watcher = policy.get_child_watcher()
80 if isinstance(old_watcher, SafeChildWatcher):
81 yield
82 return
83
84 try:
85 loop = policy.get_event_loop()
86 if loop.is_closed():
87 raise RuntimeError("open a new one")
88 except RuntimeError:
89 # closed loop, make a new one
90 loop = policy.new_event_loop()
91 policy.set_event_loop(loop)
92
93 try:
94 watcher = asyncio.SafeChildWatcher()
95 watcher.attach_loop(loop)
96 policy.set_child_watcher(watcher)
97 yield
98 finally:
99 watcher.close()
100 policy.set_child_watcher(old_watcher)
101
102
103 def dec_safe_watcher(fun):
104 @functools.wraps(fun)
105 def _inner(*args, **kwargs):
106 with safe_watcher():
107 return fun(*args, **kwargs)
108
109 return _inner
110
111
112 @magics_class
69 @magics_class
113 class ScriptMagics(Magics):
70 class ScriptMagics(Magics):
114 """Magics for talking to scripts
71 """Magics for talking to scripts
@@ -117,6 +74,17 b' class ScriptMagics(Magics):'
117 with a program in a subprocess, and registers a few top-level
74 with a program in a subprocess, and registers a few top-level
118 magics that call %%script with common interpreters.
75 magics that call %%script with common interpreters.
119 """
76 """
77
78 event_loop = Any(
79 help="""
80 The event loop on which to run subprocesses
81
82 Not the main event loop,
83 because we want to be able to make blocking calls
84 and have certain requirements we don't want to impose on the main loop.
85 """
86 )
87
120 script_magics = List(
88 script_magics = List(
121 help="""Extra script cell magics to define
89 help="""Extra script cell magics to define
122
90
@@ -158,7 +126,6 b' class ScriptMagics(Magics):'
158 def __init__(self, shell=None):
126 def __init__(self, shell=None):
159 super(ScriptMagics, self).__init__(shell=shell)
127 super(ScriptMagics, self).__init__(shell=shell)
160 self._generate_script_magics()
128 self._generate_script_magics()
161 self.job_manager = BackgroundJobManager()
162 self.bg_processes = []
129 self.bg_processes = []
163 atexit.register(self.kill_bg_processes)
130 atexit.register(self.kill_bg_processes)
164
131
@@ -199,7 +166,6 b' class ScriptMagics(Magics):'
199 @magic_arguments.magic_arguments()
166 @magic_arguments.magic_arguments()
200 @script_args
167 @script_args
201 @cell_magic("script")
168 @cell_magic("script")
202 @dec_safe_watcher
203 def shebang(self, line, cell):
169 def shebang(self, line, cell):
204 """Run a cell via a shell command
170 """Run a cell via a shell command
205
171
@@ -221,6 +187,27 b' class ScriptMagics(Magics):'
221 3
187 3
222 """
188 """
223
189
190 # Create the event loop in which to run script magics
191 # this operates on a background thread
192 if self.event_loop is None:
193 if sys.platform == "win32":
194 # don't override the current policy,
195 # just create an event loop
196 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
197 else:
198 event_loop = asyncio.new_event_loop()
199 self.event_loop = event_loop
200
201 # start the loop in a background thread
202 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
203 asyncio_thread.start()
204 else:
205 event_loop = self.event_loop
206
207 def in_thread(coro):
208 """Call a coroutine on the asyncio thread"""
209 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
210
224 async def _handle_stream(stream, stream_arg, file_object):
211 async def _handle_stream(stream, stream_arg, file_object):
225 while True:
212 while True:
226 line = (await stream.readline()).decode("utf8")
213 line = (await stream.readline()).decode("utf8")
@@ -244,23 +231,11 b' class ScriptMagics(Magics):'
244 await asyncio.wait([stdout_task, stderr_task])
231 await asyncio.wait([stdout_task, stderr_task])
245 await process.wait()
232 await process.wait()
246
233
247 policy = asyncio.get_event_loop_policy()
248 if sys.platform.startswith("win") and not isinstance(
249 policy, asyncio.WindowsProactorEventLoopPolicy
250 ):
251 # _do not_ overwrite the current policy
252 policy = asyncio.WindowsProactorEventLoopPolicy()
253
254 try:
255 loop = policy.get_event_loop()
256 except RuntimeError:
257 # closed loop, make a new one
258 loop = policy.new_event_loop()
259 policy.set_event_loop(loop)
260 argv = arg_split(line, posix=not sys.platform.startswith("win"))
234 argv = arg_split(line, posix=not sys.platform.startswith("win"))
261 args, cmd = self.shebang.parser.parse_known_args(argv)
235 args, cmd = self.shebang.parser.parse_known_args(argv)
236
262 try:
237 try:
263 p = loop.run_until_complete(
238 p = in_thread(
264 asyncio.create_subprocess_exec(
239 asyncio.create_subprocess_exec(
265 *cmd,
240 *cmd,
266 stdout=asyncio.subprocess.PIPE,
241 stdout=asyncio.subprocess.PIPE,
@@ -274,7 +249,7 b' class ScriptMagics(Magics):'
274 return
249 return
275 else:
250 else:
276 raise
251 raise
277
252
278 if not cell.endswith('\n'):
253 if not cell.endswith('\n'):
279 cell += '\n'
254 cell += '\n'
280 cell = cell.encode('utf8', 'replace')
255 cell = cell.encode('utf8', 'replace')
@@ -283,29 +258,34 b' class ScriptMagics(Magics):'
283 self._gc_bg_processes()
258 self._gc_bg_processes()
284 to_close = []
259 to_close = []
285 if args.out:
260 if args.out:
286 self.shell.user_ns[args.out] = p.stdout
261 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
287 else:
262 else:
288 to_close.append(p.stdout)
263 to_close.append(p.stdout)
289 if args.err:
264 if args.err:
290 self.shell.user_ns[args.err] = p.stderr
265 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
291 else:
266 else:
292 to_close.append(p.stderr)
267 to_close.append(p.stderr)
293 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
268 event_loop.call_soon_threadsafe(
269 lambda: asyncio.Task(self._run_script(p, cell, to_close))
270 )
294 if args.proc:
271 if args.proc:
295 self.shell.user_ns[args.proc] = p
272 proc_proxy = _AsyncIOProxy(p, event_loop)
273 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
274 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
275 self.shell.user_ns[args.proc] = proc_proxy
296 return
276 return
297
277
298 try:
278 try:
299 loop.run_until_complete(_stream_communicate(p, cell))
279 in_thread(_stream_communicate(p, cell))
300 except KeyboardInterrupt:
280 except KeyboardInterrupt:
301 try:
281 try:
302 p.send_signal(signal.SIGINT)
282 p.send_signal(signal.SIGINT)
303 time.sleep(0.1)
283 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
304 if p.returncode is not None:
284 if p.returncode is not None:
305 print("Process is interrupted.")
285 print("Process is interrupted.")
306 return
286 return
307 p.terminate()
287 p.terminate()
308 time.sleep(0.1)
288 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
309 if p.returncode is not None:
289 if p.returncode is not None:
310 print("Process is terminated.")
290 print("Process is terminated.")
311 return
291 return
@@ -316,7 +296,8 b' class ScriptMagics(Magics):'
316 except Exception as e:
296 except Exception as e:
317 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
297 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
318 return
298 return
319 if args.raise_error and p.returncode!=0:
299
300 if args.raise_error and p.returncode != 0:
320 # If we get here and p.returncode is still None, we must have
301 # If we get here and p.returncode is still None, we must have
321 # killed it but not yet seen its return code. We don't wait for it,
302 # killed it but not yet seen its return code. We don't wait for it,
322 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
303 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
@@ -324,14 +305,20 b' class ScriptMagics(Magics):'
324 raise CalledProcessError(rc, cell)
305 raise CalledProcessError(rc, cell)
325
306
326 shebang.__skip_doctest__ = os.name != "posix"
307 shebang.__skip_doctest__ = os.name != "posix"
327
308
328 def _run_script(self, p, cell, to_close):
309 async def _run_script(self, p, cell, to_close):
329 """callback for running the script in the background"""
310 """callback for running the script in the background"""
311
330 p.stdin.write(cell)
312 p.stdin.write(cell)
313 await p.stdin.drain()
331 p.stdin.close()
314 p.stdin.close()
315 await p.stdin.wait_closed()
316 await p.wait()
317 # asyncio read pipes have no close
318 # but we should drain the data anyway
332 for s in to_close:
319 for s in to_close:
333 s.close()
320 await s.read()
334 p.wait()
321 self._gc_bg_processes()
335
322
336 @line_magic("killbgscripts")
323 @line_magic("killbgscripts")
337 def killbgscripts(self, _nouse_=''):
324 def killbgscripts(self, _nouse_=''):
@@ -1049,10 +1049,10 b' def test_custom_exc_count():'
1049
1049
1050
1050
1051 def test_run_cell_async():
1051 def test_run_cell_async():
1052 loop = asyncio.get_event_loop_policy().get_event_loop()
1053 ip.run_cell("import asyncio")
1052 ip.run_cell("import asyncio")
1054 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1053 coro = ip.run_cell_async("await asyncio.sleep(0.01)\n5")
1055 assert asyncio.iscoroutine(coro)
1054 assert asyncio.iscoroutine(coro)
1055 loop = asyncio.new_event_loop()
1056 result = loop.run_until_complete(coro)
1056 result = loop.run_until_complete(coro)
1057 assert isinstance(result, interactiveshell.ExecutionResult)
1057 assert isinstance(result, interactiveshell.ExecutionResult)
1058 assert result.result == 5
1058 assert result.result == 5
@@ -988,7 +988,7 b' def test_script_out(event_loop):'
988
988
989 ip = get_ipython()
989 ip = get_ipython()
990 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
990 ip.run_cell_magic("script", "--out output sh", "echo 'hi'")
991 assert event_loop.is_running() is False
991 assert not event_loop.is_running()
992 assert ip.user_ns["output"] == "hi\n"
992 assert ip.user_ns["output"] == "hi\n"
993
993
994
994
@@ -998,9 +998,9 b' def test_script_out(event_loop):'
998 )
998 )
999 def test_script_err(event_loop):
999 def test_script_err(event_loop):
1000 ip = get_ipython()
1000 ip = get_ipython()
1001 assert event_loop.is_running() is False
1001 assert not event_loop.is_running()
1002 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
1002 ip.run_cell_magic("script", "--err error sh", "echo 'hello' >&2")
1003 assert event_loop.is_running() is False
1003 assert not event_loop.is_running()
1004 assert ip.user_ns["error"] == "hello\n"
1004 assert ip.user_ns["error"] == "hello\n"
1005
1005
1006
1006
@@ -1022,13 +1022,11 b' def test_script_out_err():'
1022 @pytest.mark.skipif(
1022 @pytest.mark.skipif(
1023 sys.platform == "win32", reason="This test does not run under Windows"
1023 sys.platform == "win32", reason="This test does not run under Windows"
1024 )
1024 )
1025 async def test_script_bg_out(event_loop):
1025 async def test_script_bg_out():
1026 ip = get_ipython()
1026 ip = get_ipython()
1027 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
1027 ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'")
1028 assert (await ip.user_ns["output"].read()) == b"hi\n"
1028 assert (await ip.user_ns["output"].read()) == b"hi\n"
1029 ip.user_ns["output"].close()
1029 assert ip.user_ns["output"].at_eof()
1030 event_loop.stop()
1031
1032
1030
1033 @dec.skip_win32
1031 @dec.skip_win32
1034 @pytest.mark.skipif(
1032 @pytest.mark.skipif(
@@ -1038,7 +1036,7 b' async def test_script_bg_err():'
1038 ip = get_ipython()
1036 ip = get_ipython()
1039 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
1037 ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2")
1040 assert (await ip.user_ns["error"].read()) == b"hello\n"
1038 assert (await ip.user_ns["error"].read()) == b"hello\n"
1041 ip.user_ns["error"].close()
1039 assert ip.user_ns["error"].at_eof()
1042
1040
1043
1041
1044 @dec.skip_win32
1042 @dec.skip_win32
@@ -1052,8 +1050,27 b' async def test_script_bg_out_err():'
1052 )
1050 )
1053 assert (await ip.user_ns["output"].read()) == b"hi\n"
1051 assert (await ip.user_ns["output"].read()) == b"hi\n"
1054 assert (await ip.user_ns["error"].read()) == b"hello\n"
1052 assert (await ip.user_ns["error"].read()) == b"hello\n"
1055 ip.user_ns["output"].close()
1053 assert ip.user_ns["output"].at_eof()
1056 ip.user_ns["error"].close()
1054 assert ip.user_ns["error"].at_eof()
1055
1056
1057 @dec.skip_win32
1058 @pytest.mark.skipif(
1059 sys.platform == "win32", reason="This test does not run under Windows"
1060 )
1061 async def test_script_bg_proc():
1062 ip = get_ipython()
1063 ip.run_cell_magic(
1064 "script", "--bg --proc p sh --out out", "echo 'hi'\necho 'hello' >&2"
1065 )
1066 p = ip.user_ns["p"]
1067 await p.wait()
1068 assert p.returncode == 0
1069 assert (await p.stdout.read()) == b"hi\n"
1070 # not captured, so empty
1071 assert (await p.stderr.read()) == b""
1072 assert p.stdout.at_eof()
1073 assert p.stderr.at_eof()
1057
1074
1058
1075
1059 def test_script_defaults():
1076 def test_script_defaults():
@@ -6,6 +6,7 b' import sys'
6 import warnings
6 import warnings
7 from warnings import warn
7 from warnings import warn
8
8
9 from IPython.core.async_helpers import get_asyncio_loop
9 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
10 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
10 from IPython.utils import io
11 from IPython.utils import io
11 from IPython.utils.py3compat import input
12 from IPython.utils.py3compat import input
@@ -521,14 +522,14 b' class TerminalInteractiveShell(InteractiveShell):'
521 # while/true inside which will freeze the prompt.
522 # while/true inside which will freeze the prompt.
522
523
523 policy = asyncio.get_event_loop_policy()
524 policy = asyncio.get_event_loop_policy()
524 try:
525 old_loop = get_asyncio_loop()
525 old_loop = policy.get_event_loop()
526
526 except RuntimeError:
527 # FIXME: prompt_toolkit is using the deprecated `asyncio.get_event_loop`
527 # This happens when the the event loop is closed,
528 # to get the current event loop.
528 # e.g. by calling `asyncio.run()`.
529 # This will probably be replaced by an attribute or input argument,
529 old_loop = None
530 # at which point we can stop calling the soon-to-be-deprecated `set_event_loop` here.
530
531 if old_loop is not self.pt_loop:
531 policy.set_event_loop(self.pt_loop)
532 policy.set_event_loop(self.pt_loop)
532 try:
533 try:
533 with patch_stdout(raw=True):
534 with patch_stdout(raw=True):
534 text = self.pt_app.prompt(
535 text = self.pt_app.prompt(
@@ -536,7 +537,7 b' class TerminalInteractiveShell(InteractiveShell):'
536 **self._extra_prompt_options())
537 **self._extra_prompt_options())
537 finally:
538 finally:
538 # Restore the original event loop.
539 # Restore the original event loop.
539 if old_loop is not None:
540 if old_loop is not None and old_loop is not self.pt_loop:
540 policy.set_event_loop(old_loop)
541 policy.set_event_loop(old_loop)
541
542
542 return text
543 return text
@@ -652,7 +653,7 b' class TerminalInteractiveShell(InteractiveShell):'
652 # When we integrate the asyncio event loop, run the UI in the
653 # When we integrate the asyncio event loop, run the UI in the
653 # same event loop as the rest of the code. don't use an actual
654 # same event loop as the rest of the code. don't use an actual
654 # input hook. (Asyncio is not made for nesting event loops.)
655 # input hook. (Asyncio is not made for nesting event loops.)
655 self.pt_loop = asyncio.get_event_loop_policy().get_event_loop()
656 self.pt_loop = get_asyncio_loop()
656
657
657 elif self._inputhook:
658 elif self._inputhook:
658 # If an inputhook was set, create a new asyncio event loop with
659 # If an inputhook was set, create a new asyncio event loop with
@@ -27,15 +27,12 b' prompt_toolkit`s `patch_stdout`)::'
27 In [4]: asyncio.ensure_future(f())
27 In [4]: asyncio.ensure_future(f())
28
28
29 """
29 """
30 import asyncio
31 from prompt_toolkit import __version__ as ptk_version
30 from prompt_toolkit import __version__ as ptk_version
32
31
33 PTK3 = ptk_version.startswith('3.')
32 from IPython.core.async_helpers import get_asyncio_loop
34
33
34 PTK3 = ptk_version.startswith('3.')
35
35
36 # Keep reference to the original asyncio loop, because getting the event loop
37 # within the input hook would return the other loop.
38 loop = asyncio.get_event_loop_policy().get_event_loop()
39
36
40
37
41 def inputhook(context):
38 def inputhook(context):
@@ -52,6 +49,9 b' def inputhook(context):'
52 # For prompt_toolkit 2.0, we can run the current asyncio event loop,
49 # For prompt_toolkit 2.0, we can run the current asyncio event loop,
53 # because prompt_toolkit 2.0 uses a different event loop internally.
50 # because prompt_toolkit 2.0 uses a different event loop internally.
54
51
52 # get the persistent asyncio event loop
53 loop = get_asyncio_loop()
54
55 def stop():
55 def stop():
56 loop.stop()
56 loop.stop()
57
57
@@ -61,4 +61,3 b' def inputhook(context):'
61 loop.run_forever()
61 loop.run_forever()
62 finally:
62 finally:
63 loop.remove_reader(fileno)
63 loop.remove_reader(fileno)
64
General Comments 0
You need to be logged in to leave comments. Login now