|
|
"""test IPython.embed_kernel()"""
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Copyright (C) 2012 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
import os
|
|
|
import shutil
|
|
|
import sys
|
|
|
import tempfile
|
|
|
import time
|
|
|
|
|
|
from contextlib import contextmanager
|
|
|
from subprocess import Popen, PIPE
|
|
|
|
|
|
import nose.tools as nt
|
|
|
|
|
|
from IPython.kernel import BlockingKernelClient
|
|
|
from IPython.utils import path, py3compat
|
|
|
from IPython.utils.py3compat import unicode_type
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Tests
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
# Upper bound, in seconds, on how long setup_kernel() polls for the kernel's
# connection file before giving up.
SETUP_TIMEOUT = 60

# Timeout, in seconds, passed to every blocking get_shell_msg() call below.
TIMEOUT = 15
|
|
|
|
|
|
def setup():
    """Create a temporary IPYTHONDIR and route IPython to it for these tests.

    Sets three module globals:

    - ``IPYTHONDIR``: a fresh directory from ``tempfile.mkdtemp()``.
    - ``env``: a copy of ``os.environ`` with ``IPYTHONDIR`` pointing at that
      directory; ``setup_kernel`` passes it to the kernel subprocess.
    - ``save_get_ipython_dir``: the original ``path.get_ipython_dir``, kept so
      that ``teardown`` can restore it.

    ``path.get_ipython_dir`` is then replaced with a function that returns the
    temporary directory.
    """
    global IPYTHONDIR, env, save_get_ipython_dir

    IPYTHONDIR = tempfile.mkdtemp()

    # Copy of the current environment with IPYTHONDIR overridden.
    env = dict(os.environ, IPYTHONDIR=IPYTHONDIR)

    # Remember the real function before swapping in the stub.
    save_get_ipython_dir = path.get_ipython_dir
    path.get_ipython_dir = lambda: IPYTHONDIR
|
|
|
|
|
|
|
|
|
def teardown():
    """Undo ``setup``: restore ``path.get_ipython_dir`` and delete IPYTHONDIR.

    Removal of the temporary directory is best-effort; an ``OSError`` or
    ``IOError`` raised by ``shutil.rmtree`` (for example because the directory
    no longer exists) is ignored.
    """
    # Put the real implementation back before touching the filesystem.
    path.get_ipython_dir = save_get_ipython_dir

    try:
        shutil.rmtree(IPYTHONDIR)
    except (OSError, IOError):
        # Nothing to clean up (e.g. no such file) -- deliberately ignored.
        pass
|
|
|
|
|
|
|
|
|
def _stop_kernel(kernel):
    """Terminate the kernel subprocess, reap it, and close its pipes."""
    if kernel.poll() is None:
        try:
            kernel.terminate()
        except OSError:
            # The process went away between poll() and terminate().
            pass
        # Give the process a bounded amount of time to exit, then force it.
        deadline = time.time() + TIMEOUT
        while kernel.poll() is None and time.time() < deadline:
            time.sleep(0.1)
        if kernel.poll() is None:
            kernel.kill()
            kernel.wait()
    # communicate() may already have closed these; close whatever is left so
    # the file descriptors do not outlive the test.
    for pipe in (kernel.stdout, kernel.stderr):
        if pipe is not None and not pipe.closed:
            pipe.close()


