script.py
372 lines
| 12.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r7299 | """Magic functions for running cells in various scripts.""" | ||
Min RK
|
r22340 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
MinRK
|
r7299 | |||
Matthias Bussonnier
|
r26854 | import asyncio | ||
Garland Zhang
|
r28217 | import asyncio.exceptions | ||
Matthias Bussonnier
|
r26854 | import atexit | ||
MinRK
|
r8470 | import errno | ||
MinRK
|
r7299 | import os | ||
Takafumi Arakaki
|
r7561 | import signal | ||
Matthias Bussonnier
|
r26854 | import sys | ||
Takafumi Arakaki
|
r7561 | import time | ||
Matthias Bussonnier
|
r26207 | from subprocess import CalledProcessError | ||
Min RK
|
r27387 | from threading import Thread | ||
Matthias Bussonnier
|
r26207 | |||
Min RK
|
r27387 | from traitlets import Any, Dict, List, default | ||
Matthias Bussonnier
|
r26854 | |||
MinRK
|
r7398 | from IPython.core import magic_arguments | ||
Min RK
|
r27387 | from IPython.core.async_helpers import _AsyncIOProxy | ||
Matthias Bussonnier
|
r26854 | from IPython.core.magic import Magics, cell_magic, line_magic, magics_class | ||
MinRK
|
r8469 | from IPython.utils.process import arg_split | ||
Matthias Bussonnier
|
r26207 | |||
MinRK
|
r7299 | #----------------------------------------------------------------------------- | ||
# Magic implementation classes | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
def script_args(f):
    """Single decorator for adding the common ``%%script`` arguments.

    Applies each ``magic_arguments.argument`` decorator in ``args`` to ``f``
    and returns the decorated function, so both ``%%script`` and the
    generated ``%%bash``/``%%ruby``/... wrappers share one option set.
    """
    args = [
        magic_arguments.argument(
            '--out', type=str,
            help="""The variable in which to store stdout from the script.
            If the script is backgrounded, this will be the stdout *pipe*,
            instead of the stdout text itself and will not be autoclosed.
            """
        ),
        magic_arguments.argument(
            '--err', type=str,
            help="""The variable in which to store stderr from the script.
            If the script is backgrounded, this will be the stderr *pipe*,
            instead of the stderr text itself and will not be autoclosed.
            """
        ),
        magic_arguments.argument(
            '--bg', action="store_true",
            help="""Whether to run the script in the background.
            If given, the only way to see the output of the command is
            with --out/err.
            """
        ),
        magic_arguments.argument(
            '--proc', type=str,
            help="""The variable in which to store Popen instance.
            This is used only when --bg option is given.
            """
        ),
        magic_arguments.argument(
            # store_false: passing the flag sets ``args.raise_error`` to False.
            '--no-raise-error', action="store_false", dest='raise_error',
            help="""Do not raise a CalledProcessError when the script exits
            with a nonzero code. By default the error is raised in addition
            to the script's output on stderr.
            """,
        ),
    ]
    for arg in args:
        f = arg(f)
    return f
