From ce820924d81ff15f711a5ad1ce2d6e2868d22039 2020-10-27 21:39:37 From: Ethan Madden Date: 2020-10-27 21:39:37 Subject: [PATCH] Line-buffered output for %%magic scripts --- diff --git a/IPython/core/magics/script.py b/IPython/core/magics/script.py index 8b7f6f9..d45d3d3 100644 --- a/IPython/core/magics/script.py +++ b/IPython/core/magics/script.py @@ -8,7 +8,7 @@ import os import sys import signal import time -from subprocess import Popen, PIPE, CalledProcessError +import asyncio import atexit from IPython.core import magic_arguments @@ -175,11 +175,44 @@ class ScriptMagics(Magics): 2 3 """ - argv = arg_split(line, posix = not sys.platform.startswith('win')) + + async def _handle_stream(stream, stream_arg, file_object): + while True: + line = (await stream.readline()).decode("utf8") + if not line: + break + if stream_arg: + self.shell.user_ns[stream_arg] = line + else: + file_object.write(line) + file_object.flush() + + async def _stream_communicate(process, cell): + process.stdin.write(cell) + process.stdin.close() + stdout_task = asyncio.create_task( + _handle_stream(process.stdout, args.out, sys.stdout) + ) + stderr_task = asyncio.create_task( + _handle_stream(process.stderr, args.err, sys.stderr) + ) + await asyncio.wait([stdout_task, stderr_task]) + + if sys.platform.startswith("win"): + asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) + loop = asyncio.get_event_loop() + + argv = arg_split(line, posix=not sys.platform.startswith('win')) args, cmd = self.shebang.parser.parse_known_args(argv) - try: - p = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE) + p = loop.run_until_complete( + asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE + ) + ) except OSError as e: if e.errno == errno.ENOENT: print("Couldn't find program: %r" % cmd[0]) @@ -208,17 +241,17 @@ class ScriptMagics(Magics): return try: - out, err = p.communicate(cell) + loop.run_until_complete(_stream_communicate(p, cell)) except KeyboardInterrupt: try: p.send_signal(signal.SIGINT) time.sleep(0.1) - if p.poll() is not None: + if p.returncode is not None: print("Process is interrupted.") return p.terminate() time.sleep(0.1) - if p.poll() is not None: + if p.returncode is not None: print("Process is terminated.") return p.kill() @@ -229,20 +262,9 @@ class ScriptMagics(Magics): print("Error while terminating subprocess (pid=%i): %s" \ % (p.pid, e)) return - out = py3compat.decode(out) - err = py3compat.decode(err) - if args.out: - self.shell.user_ns[args.out] = out - else: - sys.stdout.write(out) - sys.stdout.flush() - if args.err: - self.shell.user_ns[args.err] = err - else: - sys.stderr.write(err) - sys.stderr.flush() if args.raise_error and p.returncode!=0: - raise CalledProcessError(p.returncode, cell, output=out, stderr=err) + print(p.returncode) + raise CalledProcessError(p.returncode, cell) def _run_script(self, p, cell, to_close): """callback for running the script in the background""" @@ -263,7 +285,7 @@ class ScriptMagics(Magics): if not self.bg_processes: return for p in self.bg_processes: - if p.poll() is None: + if p.returncode is None: try: p.send_signal(signal.SIGINT) except: @@ -273,7 +295,7 @@ class ScriptMagics(Magics): if not self.bg_processes: return for p in self.bg_processes: - if p.poll() is None: + if p.returncode is None: try: p.terminate() except: @@ -283,7 +305,7 @@ class ScriptMagics(Magics): if not self.bg_processes: return for p in self.bg_processes: - if p.poll() is None: + if p.returncode is None: try: p.kill() except: @@ -291,4 +313,4 @@ class ScriptMagics(Magics): self._gc_bg_processes() def _gc_bg_processes(self): - self.bg_processes = [p for p in self.bg_processes if p.poll() is None] + self.bg_processes = [p for p in self.bg_processes if p.returncode is None] diff --git a/IPython/core/tests/test_magic.py b/IPython/core/tests/test_magic.py index 10dd8ce..35f04cc 100644 --- a/IPython/core/tests/test_magic.py +++ b/IPython/core/tests/test_magic.py @@ -948,26 +948,26 @@ def test_script_out_err(): nt.assert_equal(ip.user_ns['error'], 'hello\n') @dec.skip_win32 -def test_script_bg_out(): +async def test_script_bg_out(): ip = get_ipython() ip.run_cell_magic("script", "--bg --out output sh", "echo 'hi'") - nt.assert_equal(ip.user_ns['output'].read(), b'hi\n') + nt.assert_equal((await ip.user_ns['output'].read()), b'hi\n') ip.user_ns['output'].close() @dec.skip_win32 -def test_script_bg_err(): +async def test_script_bg_err(): ip = get_ipython() ip.run_cell_magic("script", "--bg --err error sh", "echo 'hello' >&2") - nt.assert_equal(ip.user_ns['error'].read(), b'hello\n') + nt.assert_equal((await ip.user_ns['error'].read()), b'hello\n') ip.user_ns['error'].close() @dec.skip_win32 -def test_script_bg_out_err(): +async def test_script_bg_out_err(): ip = get_ipython() ip.run_cell_magic("script", "--bg --out output --err error sh", "echo 'hi'\necho 'hello' >&2") - nt.assert_equal(ip.user_ns['output'].read(), b'hi\n') - nt.assert_equal(ip.user_ns['error'].read(), b'hello\n') + nt.assert_equal((await ip.user_ns['output'].read()), b'hi\n') + nt.assert_equal((await ip.user_ns['error'].read()), b'hello\n') ip.user_ns['output'].close() ip.user_ns['error'].close()