##// END OF EJS Templates
Updated tests to try flavors
Updated tests to try flavors

File last commit:

r10294:a1e189cc
r11738:c59b47d0
Show More
test_embed_kernel.py
188 lines | 5.6 KiB | text/x-python | PythonLexer
/ IPython / kernel / zmq / tests / test_embed_kernel.py
MinRK
add basic embed_kernel tests
r6569 """test IPython.embed_kernel()"""
#-------------------------------------------------------------------------------
# Copyright (C) 2012 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import os
import shutil
import sys
import tempfile
import time
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 from contextlib import contextmanager
MinRK
add basic embed_kernel tests
r6569 from subprocess import Popen, PIPE
import nose.tools as nt
MinRK
update embed_kernel test to Client API
r10291 from IPython.kernel import BlockingKernelClient
MinRK
switch order of failed-to-start errors in test_embed_kernel...
r6904 from IPython.utils import path, py3compat
MinRK
add basic embed_kernel tests
r6569
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
def setup():
"""setup temporary IPYTHONDIR for tests"""
global IPYTHONDIR
global env
global save_get_ipython_dir
IPYTHONDIR = tempfile.mkdtemp()
Jörgen Stenarson
fix for #1809, failing tests in IPython.zmq...
r7119
Jörgen Stenarson
change to pass whole env + temporary IPYTHONDIR
r7120 env = os.environ.copy()
env["IPYTHONDIR"] = IPYTHONDIR
Jörgen Stenarson
fix for #1809, failing tests in IPython.zmq...
r7119
MinRK
add basic embed_kernel tests
r6569 save_get_ipython_dir = path.get_ipython_dir
path.get_ipython_dir = lambda : IPYTHONDIR
def teardown():
path.get_ipython_dir = save_get_ipython_dir
try:
shutil.rmtree(IPYTHONDIR)
except (OSError, IOError):
# no such file
pass
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 @contextmanager
def setup_kernel(cmd):
MinRK
add basic embed_kernel tests
r6569 """start an embedded kernel in a subprocess, and wait for it to be ready
Returns
-------
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 kernel_manager: connected KernelManager instance
MinRK
add basic embed_kernel tests
r6569 """
kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
connection_file = os.path.join(IPYTHONDIR,
'profile_default',
'security',
'kernel-%i.json' % kernel.pid
)
# wait for connection file to exist, timeout after 5s
tic = time.time()
MinRK
switch order of failed-to-start errors in test_embed_kernel...
r6904 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
MinRK
add basic embed_kernel tests
r6569 time.sleep(0.1)
MinRK
switch order of failed-to-start errors in test_embed_kernel...
r6904 if kernel.poll() is not None:
o,e = kernel.communicate()
e = py3compat.cast_unicode(e)
raise IOError("Kernel failed to start:\n%s" % e)
MinRK
add basic embed_kernel tests
r6569 if not os.path.exists(connection_file):
if kernel.poll() is None:
kernel.terminate()
raise IOError("Connection file %r never arrived" % connection_file)
MinRK
update embed_kernel test to Client API
r10291 client = BlockingKernelClient(connection_file=connection_file)
client.load_connection_file()
client.start_channels()
MinRK
add basic embed_kernel tests
r6569
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 try:
MinRK
update embed_kernel test to Client API
r10291 yield client
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 finally:
MinRK
update embed_kernel test to Client API
r10291 client.stop_channels()
MinRK
terminate kernel after embed_kernel tests
r6759 kernel.terminate()
MinRK
add basic embed_kernel tests
r6569
def test_embed_kernel_basic():
"""IPython.embed_kernel() is basically functional"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'def go():',
' a=5',
' b="hi there"',
' embed_kernel()',
'go()',
'',
])
MinRK
update embed_kernel test to Client API
r10291 with setup_kernel(cmd) as client:
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 # oinfo a (int)
MinRK
expose shell channel methods at the client level
r10294 msg_id = client.object_info('a')
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 content = msg['content']
nt.assert_true(content['found'])
MinRK
add basic embed_kernel tests
r6569
MinRK
expose shell channel methods at the client level
r10294 msg_id = client.execute("c=a*2")
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 content = msg['content']
Bradley M. Froehle
s/nt.assert_equals/nt.assert_equal/
r7875 nt.assert_equal(content['status'], u'ok')
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633
# oinfo c (should be 10)
MinRK
expose shell channel methods at the client level
r10294 msg_id = client.object_info('c')
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 content = msg['content']
nt.assert_true(content['found'])
Bradley M. Froehle
s/nt.assert_equals/nt.assert_equal/
r7875 nt.assert_equal(content['string_form'], u'10')
MinRK
add basic embed_kernel tests
r6569
def test_embed_kernel_namespace():
"""IPython.embed_kernel() inherits calling namespace"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'def go():',
' a=5',
' b="hi there"',
' embed_kernel()',
'go()',
'',
])
MinRK
update embed_kernel test to Client API
r10291 with setup_kernel(cmd) as client:
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 # oinfo a (int)
MinRK
expose shell channel methods at the client level
r10294 msg_id = client.object_info('a')
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 content = msg['content']
nt.assert_true(content['found'])
Bradley M. Froehle
s/nt.assert_equals/nt.assert_equal/
r7875 nt.assert_equal(content['string_form'], u'5')
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633
# oinfo b (str)
MinRK
expose shell channel methods at the client level
r10294 msg_id = client.object_info('b')
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 content = msg['content']
nt.assert_true(content['found'])
Bradley M. Froehle
s/nt.assert_equals/nt.assert_equal/
r7875 nt.assert_equal(content['string_form'], u'hi there')
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633
# oinfo c (undefined)
MinRK
expose shell channel methods at the client level
r10294 msg_id = client.object_info('c')
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 content = msg['content']
nt.assert_false(content['found'])
MinRK
add basic embed_kernel tests
r6569
MinRK
test re-entrant embed_kernel
r6830 def test_embed_kernel_reentrant():
"""IPython.embed_kernel() can be called multiple times"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'count = 0',
'def go():',
' global count',
' embed_kernel()',
' count = count + 1',
'',
'while True:'
' go()',
'',
])
MinRK
update embed_kernel test to Client API
r10291 with setup_kernel(cmd) as client:
MinRK
test re-entrant embed_kernel
r6830 for i in range(5):
MinRK
expose shell channel methods at the client level
r10294 msg_id = client.object_info('count')
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
test re-entrant embed_kernel
r6830 content = msg['content']
nt.assert_true(content['found'])
Bradley M. Froehle
s/nt.assert_equals/nt.assert_equal/
r7875 nt.assert_equal(content['string_form'], unicode(i))
MinRK
test re-entrant embed_kernel
r6830
# exit from embed_kernel
MinRK
expose shell channel methods at the client level
r10294 client.execute("get_ipython().exit_now = True")
msg = client.get_shell_msg(block=True, timeout=2)
MinRK
test re-entrant embed_kernel
r6830 time.sleep(0.2)