test_embed_kernel.py
198 lines
| 5.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r6569 | """test IPython.embed_kernel()""" | ||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2012 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import os | ||||
import shutil | ||||
import sys | ||||
import tempfile | ||||
import time | ||||
MinRK
|
r6633 | from contextlib import contextmanager | ||
MinRK
|
r6569 | from subprocess import Popen, PIPE | ||
import nose.tools as nt | ||||
MinRK
|
r10291 | from IPython.kernel import BlockingKernelClient | ||
MinRK
|
r6904 | from IPython.utils import path, py3compat | ||
Thomas Kluyver
|
r13353 | from IPython.utils.py3compat import unicode_type | ||
MinRK
|
r6569 | |||
#------------------------------------------------------------------------------- | ||||
# Tests | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
r11802 | SETUP_TIMEOUT = 60 | ||
TIMEOUT = 15 | ||||
MinRK
|
r6569 | def setup(): | ||
"""setup temporary IPYTHONDIR for tests""" | ||||
global IPYTHONDIR | ||||
global env | ||||
global save_get_ipython_dir | ||||
IPYTHONDIR = tempfile.mkdtemp() | ||||
Jörgen Stenarson
|
r7119 | |||
Jörgen Stenarson
|
r7120 | env = os.environ.copy() | ||
env["IPYTHONDIR"] = IPYTHONDIR | ||||
Jörgen Stenarson
|
r7119 | |||
MinRK
|
r6569 | save_get_ipython_dir = path.get_ipython_dir | ||
path.get_ipython_dir = lambda : IPYTHONDIR | ||||
def teardown(): | ||||
path.get_ipython_dir = save_get_ipython_dir | ||||
try: | ||||
shutil.rmtree(IPYTHONDIR) | ||||
except (OSError, IOError): | ||||
# no such file | ||||
pass | ||||
MinRK
|
r6633 | @contextmanager | ||
def setup_kernel(cmd): | ||||
MinRK
|
r6569 | """start an embedded kernel in a subprocess, and wait for it to be ready | ||
Returns | ||||
------- | ||||
MinRK
|
r6633 | kernel_manager: connected KernelManager instance | ||
MinRK
|
r6569 | """ | ||
kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env) | ||||
connection_file = os.path.join(IPYTHONDIR, | ||||
'profile_default', | ||||
'security', | ||||
'kernel-%i.json' % kernel.pid | ||||
) | ||||
# wait for connection file to exist, timeout after 5s | ||||
tic = time.time() | ||||
MinRK
|
r11802 | while not os.path.exists(connection_file) \ | ||
and kernel.poll() is None \ | ||||
and time.time() < tic + SETUP_TIMEOUT: | ||||
MinRK
|
r6569 | time.sleep(0.1) | ||
MinRK
|
r6904 | if kernel.poll() is not None: | ||
o,e = kernel.communicate() | ||||
e = py3compat.cast_unicode(e) | ||||
raise IOError("Kernel failed to start:\n%s" % e) | ||||
MinRK
|
r6569 | if not os.path.exists(connection_file): | ||
if kernel.poll() is None: | ||||
kernel.terminate() | ||||
raise IOError("Connection file %r never arrived" % connection_file) | ||||
MinRK
|
r10291 | client = BlockingKernelClient(connection_file=connection_file) | ||
client.load_connection_file() | ||||
client.start_channels() | ||||
MinRK
|
r6569 | |||
MinRK
|
r6633 | try: | ||
MinRK
|
r10291 | yield client | ||
MinRK
|
r6633 | finally: | ||
MinRK
|
r10291 | client.stop_channels() | ||
MinRK
|
r6759 | kernel.terminate() | ||
MinRK
|
r6569 | |||
def test_embed_kernel_basic(): | ||||
"""IPython.embed_kernel() is basically functional""" | ||||
cmd = '\n'.join([ | ||||
'from IPython import embed_kernel', | ||||
'def go():', | ||||
' a=5', | ||||
' b="hi there"', | ||||
' embed_kernel()', | ||||
'go()', | ||||
'', | ||||
]) | ||||
MinRK
|
r10291 | with setup_kernel(cmd) as client: | ||
MinRK
|
r6633 | # oinfo a (int) | ||
MinRK
|
r16587 | msg_id = client.inspect('a') | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6633 | content = msg['content'] | ||
nt.assert_true(content['found']) | ||||
MinRK
|
r6569 | |||
MinRK
|
r10294 | msg_id = client.execute("c=a*2") | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6633 | content = msg['content'] | ||
Bradley M. Froehle
|
r7875 | nt.assert_equal(content['status'], u'ok') | ||
MinRK
|
r6633 | |||
# oinfo c (should be 10) | ||||
MinRK
|
r16587 | msg_id = client.inspect('c') | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6633 | content = msg['content'] | ||
nt.assert_true(content['found']) | ||||
MinRK
|
r16580 | text = content['data']['text/plain'] | ||
nt.assert_in('10', text) | ||||
MinRK
|
r6569 | |||
def test_embed_kernel_namespace(): | ||||
"""IPython.embed_kernel() inherits calling namespace""" | ||||
cmd = '\n'.join([ | ||||
'from IPython import embed_kernel', | ||||
'def go():', | ||||
' a=5', | ||||
' b="hi there"', | ||||
' embed_kernel()', | ||||
'go()', | ||||
'', | ||||
]) | ||||
MinRK
|
r10291 | with setup_kernel(cmd) as client: | ||
MinRK
|
r6633 | # oinfo a (int) | ||
MinRK
|
r16587 | msg_id = client.inspect('a') | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6633 | content = msg['content'] | ||
nt.assert_true(content['found']) | ||||
MinRK
|
r16580 | text = content['data']['text/plain'] | ||
nt.assert_in(u'5', text) | ||||
MinRK
|
r6633 | |||
# oinfo b (str) | ||||
MinRK
|
r16587 | msg_id = client.inspect('b') | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6633 | content = msg['content'] | ||
nt.assert_true(content['found']) | ||||
MinRK
|
r16580 | text = content['data']['text/plain'] | ||
nt.assert_in(u'hi there', text) | ||||
MinRK
|
r6633 | |||
# oinfo c (undefined) | ||||
MinRK
|
r16587 | msg_id = client.inspect('c') | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6633 | content = msg['content'] | ||
nt.assert_false(content['found']) | ||||
MinRK
|
r6569 | |||
MinRK
|
r6830 | def test_embed_kernel_reentrant(): | ||
"""IPython.embed_kernel() can be called multiple times""" | ||||
cmd = '\n'.join([ | ||||
'from IPython import embed_kernel', | ||||
'count = 0', | ||||
'def go():', | ||||
' global count', | ||||
' embed_kernel()', | ||||
' count = count + 1', | ||||
'', | ||||
'while True:' | ||||
' go()', | ||||
'', | ||||
]) | ||||
MinRK
|
r10291 | with setup_kernel(cmd) as client: | ||
MinRK
|
r6830 | for i in range(5): | ||
MinRK
|
r16587 | msg_id = client.inspect('count') | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6830 | content = msg['content'] | ||
nt.assert_true(content['found']) | ||||
MinRK
|
r16580 | text = content['data']['text/plain'] | ||
nt.assert_in(unicode_type(i), text) | ||||
MinRK
|
r6830 | |||
# exit from embed_kernel | ||||
MinRK
|
r10294 | client.execute("get_ipython().exit_now = True") | ||
MinRK
|
r11802 | msg = client.get_shell_msg(block=True, timeout=TIMEOUT) | ||
MinRK
|
r6830 | time.sleep(0.2) | ||