Matthias Bussonnier
|
r26854 | |||
MinRK
|
@magics_class
class ScriptMagics(Magics):
    """Magics for talking to scripts

    This defines a base `%%script` cell magic for running a cell
    with a program in a subprocess, and registers a few top-level
    magics that call %%script with common interpreters.
    """

    event_loop = Any(
        help="""
        The event loop on which to run subprocesses

        Not the main event loop,
        because we want to be able to make blocking calls
        and have certain requirements we don't want to impose on the main loop.
        """
    )

    script_magics: List = List(
        help="""Extra script cell magics to define

        This generates simple wrappers of `%%script foo` as `%%foo`.

        If you want to add script magics that aren't on your path,
        specify them in script_paths
        """,
    ).tag(config=True)

    @default('script_magics')
    def _script_magics_default(self):
        """default to a common list of programs"""

        defaults = [
            'sh',
            'bash',
            'perl',
            'ruby',
            'python',
            'python2',
            'python3',
            'pypy',
        ]
        if os.name == 'nt':
            defaults.extend([
                'cmd',
            ])

        return defaults

    script_paths = Dict(
        help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'

        Only necessary for items in script_magics where the default path will not
        find the right interpreter.
        """
    ).tag(config=True)

    def __init__(self, shell=None):
        super(ScriptMagics, self).__init__(shell=shell)
        # Set this before anything else that may fail, so that __del__ (which
        # calls kill_bg_processes) never hits a missing attribute.
        self.bg_processes = []
        self._generate_script_magics()
        atexit.register(self.kill_bg_processes)

    def __del__(self):
        self.kill_bg_processes()

    def _generate_script_magics(self):
        """Register one ``%%<name>`` cell magic per entry in ``script_magics``."""
        cell_magics = self.magics['cell']
        for name in self.script_magics:
            cell_magics[name] = self._make_script_magic(name)

    def _make_script_magic(self, name):
        """make a named magic, that calls %%script with a particular program"""
        # expand to explicit path if necessary:
        script = self.script_paths.get(name, name)

        @magic_arguments.magic_arguments()
        @script_args
        def named_script_magic(line, cell):
            # if line, add it as cl-flags
            if line:
                line = "%s %s" % (script, line)
            else:
                line = script
            return self.shebang(line, cell)

        # write a basic docstring:
        named_script_magic.__doc__ = \
        """%%{name} script magic

        Run cells with {script} in a subprocess.

        This is a shortcut for `%%script {script}`
        """.format(name=name, script=script)

        return named_script_magic

    @magic_arguments.magic_arguments()
    @script_args
    @cell_magic("script")
    def shebang(self, line, cell):
        """Run a cell via a shell command

        The `%%script` line is like the #! line of script,
        specifying a program (bash, perl, ruby, etc.) with which to run.

        The rest of the cell is run by that program.

        Examples
        --------
        ::

            In [1]: %%script bash
               ...: for i in 1 2 3; do
               ...:   echo $i
               ...: done
            1
            2
            3
        """

        # Create the event loop in which to run script magics
        # this operates on a background thread
        if self.event_loop is None:
            if sys.platform == "win32":
                # don't override the current policy,
                # just create an event loop
                event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
            else:
                event_loop = asyncio.new_event_loop()
            self.event_loop = event_loop

            # start the loop in a background thread
            asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
            asyncio_thread.start()
        else:
            event_loop = self.event_loop

        def in_thread(coro):
            """Call a coroutine on the asyncio thread"""
            return asyncio.run_coroutine_threadsafe(coro, event_loop).result()

        async def _readchunk(stream):
            """Read one line; fall back to partial / limit-sized reads."""
            try:
                return await stream.readuntil(b"\n")
            except asyncio.exceptions.IncompleteReadError as e:
                return e.partial
            except asyncio.exceptions.LimitOverrunError as e:
                return await stream.read(e.consumed)

        async def _handle_stream(stream, stream_arg, file_object):
            """Forward *stream* to *file_object*, or capture it into user_ns.

            When *stream_arg* is given, the WHOLE output is collected and stored
            once at EOF (previously each line overwrote the variable, so only
            the last line survived).
            """
            captured = []
            while True:
                chunk = await _readchunk(stream)
                if not chunk:
                    break
                if stream_arg:
                    captured.append(chunk)
                else:
                    file_object.write(chunk.decode("utf8", errors="replace"))
                    file_object.flush()
            if stream_arg:
                # Decode once so multi-byte characters split across chunks
                # are not mangled.
                self.shell.user_ns[stream_arg] = b"".join(captured).decode(
                    "utf8", errors="replace"
                )

        async def _stream_communicate(process, cell):
            process.stdin.write(cell)
            process.stdin.close()
            stdout_task = asyncio.create_task(
                _handle_stream(process.stdout, args.out, sys.stdout)
            )
            stderr_task = asyncio.create_task(
                _handle_stream(process.stderr, args.err, sys.stderr)
            )
            await asyncio.wait([stdout_task, stderr_task])
            await process.wait()

        async def _wait_for_exit(process, timeout=0.1):
            """Give *process* up to *timeout* seconds to exit, never raising.

            The timeout is expected, so it is swallowed here, inside the
            coroutine. Letting it propagate through ``in_thread`` would surface
            it as ``TimeoutError`` (an ``OSError`` subclass on 3.11+, or
            ``concurrent.futures.TimeoutError`` earlier) and abort the
            SIGINT -> terminate -> kill escalation below.
            """
            try:
                await asyncio.wait_for(process.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                pass

        argv = arg_split(line, posix=not sys.platform.startswith("win"))
        args, cmd = self.shebang.parser.parse_known_args(argv)

        try:
            p = in_thread(
                asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.PIPE,
                )
            )
        except OSError as e:
            if e.errno == errno.ENOENT:
                print("Couldn't find program: %r" % cmd[0])
                return
            else:
                raise

        if not cell.endswith('\n'):
            cell += '\n'
        cell = cell.encode('utf8', 'replace')
        if args.bg:
            self.bg_processes.append(p)
            self._gc_bg_processes()
            to_close = []
            if args.out:
                self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
            else:
                to_close.append(p.stdout)
            if args.err:
                self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
            else:
                to_close.append(p.stderr)
            event_loop.call_soon_threadsafe(
                lambda: asyncio.Task(self._run_script(p, cell, to_close))
            )
            if args.proc:
                proc_proxy = _AsyncIOProxy(p, event_loop)
                proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
                proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
                self.shell.user_ns[args.proc] = proc_proxy
            return

        try:
            in_thread(_stream_communicate(p, cell))
        except KeyboardInterrupt:
            try:
                p.send_signal(signal.SIGINT)
                in_thread(_wait_for_exit(p))
                if p.returncode is not None:
                    print("Process is interrupted.")
                    return
                p.terminate()
                in_thread(_wait_for_exit(p))
                if p.returncode is not None:
                    print("Process is terminated.")
                    return
                p.kill()
                print("Process is killed.")
            except OSError:
                pass
            except Exception as e:
                print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
            return

        if args.raise_error and p.returncode != 0:
            # If we get here and p.returncode is still None, we must have
            # killed it but not yet seen its return code. We don't wait for it,
            # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
            rc = p.returncode or -9
            raise CalledProcessError(rc, cell)

    shebang.__skip_doctest__ = os.name != "posix"

    async def _run_script(self, p, cell, to_close):
        """callback for running the script in the background"""

        p.stdin.write(cell)
        await p.stdin.drain()
        p.stdin.close()
        await p.stdin.wait_closed()
        await p.wait()
        # asyncio read pipes have no close
        # but we should drain the data anyway
        for s in to_close:
            await s.read()
        self._gc_bg_processes()

    @line_magic("killbgscripts")
    def killbgscripts(self, _nouse_=''):
        """Kill all BG processes started by %%script and its family."""
        self.kill_bg_processes()
        print("All background processes were killed.")

    def kill_bg_processes(self):
        """Kill all BG processes which are still running.

        Escalates SIGINT -> terminate() -> kill(). Processes that have exited
        are dropped between steps, and each of the first two steps is followed
        by a 0.1 s grace period.
        """
        steps = (
            lambda p: p.send_signal(signal.SIGINT),
            lambda p: p.terminate(),
            lambda p: p.kill(),
        )
        last = len(steps) - 1
        for i, step in enumerate(steps):
            if not self.bg_processes:
                return
            for p in self.bg_processes:
                if p.returncode is None:
                    try:
                        step(p)
                    except Exception:
                        # Best effort: the process may already be gone, or the
                        # signal may be unsupported on this platform.
                        pass
            if i < last:
                time.sleep(0.1)
            self._gc_bg_processes()

    def _gc_bg_processes(self):
        """Forget background processes that have already exited."""
        self.bg_processes = [p for p in self.bg_processes if p.returncode is None]