inputtransformer.py
444 lines
| 14.1 KiB
| text/x-python
|
PythonLexer
Thomas Kluyver
|
r10090 | import abc | ||
Thomas Kluyver
|
r10102 | import functools | ||
Thomas Kluyver
|
r10090 | import re | ||
Thomas Kluyver
|
r10093 | from StringIO import StringIO | ||
Thomas Kluyver
|
r10105 | |||
Thomas Kluyver
|
r11124 | from IPython.core.splitinput import LineInfo | ||
Thomas Kluyver
|
r10110 | from IPython.utils import tokenize2 | ||
from IPython.utils.tokenize2 import generate_tokens, untokenize, TokenError | ||||
Thomas Kluyver
|
r10093 | |||
#----------------------------------------------------------------------------- | ||||
# Globals | ||||
#----------------------------------------------------------------------------- | ||||
# The escape sequences that define the syntax transformations IPython will | ||||
# apply to user input. These can NOT be just changed here: many regular | ||||
# expressions and other parts of the code may use their hardcoded values, and | ||||
# for all intents and purposes they constitute the 'IPython syntax', so they | ||||
# should be considered fixed. | ||||
ESC_SHELL = '!' # Send line to underlying system shell | ||||
ESC_SH_CAP = '!!' # Send line to system shell and capture output | ||||
ESC_HELP = '?' # Find information about object | ||||
ESC_HELP2 = '??' # Find extra-detailed information about object | ||||
ESC_MAGIC = '%' # Call magic function | ||||
ESC_MAGIC2 = '%%' # Call cell-magic function | ||||
ESC_QUOTE = ',' # Split args on whitespace, quote each as string and call | ||||
ESC_QUOTE2 = ';' # Quote all args as a single string, call | ||||
ESC_PAREN = '/' # Call first argument with rest of line as arguments | ||||
ESC_SEQUENCES = [ESC_SHELL, ESC_SH_CAP, ESC_HELP ,\ | ||||
ESC_HELP2, ESC_MAGIC, ESC_MAGIC2,\ | ||||
ESC_QUOTE, ESC_QUOTE2, ESC_PAREN ] | ||||
Thomas Kluyver
|
r10090 | class InputTransformer(object): | ||
Thomas Kluyver
|
r10101 | """Abstract base class for line-based input transformers.""" | ||
Thomas Kluyver
|
r10090 | __metaclass__ = abc.ABCMeta | ||
@abc.abstractmethod | ||||
def push(self, line): | ||||
Thomas Kluyver
|
r10101 | """Send a line of input to the transformer, returning the transformed | ||
input or None if the transformer is waiting for more input. | ||||
Must be overridden by subclasses. | ||||
""" | ||||
Thomas Kluyver
|
r10090 | pass | ||
@abc.abstractmethod | ||||
def reset(self): | ||||
Thomas Kluyver
|
r10101 | """Return, transformed any lines that the transformer has accumulated, | ||
and reset its internal state. | ||||
Must be overridden by subclasses. | ||||
""" | ||||
Thomas Kluyver
|
r10090 | pass | ||
Thomas Kluyver
|
r10091 | |||
Thomas Kluyver
|
r10102 | @classmethod | ||
def wrap(cls, func): | ||||
"""Can be used by subclasses as a decorator, to return a factory that | ||||
will allow instantiation with the decorated object. | ||||
""" | ||||
@functools.wraps(func) | ||||
Thomas Kluyver
|
r10252 | def transformer_factory(**kwargs): | ||
return cls(func, **kwargs) | ||||
Thomas Kluyver
|
r10097 | |||
Thomas Kluyver
|
r10102 | return transformer_factory | ||
class StatelessInputTransformer(InputTransformer): | ||||
"""Wrapper for a stateless input transformer implemented as a function.""" | ||||
def __init__(self, func): | ||||
self.func = func | ||||
def __repr__(self): | ||||
Jason Grout
|
r11163 | return "StatelessInputTransformer(func={0!r})".format(self.func) | ||
Thomas Kluyver
|
r10090 | |||
Thomas Kluyver
|
r10102 | def push(self, line): | ||
"""Send a line of input to the transformer, returning the | ||||
transformed input.""" | ||||
return self.func(line) | ||||
def reset(self): | ||||
"""No-op - exists for compatibility.""" | ||||
pass | ||||
class CoroutineInputTransformer(InputTransformer): | ||||
"""Wrapper for an input transformer implemented as a coroutine.""" | ||||
Thomas Kluyver
|
r10252 | def __init__(self, coro, **kwargs): | ||
Thomas Kluyver
|
r10102 | # Prime it | ||
Thomas Kluyver
|
r10252 | self.coro = coro(**kwargs) | ||
Thomas Kluyver
|
r10102 | next(self.coro) | ||
def __repr__(self): | ||||
Jason Grout
|
r11163 | return "CoroutineInputTransformer(coro={0!r})".format(self.coro) | ||
Thomas Kluyver
|
r10102 | |||
def push(self, line): | ||||
"""Send a line of input to the transformer, returning the | ||||
transformed input or None if the transformer is waiting for more | ||||
input. | ||||
""" | ||||
return self.coro.send(line) | ||||
def reset(self): | ||||
"""Return, transformed any lines that the transformer has | ||||
accumulated, and reset its internal state. | ||||
""" | ||||
return self.coro.send(None) | ||||
Thomas Kluyver
|
r10090 | |||
Thomas Kluyver
|
r10103 | class TokenInputTransformer(InputTransformer): | ||
"""Wrapper for a token-based input transformer. | ||||
func should accept a list of tokens (5-tuples, see tokenize docs), and | ||||
return an iterable which can be passed to tokenize.untokenize(). | ||||
""" | ||||
def __init__(self, func): | ||||
self.func = func | ||||
self.current_line = "" | ||||
Thomas Kluyver
|
r10106 | self.line_used = False | ||
Thomas Kluyver
|
r10105 | self.reset_tokenizer() | ||
def reset_tokenizer(self): | ||||
self.tokenizer = generate_tokens(self.get_line) | ||||
Thomas Kluyver
|
r10103 | |||
def get_line(self): | ||||
if self.line_used: | ||||
Thomas Kluyver
|
r10110 | raise TokenError | ||
Thomas Kluyver
|
r10103 | self.line_used = True | ||
return self.current_line | ||||
def push(self, line): | ||||
self.current_line += line + "\n" | ||||
Thomas Kluyver
|
r10106 | if self.current_line.isspace(): | ||
return self.reset() | ||||
Thomas Kluyver
|
r10103 | self.line_used = False | ||
tokens = [] | ||||
Thomas Kluyver
|
r10106 | stop_at_NL = False | ||
Thomas Kluyver
|
r10103 | try: | ||
for intok in self.tokenizer: | ||||
tokens.append(intok) | ||||
Thomas Kluyver
|
r10106 | t = intok[0] | ||
Thomas Kluyver
|
r10110 | if t == tokenize2.NEWLINE or (stop_at_NL and t == tokenize2.NL): | ||
Thomas Kluyver
|
r10103 | # Stop before we try to pull a line we don't have yet | ||
break | ||||
Thomas Kluyver
|
r10110 | elif t == tokenize2.ERRORTOKEN: | ||
Thomas Kluyver
|
r10106 | stop_at_NL = True | ||
Thomas Kluyver
|
r10110 | except TokenError: | ||
Thomas Kluyver
|
r10103 | # Multi-line statement - stop and try again with the next line | ||
Thomas Kluyver
|
r10105 | self.reset_tokenizer() | ||
Thomas Kluyver
|
r10103 | return None | ||
Thomas Kluyver
|
r10106 | return self.output(tokens) | ||
def output(self, tokens): | ||||
Thomas Kluyver
|
r10103 | self.current_line = "" | ||
Thomas Kluyver
|
r10105 | self.reset_tokenizer() | ||
return untokenize(self.func(tokens)).rstrip('\n') | ||||
Thomas Kluyver
|
r10103 | |||
def reset(self): | ||||
l = self.current_line | ||||
self.current_line = "" | ||||
Thomas Kluyver
|
r10106 | self.reset_tokenizer() | ||
Thomas Kluyver
|
r10103 | if l: | ||
return l.rstrip('\n') | ||||
Thomas Kluyver
|
r10106 | class assemble_python_lines(TokenInputTransformer): | ||
def __init__(self): | ||||
super(assemble_python_lines, self).__init__(None) | ||||
def output(self, tokens): | ||||
return self.reset() | ||||
@CoroutineInputTransformer.wrap | ||||
def assemble_logical_lines(): | ||||
"""Join lines following explicit line continuations (\)""" | ||||
line = '' | ||||
while True: | ||||
line = (yield line) | ||||
if not line or line.isspace(): | ||||
continue | ||||
parts = [] | ||||
while line is not None: | ||||
Thomas Kluyver
|
r10112 | if line.endswith('\\') and (not has_comment(line)): | ||
parts.append(line[:-1]) | ||||
line = (yield None) # Get another line | ||||
else: | ||||
parts.append(line) | ||||
Thomas Kluyver
|
r10106 | break | ||
# Output | ||||
Thomas Kluyver
|
r10112 | line = ''.join(parts) | ||
Thomas Kluyver
|
r10093 | |||
# Utilities | ||||
def _make_help_call(target, esc, lspace, next_input=None): | ||||
"""Prepares a pinfo(2)/psearch call from a target name and the escape | ||||
(i.e. ? or ??)""" | ||||
method = 'pinfo2' if esc == '??' \ | ||||
else 'psearch' if '*' in target \ | ||||
else 'pinfo' | ||||
arg = " ".join([method, target]) | ||||
if next_input is None: | ||||
return '%sget_ipython().magic(%r)' % (lspace, arg) | ||||
else: | ||||
return '%sget_ipython().set_next_input(%r);get_ipython().magic(%r)' % \ | ||||
(lspace, next_input, arg) | ||||
Thomas Kluyver
|
r10101 | |||
Thomas Kluyver
|
r10107 | # These define the transformations for the different escape characters. | ||
def _tr_system(line_info): | ||||
"Translate lines escaped with: !" | ||||
cmd = line_info.line.lstrip().lstrip(ESC_SHELL) | ||||
return '%sget_ipython().system(%r)' % (line_info.pre, cmd) | ||||
def _tr_system2(line_info): | ||||
"Translate lines escaped with: !!" | ||||
cmd = line_info.line.lstrip()[2:] | ||||
return '%sget_ipython().getoutput(%r)' % (line_info.pre, cmd) | ||||
def _tr_help(line_info): | ||||
"Translate lines escaped with: ?/??" | ||||
# A naked help line should just fire the intro help screen | ||||
if not line_info.line[1:]: | ||||
return 'get_ipython().show_usage()' | ||||
return _make_help_call(line_info.ifun, line_info.esc, line_info.pre) | ||||
def _tr_magic(line_info): | ||||
"Translate lines escaped with: %" | ||||
tpl = '%sget_ipython().magic(%r)' | ||||
cmd = ' '.join([line_info.ifun, line_info.the_rest]).strip() | ||||
return tpl % (line_info.pre, cmd) | ||||
def _tr_quote(line_info): | ||||
"Translate lines escaped with: ," | ||||
return '%s%s("%s")' % (line_info.pre, line_info.ifun, | ||||
'", "'.join(line_info.the_rest.split()) ) | ||||
def _tr_quote2(line_info): | ||||
"Translate lines escaped with: ;" | ||||
return '%s%s("%s")' % (line_info.pre, line_info.ifun, | ||||
line_info.the_rest) | ||||
def _tr_paren(line_info): | ||||
"Translate lines escaped with: /" | ||||
return '%s%s(%s)' % (line_info.pre, line_info.ifun, | ||||
", ".join(line_info.the_rest.split())) | ||||
tr = { ESC_SHELL : _tr_system, | ||||
ESC_SH_CAP : _tr_system2, | ||||
ESC_HELP : _tr_help, | ||||
ESC_HELP2 : _tr_help, | ||||
ESC_MAGIC : _tr_magic, | ||||
ESC_QUOTE : _tr_quote, | ||||
ESC_QUOTE2 : _tr_quote2, | ||||
ESC_PAREN : _tr_paren } | ||||
@StatelessInputTransformer.wrap | ||||
def escaped_commands(line): | ||||
"""Transform escaped commands - %magic, !system, ?help + various autocalls. | ||||
Thomas Kluyver
|
r10101 | """ | ||
Thomas Kluyver
|
r10107 | if not line or line.isspace(): | ||
return line | ||||
lineinf = LineInfo(line) | ||||
if lineinf.esc not in tr: | ||||
return line | ||||
Thomas Kluyver
|
r10093 | |||
Thomas Kluyver
|
r10107 | return tr[lineinf.esc](lineinf) | ||
Thomas Kluyver
|
r10090 | |||
_initial_space_re = re.compile(r'\s*') | ||||
_help_end_re = re.compile(r"""(%{0,2} | ||||
[a-zA-Z_*][\w*]* # Variable name | ||||
(\.[a-zA-Z_*][\w*]*)* # .etc.etc | ||||
) | ||||
(\?\??)$ # ? or ??""", | ||||
re.VERBOSE) | ||||
Thomas Kluyver
|
r10093 | def has_comment(src): | ||
"""Indicate whether an input line has (i.e. ends in, or is) a comment. | ||||
This uses tokenize, so it can distinguish comments from # inside strings. | ||||
Parameters | ||||
---------- | ||||
src : string | ||||
A single line input string. | ||||
Returns | ||||
------- | ||||
Thomas Kluyver
|
r10109 | comment : bool | ||
True if source has a comment. | ||||
Thomas Kluyver
|
r10093 | """ | ||
readline = StringIO(src).readline | ||||
toktypes = set() | ||||
try: | ||||
Thomas Kluyver
|
r10110 | for t in generate_tokens(readline): | ||
Thomas Kluyver
|
r10093 | toktypes.add(t[0]) | ||
Thomas Kluyver
|
r10110 | except TokenError: | ||
Thomas Kluyver
|
r10093 | pass | ||
Thomas Kluyver
|
r10110 | return(tokenize2.COMMENT in toktypes) | ||
Thomas Kluyver
|
r10093 | |||
Thomas Kluyver
|
r10100 | |||
Thomas Kluyver
|
r10102 | @StatelessInputTransformer.wrap | ||
Thomas Kluyver
|
r10093 | def help_end(line): | ||
Thomas Kluyver
|
r10090 | """Translate lines with ?/?? at the end""" | ||
m = _help_end_re.search(line) | ||||
if m is None or has_comment(line): | ||||
return line | ||||
target = m.group(1) | ||||
esc = m.group(3) | ||||
lspace = _initial_space_re.match(line).group(0) | ||||
# If we're mid-command, put it back on the next prompt for the user. | ||||
next_input = line.rstrip('?') if line.strip() != m.group(0) else None | ||||
return _make_help_call(target, esc, lspace, next_input) | ||||
Thomas Kluyver
|
r10100 | |||
Thomas Kluyver
|
r10102 | @CoroutineInputTransformer.wrap | ||
Thomas Kluyver
|
r10252 | def cellmagic(end_on_blank_line=False): | ||
Thomas Kluyver
|
r10101 | """Captures & transforms cell magics. | ||
After a cell magic is started, this stores up any lines it gets until it is | ||||
reset (sent None). | ||||
""" | ||||
Thomas Kluyver
|
r10090 | tpl = 'get_ipython().run_cell_magic(%r, %r, %r)' | ||
Thomas Kluyver
|
r10097 | cellmagic_help_re = re.compile('%%\w+\?') | ||
Thomas Kluyver
|
r10090 | line = '' | ||
while True: | ||||
line = (yield line) | ||||
Thomas Kluyver
|
r10091 | if (not line) or (not line.startswith(ESC_MAGIC2)): | ||
Thomas Kluyver
|
r10090 | continue | ||
Thomas Kluyver
|
r10097 | if cellmagic_help_re.match(line): | ||
# This case will be handled by help_end | ||||
continue | ||||
Thomas Kluyver
|
r10090 | first = line | ||
body = [] | ||||
line = (yield None) | ||||
Thomas Kluyver
|
r10252 | while (line is not None) and \ | ||
((line.strip() != '') or not end_on_blank_line): | ||||
Thomas Kluyver
|
r10090 | body.append(line) | ||
line = (yield None) | ||||
# Output | ||||
magic_name, _, first = first.partition(' ') | ||||
magic_name = magic_name.lstrip(ESC_MAGIC2) | ||||
Thomas Kluyver
|
r10097 | line = tpl % (magic_name, first, u'\n'.join(body)) | ||
Thomas Kluyver
|
r10091 | |||
Thomas Kluyver
|
r10100 | |||
Thomas Kluyver
|
r10652 | def _strip_prompts(prompt_re): | ||
Thomas Kluyver
|
r10091 | """Remove matching input prompts from a block of input.""" | ||
line = '' | ||||
while True: | ||||
line = (yield line) | ||||
Thomas Kluyver
|
r10652 | # First line of cell | ||
Thomas Kluyver
|
r10091 | if line is None: | ||
continue | ||||
Thomas Kluyver
|
r10652 | out, n1 = prompt_re.subn('', line, count=1) | ||
line = (yield out) | ||||
# Second line of cell, because people often copy from just after the | ||||
# first prompt, so we might not see it in the first line. | ||||
if line is None: | ||||
continue | ||||
out, n2 = prompt_re.subn('', line, count=1) | ||||
line = (yield out) | ||||
if n1 or n2: | ||||
# Found the input prompt in the first two lines - check for it in | ||||
# the rest of the cell as well. | ||||
while line is not None: | ||||
line = (yield prompt_re.sub('', line, count=1)) | ||||
Thomas Kluyver
|
r10091 | |||
else: | ||||
# Prompts not in input - wait for reset | ||||
while line is not None: | ||||
line = (yield line) | ||||
Thomas Kluyver
|
r10102 | @CoroutineInputTransformer.wrap | ||
Thomas Kluyver
|
r10091 | def classic_prompt(): | ||
Thomas Kluyver
|
r10101 | """Strip the >>>/... prompts of the Python interactive shell.""" | ||
Thomas Kluyver
|
r11056 | prompt_re = re.compile(r'^(>>> ?|^\.\.\. ?)') | ||
Thomas Kluyver
|
r10652 | return _strip_prompts(prompt_re) | ||
Thomas Kluyver
|
r10091 | |||
Thomas Kluyver
|
r10102 | @CoroutineInputTransformer.wrap | ||
Thomas Kluyver
|
r10091 | def ipy_prompt(): | ||
Thomas Kluyver
|
r10101 | """Strip IPython's In [1]:/...: prompts.""" | ||
Thomas Kluyver
|
r10652 | prompt_re = re.compile(r'^(In \[\d+\]: |^\ \ \ \.\.\.+: )') | ||
return _strip_prompts(prompt_re) | ||||
Thomas Kluyver
|
r10091 | |||
Thomas Kluyver
|
r10100 | |||
Thomas Kluyver
|
r10102 | @CoroutineInputTransformer.wrap | ||
Thomas Kluyver
|
r10091 | def leading_indent(): | ||
Thomas Kluyver
|
r10101 | """Remove leading indentation. | ||
If the first line starts with a spaces or tabs, the same whitespace will be | ||||
removed from each following line until it is reset. | ||||
""" | ||||
Thomas Kluyver
|
r10091 | space_re = re.compile(r'^[ \t]+') | ||
line = '' | ||||
while True: | ||||
line = (yield line) | ||||
if line is None: | ||||
continue | ||||
m = space_re.match(line) | ||||
if m: | ||||
space = m.group(0) | ||||
while line is not None: | ||||
if line.startswith(space): | ||||
line = line[len(space):] | ||||
line = (yield line) | ||||
else: | ||||
# No leading spaces - wait for reset | ||||
while line is not None: | ||||
line = (yield line) | ||||
Thomas Kluyver
|
r10100 | |||
Thomas Kluyver
|
r10107 | assign_system_re = re.compile(r'(?P<lhs>(\s*)([\w\.]+)((\s*,\s*[\w\.]+)*))' | ||
r'\s*=\s*!\s*(?P<cmd>.*)') | ||||
assign_system_template = '%s = get_ipython().getoutput(%r)' | ||||
@StatelessInputTransformer.wrap | ||||
def assign_from_system(line): | ||||
Thomas Kluyver
|
r10101 | """Transform assignment from system commands (e.g. files = !ls)""" | ||
Thomas Kluyver
|
r10107 | m = assign_system_re.match(line) | ||
if m is None: | ||||
return line | ||||
return assign_system_template % m.group('lhs', 'cmd') | ||||
Thomas Kluyver
|
r10092 | |||
Thomas Kluyver
|
r10107 | assign_magic_re = re.compile(r'(?P<lhs>(\s*)([\w\.]+)((\s*,\s*[\w\.]+)*))' | ||
r'\s*=\s*%\s*(?P<cmd>.*)') | ||||
assign_magic_template = '%s = get_ipython().magic(%r)' | ||||
@StatelessInputTransformer.wrap | ||||
def assign_from_magic(line): | ||||
Thomas Kluyver
|
r10101 | """Transform assignment from magic commands (e.g. a = %who_ls)""" | ||
Thomas Kluyver
|
r10107 | m = assign_magic_re.match(line) | ||
if m is None: | ||||
return line | ||||
return assign_magic_template % m.group('lhs', 'cmd') | ||||