test_async_helpers.py
277 lines
| 8.4 KiB
| text/x-python
|
PythonLexer
Matthias Bussonnier
|
"""
Tests for the async helpers.

These should only be run on Python 3.5+; on older interpreters the
``async``/``await`` code would be a syntax error.
"""
import sys | ||||
Paul Ganssle
|
r24489 | from itertools import chain, repeat | ||
Matthias Bussonnier
|
r24463 | import nose.tools as nt | ||
Paul Ganssle
|
r24486 | from textwrap import dedent, indent | ||
Matthias Bussonnier
|
r24463 | from unittest import TestCase | ||
Matthias Bussonnier
|
r24491 | from IPython.testing.decorators import skip_without | ||
Matthias Bussonnier
|
r24463 | |||
# Shell instance shared by every test in this module.  ``get_ipython`` is not
# imported in this file -- presumably it is injected by the IPython test
# runner; confirm before running this module stand-alone.
ip = get_ipython()


def iprc(x):
    """Run the dedented source ``x`` as a cell in ``ip`` and raise on error.

    The cell result's ``raise_error()`` is called, so any exception recorded
    while running the cell propagates to the caller.
    """
    return ip.run_cell(dedent(x)).raise_error()


def iprc_nr(x):
    """Run the dedented source ``x`` as a cell in ``ip`` without raising.

    Returns the object produced by ``ip.run_cell`` so that the caller can
    decide when (and whether) to call ``raise_error()`` on it.
    """
    return ip.run_cell(dedent(x))
