|
|
"""
|
|
|
Test for async helpers.
|
|
|
|
|
|
Should only trigger on python 3.5+ or will have syntax errors.
|
|
|
"""
|
|
|
|
|
|
import sys
|
|
|
import nose.tools as nt
|
|
|
from textwrap import dedent, indent
|
|
|
from unittest import TestCase
|
|
|
|
|
|
ip = get_ipython()
|
|
|
iprc = lambda x: ip.run_cell(dedent(x))
|
|
|
iprc_err = lambda x: iprc(x).raise_error()
|
|
|
|
|
|
if sys.version_info > (3, 5):
|
|
|
from IPython.core.async_helpers import _should_be_async
|
|
|
|
|
|
class AsyncTest(TestCase):
|
|
|
def test_should_be_async(self):
|
|
|
nt.assert_false(_should_be_async("False"))
|
|
|
nt.assert_true(_should_be_async("await bar()"))
|
|
|
nt.assert_true(_should_be_async("x = await bar()"))
|
|
|
nt.assert_false(
|
|
|
_should_be_async(
|
|
|
dedent(
|
|
|
"""
|
|
|
async def awaitable():
|
|
|
pass
|
|
|
"""
|
|
|
)
|
|
|
)
|
|
|
)
|
|
|
|
|
|
def _get_top_level_cases(self):
|
|
|
# These are test cases that should be valid in a function
|
|
|
# but invalid outside of a function.
|
|
|
test_cases = []
|
|
|
test_cases.append(('basic', "{val}"))
|
|
|
|
|
|
# Note, in all conditional cases, I use True instead of
|
|
|
# False so that the peephole optimizer won't optimize away
|
|
|
# the return, so CPython will see this as a syntax error:
|
|
|
#
|
|
|
# while True:
|
|
|
# break
|
|
|
# return
|
|
|
#
|
|
|
# But not this:
|
|
|
#
|
|
|
# while False:
|
|
|
# return
|
|
|
#
|
|
|
# See https://bugs.python.org/issue1875
|
|
|
|
|
|
test_cases.append(('if', dedent("""
|
|
|
if True:
|
|
|
{val}
|
|
|
""")))
|
|
|
|
|
|
test_cases.append(('while', dedent("""
|
|
|
while True:
|
|
|
{val}
|
|
|
break
|
|
|
""")))
|
|
|
|
|
|
test_cases.append(('try', dedent("""
|
|
|
try:
|
|
|
{val}
|
|
|
except:
|
|
|
pass
|
|
|
""")))
|
|
|
|
|
|
test_cases.append(('except', dedent("""
|
|
|
try:
|
|
|
pass
|
|
|
except:
|
|
|
{val}
|
|
|
""")))
|
|
|
|
|
|
test_cases.append(('finally', dedent("""
|
|
|
try:
|
|
|
pass
|
|
|
except:
|
|
|
pass
|
|
|
finally:
|
|
|
{val}
|
|
|
""")))
|
|
|
|
|
|
test_cases.append(('for', dedent("""
|
|
|
for _ in range(4):
|
|
|
{val}
|
|
|
""")))
|
|
|
|
|
|
|
|
|
test_cases.append(('nested', dedent("""
|
|
|
if True:
|
|
|
while True:
|
|
|
{val}
|
|
|
break
|
|
|
""")))
|
|
|
|
|
|
test_cases.append(('deep-nested', dedent("""
|
|
|
if True:
|
|
|
while True:
|
|
|
break
|
|
|
for x in range(3):
|
|
|
if True:
|
|
|
while True:
|
|
|
for x in range(3):
|
|
|
{val}
|
|
|
""")))
|
|
|
|
|
|
return test_cases
|
|
|
|
|
|
def _get_ry_syntax_errors(self):
|
|
|
# This is a mix of tests that should be a syntax error if
|
|
|
# return or yield whether or not they are in a function
|
|
|
|
|
|
test_cases = []
|
|
|
|
|
|
test_cases.append(('class', dedent("""
|
|
|
class V:
|
|
|
{val}
|
|
|
""")))
|
|
|
|
|
|
test_cases.append(('nested-class', dedent("""
|
|
|
class V:
|
|
|
class C:
|
|
|
{val}
|
|
|
""")))
|
|
|
|
|
|
return test_cases
|
|
|
|
|
|
|
|
|
def test_top_level_return_error(self):
|
|
|
tl_err_test_cases = self._get_top_level_cases()
|
|
|
tl_err_test_cases.extend(self._get_ry_syntax_errors())
|
|
|
|
|
|
vals = ('return', 'yield', 'yield from (_ for _ in range(3))')
|
|
|
|
|
|
for test_name, test_case in tl_err_test_cases:
|
|
|
# This example should work if 'pass' is used as the value
|
|
|
with self.subTest((test_name, 'pass')):
|
|
|
iprc_err(test_case.format(val='pass'))
|
|
|
|
|
|
# It should fail with all the values
|
|
|
for val in vals:
|
|
|
with self.subTest((test_name, val)):
|
|
|
msg = "Syntax error not raised for %s, %s" % (test_name, val)
|
|
|
with self.assertRaises(SyntaxError, msg=msg):
|
|
|
iprc_err(test_case.format(val=val))
|
|
|
|
|
|
def test_in_func_no_error(self):
|
|
|
# Test that the implementation of top-level return/yield
|
|
|
# detection isn't *too* aggressive, and works inside a function
|
|
|
func_contexts = []
|
|
|
|
|
|
func_contexts.append(('func', dedent("""
|
|
|
def f():""")))
|
|
|
|
|
|
func_contexts.append(('method', dedent("""
|
|
|
class MyClass:
|
|
|
def __init__(self):
|
|
|
""")))
|
|
|
|
|
|
func_contexts.append(('async-func', dedent("""
|
|
|
async def f():""")))
|
|
|
|
|
|
func_contexts.append(('closure', dedent("""
|
|
|
def f():
|
|
|
def g():
|
|
|
""")))
|
|
|
|
|
|
def nest_case(context, case):
|
|
|
# Detect indentation
|
|
|
lines = context.strip().splitlines()
|
|
|
prefix_len = 0
|
|
|
for c in lines[-1]:
|
|
|
if c != ' ':
|
|
|
break
|
|
|
prefix_len += 1
|
|
|
|
|
|
indented_case = indent(case, ' ' * (prefix_len + 4))
|
|
|
return context + '\n' + indented_case
|
|
|
|
|
|
# Gather and run the tests
|
|
|
vals = ('return', 'yield')
|
|
|
|
|
|
success_tests = self._get_top_level_cases()
|
|
|
failure_tests = self._get_ry_syntax_errors()
|
|
|
|
|
|
for context_name, context in func_contexts:
|
|
|
# These tests should now successfully run
|
|
|
for test_name, test_case in success_tests:
|
|
|
nested_case = nest_case(context, test_case)
|
|
|
|
|
|
for val in vals:
|
|
|
with self.subTest((test_name, context_name, val)):
|
|
|
iprc_err(nested_case.format(val=val))
|
|
|
|
|
|
# These tests should still raise a SyntaxError
|
|
|
for test_name, test_case in failure_tests:
|
|
|
nested_case = nest_case(context, test_case)
|
|
|
|
|
|
for val in vals:
|
|
|
with self.subTest((test_name, context_name, val)):
|
|
|
with self.assertRaises(SyntaxError):
|
|
|
iprc_err(nested_case.format(val=val))
|
|
|
|
|
|
|
|
|
def test_execute(self):
|
|
|
iprc(
|
|
|
"""
|
|
|
import asyncio
|
|
|
await asyncio.sleep(0.001)
|
|
|
"""
|
|
|
)
|
|
|
|
|
|
def test_autoawait(self):
|
|
|
ip.run_cell("%autoawait False")
|
|
|
ip.run_cell("%autoawait True")
|
|
|
iprc(
|
|
|
"""
|
|
|
from asyncio import sleep
|
|
|
await.sleep(0.1)
|
|
|
"""
|
|
|
)
|
|
|
|
|
|
def test_autoawait_curio(self):
|
|
|
ip.run_cell("%autoawait curio")
|
|
|
|
|
|
def test_autoawait_trio(self):
|
|
|
ip.run_cell("%autoawait trio")
|
|
|
|
|
|
def tearDown(self):
|
|
|
ip.loop_runner = "asyncio"
|
|
|
|