##// END OF EJS Templates
Merge pull request #12639 from jetpacktuxedo/buffering
Merge pull request #12639 from jetpacktuxedo/buffering

File last commit:

r26200:346181eb
r26204:3b094c83 merge
Show More
script.py
315 lines | 10.1 KiB | text/x-python | PythonLexer
"""Magic functions for running cells in various scripts."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import errno
import os
import sys
import signal
import time
import asyncio
import atexit
from IPython.core import magic_arguments
from IPython.core.magic import (
Magics, magics_class, line_magic, cell_magic
)
from IPython.lib.backgroundjobs import BackgroundJobManager
from IPython.utils import py3compat
from IPython.utils.process import arg_split
from traitlets import List, Dict, default
#-----------------------------------------------------------------------------
# Magic implementation classes
#-----------------------------------------------------------------------------
def script_args(f):
"""single decorator for adding script args"""
args = [
magic_arguments.argument(
'--out', type=str,
help="""The variable in which to store stdout from the script.
If the script is backgrounded, this will be the stdout *pipe*,
instead of the stderr text itself and will not be auto closed.
"""
),
magic_arguments.argument(
'--err', type=str,
help="""The variable in which to store stderr from the script.
If the script is backgrounded, this will be the stderr *pipe*,
instead of the stderr text itself and will not be autoclosed.
"""
),
magic_arguments.argument(
'--bg', action="store_true",
help="""Whether to run the script in the background.
If given, the only way to see the output of the command is
with --out/err.
"""
),
magic_arguments.argument(
'--proc', type=str,
help="""The variable in which to store Popen instance.
This is used only when --bg option is given.
"""
),
magic_arguments.argument(
'--no-raise-error', action="store_false", dest='raise_error',
help="""Whether you should raise an error message in addition to
a stream on stderr if you get a nonzero exit code.
"""
)
]
for arg in args:
f = arg(f)
return f
@magics_class
class ScriptMagics(Magics):
"""Magics for talking to scripts
This defines a base `%%script` cell magic for running a cell
with a program in a subprocess, and registers a few top-level
magics that call %%script with common interpreters.
"""
script_magics = List(
help="""Extra script cell magics to define
This generates simple wrappers of `%%script foo` as `%%foo`.
If you want to add script magics that aren't on your path,
specify them in script_paths
""",
).tag(config=True)
@default('script_magics')
def _script_magics_default(self):
"""default to a common list of programs"""
defaults = [
'sh',
'bash',
'perl',
'ruby',
'python',
'python2',
'python3',
'pypy',
]
if os.name == 'nt':
defaults.extend([
'cmd',
])
return defaults
script_paths = Dict(
help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
Only necessary for items in script_magics where the default path will not
find the right interpreter.
"""
).tag(config=True)
def __init__(self, shell=None):
super(ScriptMagics, self).__init__(shell=shell)
self._generate_script_magics()
self.job_manager = BackgroundJobManager()
self.bg_processes = []
atexit.register(self.kill_bg_processes)
def __del__(self):
self.kill_bg_processes()
def _generate_script_magics(self):
cell_magics = self.magics['cell']
for name in self.script_magics:
cell_magics[name] = self._make_script_magic(name)
def _make_script_magic(self, name):
"""make a named magic, that calls %%script with a particular program"""
# expand to explicit path if necessary:
script = self.script_paths.get(name, name)
@magic_arguments.magic_arguments()
@script_args
def named_script_magic(line, cell):
# if line, add it as cl-flags
if line:
line = "%s %s" % (script, line)
else:
line = script
return self.shebang(line, cell)
# write a basic docstring:
named_script_magic.__doc__ = \
"""%%{name} script magic
Run cells with {script} in a subprocess.
This is a shortcut for `%%script {script}`
""".format(**locals())
return named_script_magic
@magic_arguments.magic_arguments()
@script_args
@cell_magic("script")
def shebang(self, line, cell):
"""Run a cell via a shell command
The `%%script` line is like the #! line of script,
specifying a program (bash, perl, ruby, etc.) with which to run.
The rest of the cell is run by that program.
Examples
--------
::
In [1]: %%script bash
...: for i in 1 2 3; do
...: echo $i
...: done
1
2
3
"""
async def _handle_stream(stream, stream_arg, file_object):
while True:
line = (await stream.readline()).decode("utf8")
if not line:
break
if stream_arg:
self.shell.user_ns[stream_arg] = line
else:
file_object.write(line)
file_object.flush()
async def _stream_communicate(process, cell):
process.stdin.write(cell)
process.stdin.close()
stdout_task = asyncio.create_task(
_handle_stream(process.stdout, args.out, sys.stdout)
)
stderr_task = asyncio.create_task(
_handle_stream(process.stderr, args.err, sys.stderr)
)
await asyncio.wait([stdout_task, stderr_task])
if sys.platform.startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
loop = asyncio.get_event_loop()
argv = arg_split(line, posix=not sys.platform.startswith("win"))
args, cmd = self.shebang.parser.parse_known_args(argv)
try:
p = loop.run_until_complete(
asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE
)
)
except OSError as e:
if e.errno == errno.ENOENT:
print("Couldn't find program: %r" % cmd[0])
return
else:
raise
if not cell.endswith('\n'):
cell += '\n'
cell = cell.encode('utf8', 'replace')
if args.bg:
self.bg_processes.append(p)
self._gc_bg_processes()
to_close = []
if args.out:
self.shell.user_ns[args.out] = p.stdout
else:
to_close.append(p.stdout)
if args.err:
self.shell.user_ns[args.err] = p.stderr
else:
to_close.append(p.stderr)
self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
if args.proc:
self.shell.user_ns[args.proc] = p
return
try:
loop.run_until_complete(_stream_communicate(p, cell))
except KeyboardInterrupt:
try:
p.send_signal(signal.SIGINT)
time.sleep(0.1)
if p.returncode is not None:
print("Process is interrupted.")
return
p.terminate()
time.sleep(0.1)
if p.returncode is not None:
print("Process is terminated.")
return
p.kill()
print("Process is killed.")
except OSError:
pass
except Exception as e:
print("Error while terminating subprocess (pid=%i): %s" \
% (p.pid, e))
return
if args.raise_error and p.returncode!=0:
raise CalledProcessError(p.returncode, cell)
def _run_script(self, p, cell, to_close):
"""callback for running the script in the background"""
p.stdin.write(cell)
p.stdin.close()
for s in to_close:
s.close()
p.wait()
@line_magic("killbgscripts")
def killbgscripts(self, _nouse_=''):
"""Kill all BG processes started by %%script and its family."""
self.kill_bg_processes()
print("All background processes were killed.")
def kill_bg_processes(self):
"""Kill all BG processes which are still running."""
if not self.bg_processes:
return
for p in self.bg_processes:
if p.returncode is None:
try:
p.send_signal(signal.SIGINT)
except:
pass
time.sleep(0.1)
self._gc_bg_processes()
if not self.bg_processes:
return
for p in self.bg_processes:
if p.returncode is None:
try:
p.terminate()
except:
pass
time.sleep(0.1)
self._gc_bg_processes()
if not self.bg_processes:
return
for p in self.bg_processes:
if p.returncode is None:
try:
p.kill()
except:
pass
self._gc_bg_processes()
def _gc_bg_processes(self):
self.bg_processes = [p for p in self.bg_processes if p.returncode is None]