@contextmanager
def setup_kernel(cmd):
    """Start an embedded kernel in a subprocess and wait for it to be ready.

    Runs ``cmd`` with ``sys.executable -c`` using the module-level ``env``,
    then polls for the connection file
    ``<IPYTHONDIR>/profile_default/security/kernel-<pid>.json`` for at most
    ``SETUP_TIMEOUT`` seconds.

    Parameters
    ----------
    cmd : str
        Python source passed to the subprocess via ``-c``.

    Yields
    ------
    client : BlockingKernelClient
        Client with the connection file loaded and its channels started.

    Raises
    ------
    IOError
        If the subprocess exits during startup (its stderr is included in the
        message), or if the connection file never appears within
        ``SETUP_TIMEOUT`` seconds.

    On exit -- normal or exceptional, including failures during startup -- the
    client channels (if started) are stopped and the subprocess is terminated,
    waited on, and its pipes are closed.
    """
    kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
    try:
        connection_file = os.path.join(
            IPYTHONDIR,
            'profile_default',
            'security',
            'kernel-%i.json' % kernel.pid,
        )

        # Wait for the connection file to exist; give up after SETUP_TIMEOUT
        # seconds or as soon as the subprocess dies.
        deadline = time.time() + SETUP_TIMEOUT
        while (not os.path.exists(connection_file)
               and kernel.poll() is None
               and time.time() < deadline):
            time.sleep(0.1)

        if kernel.poll() is not None:
            # The process is gone: surface whatever it wrote to stderr.
            o, e = kernel.communicate()
            e = py3compat.cast_unicode(e)
            raise IOError("Kernel failed to start:\n%s" % e)

        if not os.path.exists(connection_file):
            # Still running but never wrote the file; the outer ``finally``
            # takes care of terminating it.
            raise IOError("Connection file %r never arrived" % connection_file)

        client = BlockingKernelClient(connection_file=connection_file)
        client.load_connection_file()
        client.start_channels()

        try:
            yield client
        finally:
            client.stop_channels()
    finally:
        _stop_kernel(kernel)
|
|
|
|
|
|
def test_embed_kernel_basic():
    """IPython.embed_kernel() is basically functional"""
    script = [
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    ]
    cmd = '\n'.join(script)

    with setup_kernel(cmd) as client:

        def next_reply():
            """Block for the next shell message and return its content."""
            reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
            return reply['content']

        # oinfo a (int): the name must be visible inside the kernel
        client.object_info('a')
        nt.assert_true(next_reply()['found'])

        # define c in the kernel from a
        client.execute("c=a*2")
        nt.assert_equal(next_reply()['status'], u'ok')

        # oinfo c (should be 10)
        client.object_info('c')
        info = next_reply()
        nt.assert_true(info['found'])
        nt.assert_in('10', info['data']['text/plain'])
|
|
|
|
|
|
def test_embed_kernel_namespace():
    """IPython.embed_kernel() inherits calling namespace"""
    script = [
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    ]
    cmd = '\n'.join(script)

    with setup_kernel(cmd) as client:

        def inspect(name):
            """Request object info for ``name`` and return the reply content."""
            client.object_info(name)
            reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
            return reply['content']

        # Locals of go(): a (int) and b (str) must be found, and their
        # plain-text description must contain the expected value.
        for name, expected in (('a', u'5'), ('b', u'hi there')):
            info = inspect(name)
            nt.assert_true(info['found'])
            nt.assert_in(expected, info['data']['text/plain'])

        # c was never defined in the embedding scope
        nt.assert_false(inspect('c')['found'])
|
|
|
|
|
|
def test_embed_kernel_reentrant():
    """IPython.embed_kernel() can be called multiple times"""
    # Each literal is one line of the subprocess script.  'while True:' needs
    # its own trailing comma; without it Python silently concatenates it with
    # the following literal and the loop only works by accident as a one-liner.
    cmd = '\n'.join([
        'from IPython import embed_kernel',
        'count = 0',
        'def go():',
        ' global count',
        ' embed_kernel()',
        ' count = count + 1',
        '',
        'while True:',
        ' go()',
        '',
    ])

    with setup_kernel(cmd) as client:
        for i in range(5):
            # count is incremented by the script each time embed_kernel()
            # returns, so on the i-th pass its description must contain i.
            client.object_info('count')
            msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
            content = msg['content']
            nt.assert_true(content['found'])
            text = content['data']['text/plain']
            nt.assert_in(unicode_type(i), text)

            # exit from embed_kernel
            client.execute("get_ipython().exit_now = True")
            # Consume the reply to the execute request so it is not the next
            # message read from the shell channel on the following pass.
            client.get_shell_msg(block=True, timeout=TIMEOUT)
            # Presumably gives the script time to loop back into
            # embed_kernel() before the next request -- TODO confirm.
            time.sleep(0.2)
|
|
|
|
|
|
|
|
|
|