""" Async helper function that are invalid syntax on Python 3.5 and below. Known limitation and possible improvement. Top level code that contain a return statement (instead of, or in addition to await) will be detected as requiring being wrapped in async calls. This should be prevented as early return will not work. """ import ast import sys import inspect from textwrap import dedent, indent from types import CodeType def _asyncio_runner(coro): """ Handler for asyncio autoawait """ import asyncio return asyncio.get_event_loop().run_until_complete(coro) def _curio_runner(coroutine): """ handler for curio autoawait """ import curio return curio.run(coroutine) def _trio_runner(async_fn): import trio async def loc(coro): """ We need the dummy no-op async def to protect from trio's internal. See https://github.com/python-trio/trio/issues/89 """ return await coro return trio.run(loc, async_fn) def _pseudo_sync_runner(coro): """ A runner that does not really allow async execution, and just advance the coroutine. See discussion in https://github.com/python-trio/trio/issues/608, Credit to Nathaniel Smith """ try: coro.send(None) except StopIteration as exc: return exc.value else: # TODO: do not raise but return an execution result with the right info. raise RuntimeError("{coro_name!r} needs a real async loop".format(coro_name=coro.__name__)) def _asyncify(code: str) -> str: """wrap code in async def definition. And setup a bit of context to run it later. """ res = dedent( """ async def __wrapper__(): try: {usercode} finally: locals() """ ).format(usercode=indent(code, " " * 8)[8:]) return res class _AsyncSyntaxErrorVisitor(ast.NodeVisitor): """ Find syntax errors that would be an error in an async repl, but because the implementation involves wrapping the repl in an async function, it is erroneously allowed (e.g. yield or return at the top level) """ def generic_visit(self, node): func_types = (ast.FunctionDef, ast.AsyncFunctionDef) invalid_types = (ast.Return, ast.Yield, ast.YieldFrom) if isinstance(node, func_types): return # Don't recurse into functions elif isinstance(node, invalid_types): raise SyntaxError() else: super().generic_visit(node) def _async_parse_cell(cell: str) -> ast.AST: """ This is a compatibility shim for pre-3.7 when async outside of a function is a syntax error at the parse stage. It will return an abstract syntax tree parsed as if async and await outside of a function were not a syntax error. """ if sys.version_info < (3, 7): # Prior to 3.7 you need to asyncify before parse wrapped_parse_tree = ast.parse(_asyncify(cell)) return wrapped_parse_tree.body[0].body[0] else: return ast.parse(cell) def _should_be_async(cell: str) -> bool: """Detect if a block of code need to be wrapped in an `async def` Attempt to parse the block of code, it it compile we're fine. Otherwise we wrap if and try to compile. If it works, assume it should be async. Otherwise Return False. Not handled yet: If the block of code has a return statement as the top level, it will be seen as async. This is a know limitation. """ try: # we can't limit ourself to ast.parse, as it __accepts__ to parse on # 3.7+, but just does not _compile_ compile(cell, "<>", "exec") return False except SyntaxError: try: parse_tree = _async_parse_cell(cell) # Raise a SyntaxError if there are top-level return or yields v = _AsyncSyntaxErrorVisitor() v.visit(parse_tree) except SyntaxError: return False return True return False