Matthias Bussonnier
|
r24463 | |||
Matthias Bussonnier
|
if sys.version_info >= (3, 5):
    from IPython.core.async_helpers import _should_be_async

    class AsyncTest(TestCase):
        """Tests for top-level ``await`` detection and ``%autoawait`` cells.

        Every cell is executed through the module-level ``iprc`` / ``iprc_nr``
        helpers against the shared ``ip`` shell; ``tearDown`` resets the
        shell's loop runner to ``"asyncio"`` after each test.
        """

        def test_should_be_async(self):
            # Plain expressions and coroutine *definitions* are not async
            # cells; only a top-level ``await`` is.
            nt.assert_false(_should_be_async("False"))
            nt.assert_true(_should_be_async("await bar()"))
            nt.assert_true(_should_be_async("x = await bar()"))
            nt.assert_false(
                _should_be_async(
                    dedent(
                        """
                        async def awaitable():
                            pass
                        """
                    )
                )
            )

        def _get_top_level_cases(self):
            """Return ``(name, template)`` pairs with a ``{val}`` placeholder.

            These are test cases that should be valid in a function
            but invalid outside of a function.
            """
            test_cases = []
            test_cases.append(('basic', "{val}"))

            # Note, in all conditional cases, I use True instead of
            # False so that the peephole optimizer won't optimize away
            # the return, so CPython will see this as a syntax error:
            #
            # while True:
            #    break
            #    return
            #
            # But not this:
            #
            # while False:
            #    return
            #
            # See https://bugs.python.org/issue1875

            test_cases.append(('if', dedent("""
            if True:
                {val}
            """)))

            test_cases.append(('while', dedent("""
            while True:
                {val}
                break
            """)))

            test_cases.append(('try', dedent("""
            try:
                {val}
            except:
                pass
            """)))

            test_cases.append(('except', dedent("""
            try:
                pass
            except:
                {val}
            """)))

            test_cases.append(('finally', dedent("""
            try:
                pass
            except:
                pass
            finally:
                {val}
            """)))

            test_cases.append(('for', dedent("""
            for _ in range(4):
                {val}
            """)))

            test_cases.append(('nested', dedent("""
            if True:
                while True:
                    {val}
                    break
            """)))

            test_cases.append(('deep-nested', dedent("""
            if True:
                while True:
                    break
                    for x in range(3):
                        if True:
                            while True:
                                for x in range(3):
                                    {val}
            """)))

            return test_cases

        def _get_ry_syntax_errors(self):
            """Return ``(name, template)`` pairs with a ``{val}`` placeholder.

            This is a mix of tests that should be a syntax error if
            return or yield whether or not they are in a function.
            """
            test_cases = []

            test_cases.append(('class', dedent("""
            class V:
                {val}
            """)))

            test_cases.append(('nested-class', dedent("""
            class V:
                class C:
                    {val}
            """)))

            return test_cases

        def test_top_level_return_error(self):
            # return / yield / yield from at the top level of a cell must be
            # rejected with SyntaxError, while the same cell using ``pass``
            # must run cleanly.
            tl_err_test_cases = self._get_top_level_cases()
            tl_err_test_cases.extend(self._get_ry_syntax_errors())

            vals = ('return', 'yield', 'yield from (_ for _ in range(3))')

            for test_name, test_case in tl_err_test_cases:
                # This example should work if 'pass' is used as the value
                with self.subTest((test_name, 'pass')):
                    iprc(test_case.format(val='pass'))

                # It should fail with all the values
                for val in vals:
                    with self.subTest((test_name, val)):
                        msg = "Syntax error not raised for %s, %s" % (test_name, val)
                        with self.assertRaises(SyntaxError, msg=msg):
                            iprc(test_case.format(val=val))

        def test_in_func_no_error(self):
            # Test that the implementation of top-level return/yield
            # detection isn't *too* aggressive, and works inside a function
            #
            # Each context is (name, is_async, source of the enclosing def).
            func_contexts = []

            func_contexts.append(('func', False, dedent("""
            def f():""")))

            func_contexts.append(('method', False, dedent("""
            class MyClass:
                def __init__(self):
            """)))

            func_contexts.append(('async-func', True, dedent("""
            async def f():""")))

            func_contexts.append(('async-method', True, dedent("""
            class MyClass:
                async def f(self):""")))

            func_contexts.append(('closure', False, dedent("""
            def f():
                def g():
            """)))

            def nest_case(context, case):
                # Indent ``case`` one level (4 spaces) deeper than the last
                # line of ``context``, i.e. the innermost ``def``.
                last_line = context.strip().splitlines()[-1]
                prefix_len = len(last_line) - len(last_line.lstrip(' '))
                return context + '\n' + indent(case, ' ' * (prefix_len + 4))

            # Gather and run the tests

            # yield is allowed in async functions, starting in Python 3.6,
            # and yield from is not allowed in any version
            vals = ('return', 'yield', 'yield from (_ for _ in range(3))')
            async_safe_flags = (True,
                                sys.version_info >= (3, 6),
                                False)
            vals = tuple(zip(vals, async_safe_flags))

            # Materialize the cases: a bare ``chain`` object is a one-shot
            # iterator, so it would be exhausted after the first context and
            # every later context would silently run zero sub-tests.
            tests = list(chain(
                zip(self._get_top_level_cases(), repeat(False)),
                zip(self._get_ry_syntax_errors(), repeat(True)),
            ))

            for context_name, async_func, context in func_contexts:
                for (test_name, test_case), should_fail in tests:
                    nested_case = nest_case(context, test_case)

                    for val, async_safe in vals:
                        val_should_fail = (should_fail or
                                           (async_func and not async_safe))

                        test_id = (context_name, test_name, val)
                        cell = nested_case.format(val=val)

                        with self.subTest(test_id):
                            if val_should_fail:
                                msg = ("SyntaxError not raised for %s" %
                                       str(test_id))
                                with self.assertRaises(SyntaxError, msg=msg):
                                    iprc(cell)

                                    # Only reached when no error was raised:
                                    # show the offending cell before the
                                    # assertion fails.
                                    print(cell)
                            else:
                                iprc(cell)

        def test_execute(self):
            # A cell with a top-level ``await`` must run without error.
            iprc("""
            import asyncio
            await asyncio.sleep(0.001)
            """
            )

        def test_autoawait(self):
            # Switch autoawait off and back on, then run an awaiting cell.
            iprc("%autoawait False")
            iprc("%autoawait True")
            iprc("""
            from asyncio import sleep
            await sleep(0.1)
            """
            )

        @skip_without('curio')
        def test_autoawait_curio(self):
            iprc("%autoawait curio")

        @skip_without('trio')
        def test_autoawait_trio(self):
            iprc("%autoawait trio")

        @skip_without('trio')
        def test_autoawait_trio_wrong_sleep(self):
            # Awaiting an asyncio coroutine under the trio runner is expected
            # to surface as TypeError once raise_error() is called.
            iprc("%autoawait trio")
            res = iprc_nr("""
            import asyncio
            await asyncio.sleep(0)
            """)
            with nt.assert_raises(TypeError):
                res.raise_error()

        @skip_without('trio')
        def test_autoawait_asyncio_wrong_sleep(self):
            # Awaiting a trio coroutine under the asyncio runner is expected
            # to surface as RuntimeError once raise_error() is called.
            iprc("%autoawait asyncio")
            res = iprc_nr("""
            import trio
            await trio.sleep(0)
            """)
            with nt.assert_raises(RuntimeError):
                res.raise_error()

        def tearDown(self):
            # Restore the loop runner so a test that switched it does not
            # leak that choice into the next test.
            ip.loop_runner = "asyncio"