##// END OF EJS Templates
remove unnecessarily specific extract_module_locals_above
remove unnecessarily specific extract_module_locals_above

File last commit:

r6569:940b5fbe
r6570:d29d623a
Show More
test_embed_kernel.py
153 lines | 4.4 KiB | text/x-python | PythonLexer
/ IPython / zmq / tests / test_embed_kernel.py
"""test IPython.embed_kernel()"""
#-------------------------------------------------------------------------------
# Copyright (C) 2012 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import os
import shutil
import sys
import tempfile
import time
from subprocess import Popen, PIPE
import nose.tools as nt
from IPython.zmq.blockingkernelmanager import BlockingKernelManager
from IPython.utils import path
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
def setup():
"""setup temporary IPYTHONDIR for tests"""
global IPYTHONDIR
global env
global save_get_ipython_dir
IPYTHONDIR = tempfile.mkdtemp()
env = dict(IPYTHONDIR=IPYTHONDIR)
save_get_ipython_dir = path.get_ipython_dir
path.get_ipython_dir = lambda : IPYTHONDIR
def teardown():
path.get_ipython_dir = save_get_ipython_dir
try:
shutil.rmtree(IPYTHONDIR)
except (OSError, IOError):
# no such file
pass
def _launch_kernel(cmd):
"""start an embedded kernel in a subprocess, and wait for it to be ready
Returns
-------
kernel, kernel_manager: Popen instance and connected KernelManager
"""
kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
connection_file = os.path.join(IPYTHONDIR,
'profile_default',
'security',
'kernel-%i.json' % kernel.pid
)
# wait for connection file to exist, timeout after 5s
tic = time.time()
while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
time.sleep(0.1)
if not os.path.exists(connection_file):
if kernel.poll() is None:
kernel.terminate()
raise IOError("Connection file %r never arrived" % connection_file)
if kernel.poll() is not None:
raise IOError("Kernel failed to start")
km = BlockingKernelManager(connection_file=connection_file)
km.load_connection_file()
km.start_channels()
return kernel, km
def test_embed_kernel_basic():
"""IPython.embed_kernel() is basically functional"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'def go():',
' a=5',
' b="hi there"',
' embed_kernel()',
'go()',
'',
])
kernel, km = _launch_kernel(cmd)
shell = km.shell_channel
# oinfo a (int)
msg_id = shell.object_info('a')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
msg_id = shell.execute("c=a*2")
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_equals(content['status'], u'ok')
# oinfo c (should be 10)
msg_id = shell.object_info('c')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
nt.assert_equals(content['string_form'], u'10')
def test_embed_kernel_namespace():
"""IPython.embed_kernel() inherits calling namespace"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'def go():',
' a=5',
' b="hi there"',
' embed_kernel()',
'go()',
'',
])
kernel, km = _launch_kernel(cmd)
shell = km.shell_channel
# oinfo a (int)
msg_id = shell.object_info('a')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
nt.assert_equals(content['string_form'], u'5')
# oinfo b (str)
msg_id = shell.object_info('b')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
nt.assert_equals(content['string_form'], u'hi there')
# oinfo c (undefined)
msg_id = shell.object_info('c')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_false(content['found'])