test_async_helpers.py
316 lines
| 8.7 KiB
| text/x-python
|
PythonLexer
Matthias Bussonnier
|
r24463 | """ | ||
Test for async helpers. | ||||
Should only trigger on python 3.5+ or will have syntax errors. | ||||
""" | ||||
Paul Ganssle
|
r24489 | from itertools import chain, repeat | ||
Matthias Bussonnier
|
r24463 | import nose.tools as nt | ||
Paul Ganssle
|
r24486 | from textwrap import dedent, indent | ||
Matthias Bussonnier
|
r24463 | from unittest import TestCase | ||
Matthias Bussonnier
|
r24491 | from IPython.testing.decorators import skip_without | ||
Matthias Bussonnier
|
r25722 | import sys | ||
Matthias Bussonnier
|
r25094 | |||
Paul Ganssle
|
r24488 | iprc = lambda x: ip.run_cell(dedent(x)).raise_error() | ||
Matthias Bussonnier
|
r24496 | iprc_nr = lambda x: ip.run_cell(dedent(x)) | ||
Matthias Bussonnier
|
r24463 | |||
kousik
|
r25246 | from IPython.core.async_helpers import _should_be_async | ||
class AsyncTest(TestCase): | ||||
def test_should_be_async(self): | ||||
nt.assert_false(_should_be_async("False")) | ||||
nt.assert_true(_should_be_async("await bar()")) | ||||
nt.assert_true(_should_be_async("x = await bar()")) | ||||
nt.assert_false( | ||||
_should_be_async( | ||||
dedent( | ||||
""" | ||||
async def awaitable(): | ||||
pass | ||||
""" | ||||
Matthias Bussonnier
|
r24478 | ) | ||
) | ||||
kousik
|
r25246 | ) | ||
def _get_top_level_cases(self): | ||||
# These are test cases that should be valid in a function | ||||
# but invalid outside of a function. | ||||
test_cases = [] | ||||
test_cases.append(('basic', "{val}")) | ||||
# Note, in all conditional cases, I use True instead of | ||||
# False so that the peephole optimizer won't optimize away | ||||
# the return, so CPython will see this as a syntax error: | ||||
# | ||||
# while True: | ||||
# break | ||||
# return | ||||
# | ||||
# But not this: | ||||
# | ||||
# while False: | ||||
# return | ||||
# | ||||
# See https://bugs.python.org/issue1875 | ||||
test_cases.append(('if', dedent(""" | ||||
if True: | ||||
{val} | ||||
"""))) | ||||
test_cases.append(('while', dedent(""" | ||||
while True: | ||||
{val} | ||||
break | ||||
"""))) | ||||
test_cases.append(('try', dedent(""" | ||||
try: | ||||
{val} | ||||
except: | ||||
pass | ||||
"""))) | ||||
test_cases.append(('except', dedent(""" | ||||
try: | ||||
pass | ||||
except: | ||||
{val} | ||||
"""))) | ||||
test_cases.append(('finally', dedent(""" | ||||
try: | ||||
pass | ||||
except: | ||||
pass | ||||
finally: | ||||
{val} | ||||
"""))) | ||||
test_cases.append(('for', dedent(""" | ||||
for _ in range(4): | ||||
{val} | ||||
"""))) | ||||
test_cases.append(('nested', dedent(""" | ||||
if True: | ||||
while True: | ||||
Paul Ganssle
|
r24486 | {val} | ||
kousik
|
r25246 | break | ||
"""))) | ||||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | test_cases.append(('deep-nested', dedent(""" | ||
if True: | ||||
Paul Ganssle
|
r24486 | while True: | ||
break | ||||
kousik
|
r25246 | for x in range(3): | ||
if True: | ||||
while True: | ||||
for x in range(3): | ||||
{val} | ||||
"""))) | ||||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | return test_cases | ||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | def _get_ry_syntax_errors(self): | ||
# This is a mix of tests that should be a syntax error if | ||||
# return or yield whether or not they are in a function | ||||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | test_cases = [] | ||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | test_cases.append(('class', dedent(""" | ||
class V: | ||||
{val} | ||||
"""))) | ||||
test_cases.append(('nested-class', dedent(""" | ||||
class V: | ||||
class C: | ||||
Paul Ganssle
|
r24486 | {val} | ||
kousik
|
r25246 | """))) | ||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | return test_cases | ||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | def test_top_level_return_error(self): | ||
tl_err_test_cases = self._get_top_level_cases() | ||||
tl_err_test_cases.extend(self._get_ry_syntax_errors()) | ||||
vals = ('return', 'yield', 'yield from (_ for _ in range(3))', | ||||
dedent(''' | ||||
def f(): | ||||
pass | ||||
return | ||||
'''), | ||||
) | ||||
for test_name, test_case in tl_err_test_cases: | ||||
# This example should work if 'pass' is used as the value | ||||
with self.subTest((test_name, 'pass')): | ||||
iprc(test_case.format(val='pass')) | ||||
# It should fail with all the values | ||||
for val in vals: | ||||
with self.subTest((test_name, val)): | ||||
msg = "Syntax error not raised for %s, %s" % (test_name, val) | ||||
with self.assertRaises(SyntaxError, msg=msg): | ||||
iprc(test_case.format(val=val)) | ||||
def test_in_func_no_error(self): | ||||
# Test that the implementation of top-level return/yield | ||||
# detection isn't *too* aggressive, and works inside a function | ||||
func_contexts = [] | ||||
func_contexts.append(('func', False, dedent(""" | ||||
def f():"""))) | ||||
func_contexts.append(('method', False, dedent(""" | ||||
class MyClass: | ||||
def __init__(self): | ||||
"""))) | ||||
func_contexts.append(('async-func', True, dedent(""" | ||||
async def f():"""))) | ||||
func_contexts.append(('async-method', True, dedent(""" | ||||
class MyClass: | ||||
async def f(self):"""))) | ||||
func_contexts.append(('closure', False, dedent(""" | ||||
def f(): | ||||
def g(): | ||||
"""))) | ||||
def nest_case(context, case): | ||||
# Detect indentation | ||||
lines = context.strip().splitlines() | ||||
prefix_len = 0 | ||||
for c in lines[-1]: | ||||
if c != ' ': | ||||
Paul Ganssle
|
r24486 | break | ||
kousik
|
r25246 | prefix_len += 1 | ||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | indented_case = indent(case, ' ' * (prefix_len + 4)) | ||
return context + '\n' + indented_case | ||||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | # Gather and run the tests | ||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | # yield is allowed in async functions, starting in Python 3.6, | ||
# and yield from is not allowed in any version | ||||
vals = ('return', 'yield', 'yield from (_ for _ in range(3))') | ||||
async_safe = (True, | ||||
True, | ||||
False) | ||||
vals = tuple(zip(vals, async_safe)) | ||||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | success_tests = zip(self._get_top_level_cases(), repeat(False)) | ||
failure_tests = zip(self._get_ry_syntax_errors(), repeat(True)) | ||||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | tests = chain(success_tests, failure_tests) | ||
felixzhuologist
|
r24667 | |||
kousik
|
r25246 | for context_name, async_func, context in func_contexts: | ||
for (test_name, test_case), should_fail in tests: | ||||
nested_case = nest_case(context, test_case) | ||||
Paul Ganssle
|
r24486 | |||
kousik
|
r25246 | for val, async_safe in vals: | ||
val_should_fail = (should_fail or | ||||
(async_func and not async_safe)) | ||||
Matthias Bussonnier
|
r24463 | |||
kousik
|
r25246 | test_id = (context_name, test_name, val) | ||
cell = nested_case.format(val=val) | ||||
Matthias Bussonnier
|
r24463 | |||
kousik
|
r25246 | with self.subTest(test_id): | ||
if val_should_fail: | ||||
msg = ("SyntaxError not raised for %s" % | ||||
str(test_id)) | ||||
with self.assertRaises(SyntaxError, msg=msg): | ||||
iprc(cell) | ||||
Matthias Bussonnier
|
r24463 | |||
kousik
|
r25246 | print(cell) | ||
else: | ||||
iprc(cell) | ||||
Matthias Bussonnier
|
r24463 | |||
kousik
|
r25246 | def test_nonlocal(self): | ||
# fails if outer scope is not a function scope or if var not defined | ||||
with self.assertRaises(SyntaxError): | ||||
iprc("nonlocal x") | ||||
iprc(""" | ||||
x = 1 | ||||
def f(): | ||||
nonlocal x | ||||
x = 10000 | ||||
yield x | ||||
Matthias Bussonnier
|
r24496 | """) | ||
kousik
|
r25246 | iprc(""" | ||
def f(): | ||||
def g(): | ||||
nonlocal x | ||||
x = 10000 | ||||
yield x | ||||
Matthias Bussonnier
|
r24496 | """) | ||
kousik
|
r25246 | # works if outer scope is a function scope and var exists | ||
iprc(""" | ||||
def f(): | ||||
x = 20 | ||||
def g(): | ||||
nonlocal x | ||||
x = 10000 | ||||
yield x | ||||
""") | ||||
def test_execute(self): | ||||
iprc(""" | ||||
import asyncio | ||||
await asyncio.sleep(0.001) | ||||
""" | ||||
) | ||||
def test_autoawait(self): | ||||
iprc("%autoawait False") | ||||
iprc("%autoawait True") | ||||
iprc(""" | ||||
from asyncio import sleep | ||||
await sleep(0.1) | ||||
""" | ||||
) | ||||
Matthias Bussonnier
|
r25722 | |||
if sys.version_info < (3,9): | ||||
# new pgen parser in 3.9 does not raise MemoryError on too many nested | ||||
# parens anymore | ||||
def test_memory_error(self): | ||||
with self.assertRaises(MemoryError): | ||||
iprc("(" * 200 + ")" * 200) | ||||
yangyang
|
r25430 | |||
kousik
|
r25246 | @skip_without('curio') | ||
def test_autoawait_curio(self): | ||||
iprc("%autoawait curio") | ||||
@skip_without('trio') | ||||
def test_autoawait_trio(self): | ||||
iprc("%autoawait trio") | ||||
@skip_without('trio') | ||||
def test_autoawait_trio_wrong_sleep(self): | ||||
iprc("%autoawait trio") | ||||
res = iprc_nr(""" | ||||
import asyncio | ||||
await asyncio.sleep(0) | ||||
""") | ||||
with nt.assert_raises(TypeError): | ||||
res.raise_error() | ||||
@skip_without('trio') | ||||
def test_autoawait_asyncio_wrong_sleep(self): | ||||
iprc("%autoawait asyncio") | ||||
res = iprc_nr(""" | ||||
import trio | ||||
await trio.sleep(0) | ||||
""") | ||||
with nt.assert_raises(RuntimeError): | ||||
res.raise_error() | ||||
def tearDown(self): | ||||
ip.loop_runner = "asyncio" | ||||