"""test IPython.embed_kernel()""" #------------------------------------------------------------------------------- # Copyright (C) 2012 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- import os import shutil import sys import tempfile import time from contextlib import contextmanager from subprocess import Popen, PIPE import nose.tools as nt from jupyter_client import BlockingKernelClient from IPython.utils import path, py3compat from IPython.utils.py3compat import unicode_type #------------------------------------------------------------------------------- # Tests #------------------------------------------------------------------------------- SETUP_TIMEOUT = 60 TIMEOUT = 15 def setup(): """setup temporary IPYTHONDIR for tests""" global IPYTHONDIR global env global save_get_ipython_dir IPYTHONDIR = tempfile.mkdtemp() env = os.environ.copy() env["IPYTHONDIR"] = IPYTHONDIR save_get_ipython_dir = path.get_ipython_dir path.get_ipython_dir = lambda : IPYTHONDIR def teardown(): path.get_ipython_dir = save_get_ipython_dir try: shutil.rmtree(IPYTHONDIR) except (OSError, IOError): # no such file pass @contextmanager def setup_kernel(cmd): """start an embedded kernel in a subprocess, and wait for it to be ready Returns ------- kernel_manager: connected KernelManager instance """ kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env) connection_file = os.path.join(IPYTHONDIR, 'profile_default', 'security', 'kernel-%i.json' % kernel.pid ) # wait for connection file to exist, timeout after 5s tic = time.time() while not os.path.exists(connection_file) \ and kernel.poll() is None \ and time.time() < tic + SETUP_TIMEOUT: time.sleep(0.1) if kernel.poll() is not None: o,e = kernel.communicate() e = py3compat.cast_unicode(e) raise IOError("Kernel failed to start:\n%s" % e) if not os.path.exists(connection_file): if kernel.poll() is None: kernel.terminate() raise IOError("Connection file %r never arrived" % connection_file) client = BlockingKernelClient(connection_file=connection_file) client.load_connection_file() client.start_channels() client.wait_for_ready() try: yield client finally: client.stop_channels() kernel.terminate() def test_embed_kernel_basic(): """IPython.embed_kernel() is basically functional""" cmd = '\n'.join([ 'from IPython import embed_kernel', 'def go():', ' a=5', ' b="hi there"', ' embed_kernel()', 'go()', '', ]) with setup_kernel(cmd) as client: # oinfo a (int) msg_id = client.inspect('a') msg = client.get_shell_msg(block=True, timeout=TIMEOUT) content = msg['content'] nt.assert_true(content['found']) msg_id = client.execute("c=a*2") msg = client.get_shell_msg(block=True, timeout=TIMEOUT) content = msg['content'] nt.assert_equal(content['status'], u'ok') # oinfo c (should be 10) msg_id = client.inspect('c') msg = client.get_shell_msg(block=True, timeout=TIMEOUT) content = msg['content'] nt.assert_true(content['found']) text = content['data']['text/plain'] nt.assert_in('10', text) def test_embed_kernel_namespace(): """IPython.embed_kernel() inherits calling namespace""" cmd = '\n'.join([ 'from IPython import embed_kernel', 'def go():', ' a=5', ' b="hi there"', ' embed_kernel()', 'go()', '', ]) with setup_kernel(cmd) as client: # oinfo a (int) msg_id = client.inspect('a') msg = client.get_shell_msg(block=True, timeout=TIMEOUT) content = msg['content'] nt.assert_true(content['found']) text = content['data']['text/plain'] nt.assert_in(u'5', text) # oinfo b (str) msg_id = client.inspect('b') msg = client.get_shell_msg(block=True, timeout=TIMEOUT) content = msg['content'] nt.assert_true(content['found']) text = content['data']['text/plain'] nt.assert_in(u'hi there', text) # oinfo c (undefined) msg_id = client.inspect('c') msg = client.get_shell_msg(block=True, timeout=TIMEOUT) content = msg['content'] nt.assert_false(content['found']) def test_embed_kernel_reentrant(): """IPython.embed_kernel() can be called multiple times""" cmd = '\n'.join([ 'from IPython import embed_kernel', 'count = 0', 'def go():', ' global count', ' embed_kernel()', ' count = count + 1', '', 'while True:' ' go()', '', ]) with setup_kernel(cmd) as client: for i in range(5): msg_id = client.inspect('count') msg = client.get_shell_msg(block=True, timeout=TIMEOUT) content = msg['content'] nt.assert_true(content['found']) text = content['data']['text/plain'] nt.assert_in(unicode_type(i), text) # exit from embed_kernel client.execute("get_ipython().exit_now = True") msg = client.get_shell_msg(block=True, timeout=TIMEOUT) time.sleep(0.2)