##// END OF EJS Templates
Change $.post to this.post in the kernel js file...
Change $.post to this.post in the kernel js file. This makes it easy to override the post function for custom communication requirements. Any replacement function, of course, needs to have the same semantics as $.post.

File last commit:

r13353:0ca701d5
r16277:4e3aea89
Show More
test_embed_kernel.py
194 lines | 5.8 KiB | text/x-python | PythonLexer
/ IPython / kernel / zmq / tests / test_embed_kernel.py
MinRK
add basic embed_kernel tests
r6569 """test IPython.embed_kernel()"""
#-------------------------------------------------------------------------------
# Copyright (C) 2012 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import os
import shutil
import sys
import tempfile
import time
MinRK
ensure kernels are cleaned up in embed_kernel tests
r6633 from contextlib import contextmanager
MinRK
add basic embed_kernel tests
r6569 from subprocess import Popen, PIPE
import nose.tools as nt
MinRK
update embed_kernel test to Client API
r10291 from IPython.kernel import BlockingKernelClient
MinRK
switch order of failed-to-start errors in test_embed_kernel...
r6904 from IPython.utils import path, py3compat
Thomas Kluyver
Replace references to unicode and basestring
r13353 from IPython.utils.py3compat import unicode_type
MinRK
add basic embed_kernel tests
r6569
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
MinRK
apply the same long-timeout logic to kernel tests...
# Seconds to wait for a freshly spawned kernel to write its connection file.
SETUP_TIMEOUT = 60
# Seconds to wait for each reply on the shell channel.
TIMEOUT = 15
MinRK
add basic embed_kernel tests
def setup():
    """setup temporary IPYTHONDIR for tests

    Creates a fresh temporary directory and publishes three module globals:

    * ``IPYTHONDIR`` -- path of the temporary directory
    * ``env`` -- copy of ``os.environ`` with ``IPYTHONDIR`` pointing at it
      (passed to the kernel subprocesses)
    * ``save_get_ipython_dir`` -- the original ``path.get_ipython_dir``,
      kept so it can be restored later

    ``path.get_ipython_dir`` is replaced by a function that returns the
    temporary directory.
    """
    global IPYTHONDIR
    global env
    global save_get_ipython_dir

    IPYTHONDIR = tempfile.mkdtemp()

    # pass the whole environment on, overriding only IPYTHONDIR
    env = os.environ.copy()
    env["IPYTHONDIR"] = IPYTHONDIR

    save_get_ipython_dir = path.get_ipython_dir
    path.get_ipython_dir = lambda: IPYTHONDIR
def teardown():
    """restore path.get_ipython_dir and remove the temporary IPYTHONDIR"""
    path.get_ipython_dir = save_get_ipython_dir
    try:
        shutil.rmtree(IPYTHONDIR)
    except (OSError, IOError):
        # best-effort cleanup: e.g. no such file
        pass
MinRK
ensure kernels are cleaned up in embed_kernel tests
@contextmanager
def setup_kernel(cmd):
    """start an embedded kernel in a subprocess, and wait for it to be ready

    Parameters
    ----------
    cmd : str
        Python source passed to ``sys.executable -c`` in the subprocess.

    Yields
    ------
    client : BlockingKernelClient
        Client loaded from the kernel's connection file, with its channels
        started.  The channels are stopped and the subprocess is terminated
        when the ``with`` block exits.

    Raises
    ------
    IOError
        If the subprocess exits before it is ready (the message carries its
        stderr), or if the connection file has not appeared after
        SETUP_TIMEOUT seconds.
    """
    kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
    try:
        connection_file = os.path.join(
            IPYTHONDIR,
            'profile_default',
            'security',
            'kernel-%i.json' % kernel.pid,
        )

        # wait for the connection file to exist; give up after SETUP_TIMEOUT
        # seconds, or as soon as the kernel process has exited
        deadline = time.time() + SETUP_TIMEOUT
        while (not os.path.exists(connection_file)
               and kernel.poll() is None
               and time.time() < deadline):
            time.sleep(0.1)

        # check for a dead kernel first: its stderr is more informative than
        # the "never arrived" message below
        if kernel.poll() is not None:
            o, e = kernel.communicate()
            e = py3compat.cast_unicode(e)
            raise IOError("Kernel failed to start:\n%s" % e)

        if not os.path.exists(connection_file):
            raise IOError("Connection file %r never arrived" % connection_file)

        client = BlockingKernelClient(connection_file=connection_file)
        client.load_connection_file()
        client.start_channels()
        try:
            yield client
        finally:
            client.stop_channels()
    finally:
        # Runs on every exit path, so the subprocess cannot outlive the test
        # even when the client fails to start or stop.
        if kernel.poll() is None:
            kernel.terminate()
        # Close our ends of the pipes before waiting, so the child cannot
        # block on a full pipe and the file descriptors are released.
        for pipe in (kernel.stdout, kernel.stderr):
            pipe.close()
        kernel.wait()
MinRK
add basic embed_kernel tests
r6569
def test_embed_kernel_basic():
    """IPython.embed_kernel() is basically functional"""
    # program run in the subprocess: embeds a kernel inside go()
    cmd = '\n'.join([
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    ])

    with setup_kernel(cmd) as client:
        # oinfo a (int)
        client.object_info('a')
        msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
        content = msg['content']
        nt.assert_true(content['found'])

        # define c in the embedded namespace
        client.execute("c=a*2")
        msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
        content = msg['content']
        nt.assert_equal(content['status'], u'ok')

        # oinfo c (should be 10)
        client.object_info('c')
        msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
        content = msg['content']
        nt.assert_true(content['found'])
        nt.assert_equal(content['string_form'], u'10')
MinRK
add basic embed_kernel tests
r6569
def test_embed_kernel_namespace():
    """IPython.embed_kernel() inherits calling namespace"""
    # program run in the subprocess: a and b are locals of go(), c is never defined
    cmd = '\n'.join([
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    ])

    with setup_kernel(cmd) as client:
        # oinfo a (int)
        client.object_info('a')
        msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
        content = msg['content']
        nt.assert_true(content['found'])
        nt.assert_equal(content['string_form'], u'5')

        # oinfo b (str)
        client.object_info('b')
        msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
        content = msg['content']
        nt.assert_true(content['found'])
        nt.assert_equal(content['string_form'], u'hi there')

        # oinfo c (undefined)
        client.object_info('c')
        msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
        content = msg['content']
        nt.assert_false(content['found'])
MinRK
add basic embed_kernel tests
r6569
MinRK
test re-entrant embed_kernel
def test_embed_kernel_reentrant():
    """IPython.embed_kernel() can be called multiple times"""
    # program run in the subprocess: every time an embedded kernel exits,
    # count is incremented and a new kernel is embedded
    cmd = '\n'.join([
        'from IPython import embed_kernel',
        'count = 0',
        'def go():',
        ' global count',
        ' embed_kernel()',
        ' count = count + 1',
        '',
        # the comma after this literal was missing, so it was silently
        # concatenated with the next one
        'while True:',
        ' go()',
        '',
    ])

    with setup_kernel(cmd) as client:
        for i in range(5):
            # count equals the number of times embed_kernel has exited so far
            client.object_info('count')
            msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
            content = msg['content']
            nt.assert_true(content['found'])
            nt.assert_equal(content['string_form'], unicode_type(i))

            # exit from embed_kernel
            client.execute("get_ipython().exit_now = True")
            # consume the execute reply so it is not mistaken for the next
            # object_info reply
            client.get_shell_msg(block=True, timeout=TIMEOUT)
            # pause before querying the re-embedded kernel
            time.sleep(0.2)