##// END OF EJS Templates
Merge pull request #1732 from fperez/cellmagics...
Merge pull request #1732 from fperez/cellmagics Refactoring of the magics system and implementation of cell magics. This PR completely refactors the magic system, finally moving the magic objects to standalone, independent objects instead of being the mixin class we'd had since the beginning of IPython. Now, a separate base class is provided in IPython.core.magic.Magics that users can subclass to create their own magics. Decorators are also provided to create magics from simple functions without the need for object orientation. All builtin magics now exist in a few subclasses that group together related functionality, and the new IPython.core.magics package has been created to organize this into smaller files. This cleanup was the last major piece of deep refactoring needed from the original 2001 codebase. Secondly, this PR introduces a new type of magic function, prefixed with `%%` instead of `%`, which operates at the cell level. A cell magic receives two arguments: the line it is called on (like a line magic) and the body of the cell below it. Cell magics are most natural in the notebook, but they also work in the terminal and qt console, with the usual approach of using a blank line to signal cell termination. This PR closes #1611, or IPEP 1, where the design had been discussed.

File last commit:

r6990:681e926d
r7011:61eb2ffe merge
Show More
test_embed_kernel.py
192 lines | 5.7 KiB | text/x-python | PythonLexer
/ IPython / zmq / tests / test_embed_kernel.py
"""test IPython.embed_kernel()"""
#-------------------------------------------------------------------------------
# Copyright (C) 2012 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import os
import shutil
import sys
import tempfile
import time
from contextlib import contextmanager
from subprocess import Popen, PIPE
import nose.tools as nt
from IPython.zmq.blockingkernelmanager import BlockingKernelManager
from IPython.utils import path, py3compat
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
def setup():
    """Create a temporary IPYTHONDIR and route IPython to it for these tests.

    Populates three module globals:

    * ``IPYTHONDIR`` -- path of the freshly created temporary directory
    * ``env`` -- environment mapping passed to kernel subprocesses
    * ``save_get_ipython_dir`` -- the original ``path.get_ipython_dir``,
      kept so it can be put back later
    """
    global IPYTHONDIR, env, save_get_ipython_dir

    IPYTHONDIR = tempfile.mkdtemp()

    # The subprocess environment carries IPYTHONDIR, plus PYTHONPATH when the
    # current process has one set.
    env = {'IPYTHONDIR': IPYTHONDIR}
    pythonpath = os.environ.get('PYTHONPATH')
    if pythonpath is not None:
        env['PYTHONPATH'] = pythonpath

    # Swap in a replacement that always reports the temporary directory.
    save_get_ipython_dir = path.get_ipython_dir
    path.get_ipython_dir = lambda: IPYTHONDIR
def teardown():
    """Put back the saved ``path.get_ipython_dir`` and remove the temporary IPYTHONDIR."""
    path.get_ipython_dir = save_get_ipython_dir
    try:
        shutil.rmtree(IPYTHONDIR)
    except (IOError, OSError):
        # best-effort cleanup: nothing to do if removal fails (e.g. no such file)
        return
@contextmanager
def setup_kernel(cmd):
    """Start an embedded kernel in a subprocess, and wait for it to be ready.

    Parameters
    ----------
    cmd : str
        Python source executed in the subprocess via ``sys.executable -c``.

    Returns
    -------
    kernel_manager: connected KernelManager instance

    Raises
    ------
    IOError
        If the subprocess exits before its connection file shows up, or if the
        connection file has not appeared after 10 seconds.
    """
    kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
    try:
        connection_file = os.path.join(
            IPYTHONDIR,
            'profile_default',
            'security',
            'kernel-%i.json' % kernel.pid,
        )

        # wait for connection file to exist, timeout after 10s
        deadline = time.time() + 10
        while (not os.path.exists(connection_file)
               and kernel.poll() is None
               and time.time() < deadline):
            time.sleep(0.1)

        if kernel.poll() is not None:
            # the subprocess died on its own: surface its stderr in the error
            _, err = kernel.communicate()
            err = py3compat.cast_unicode(err)
            raise IOError("Kernel failed to start:\n%s" % err)

        if not os.path.exists(connection_file):
            # still running but never wrote the file; the outer ``finally``
            # takes care of terminating it
            raise IOError("Connection file %r never arrived" % connection_file)

        km = BlockingKernelManager(connection_file=connection_file)
        km.load_connection_file()
        km.start_channels()
        try:
            yield km
        finally:
            km.stop_channels()
    finally:
        # Always shut the subprocess down, including when the manager setup
        # above fails.  Only signal a process that is still running, then
        # reap it (and drain its pipes) so no zombie is left behind.
        if kernel.poll() is None:
            kernel.terminate()
            kernel.communicate()
def test_embed_kernel_basic():
    """IPython.embed_kernel() is basically functional

    Embeds a kernel inside a function, then checks that a local of that
    function can be inspected and that code executed through the shell
    channel can define a new name derived from it.
    """
    script = [
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    ]
    cmd = '\n'.join(script)

    with setup_kernel(cmd) as km:
        channel = km.shell_channel

        # oinfo a (int)
        channel.object_info('a')
        reply = channel.get_msg(block=True, timeout=2)
        info = reply['content']
        nt.assert_true(info['found'])

        # define c from a inside the embedded namespace
        channel.execute("c=a*2")
        reply = channel.get_msg(block=True, timeout=2)
        result = reply['content']
        nt.assert_equals(result['status'], u'ok')

        # oinfo c (should be 10)
        channel.object_info('c')
        reply = channel.get_msg(block=True, timeout=2)
        info = reply['content']
        nt.assert_true(info['found'])
        nt.assert_equals(info['string_form'], u'10')
def test_embed_kernel_namespace():
    """IPython.embed_kernel() inherits calling namespace

    The locals ``a`` and ``b`` of the function calling ``embed_kernel()`` must
    be visible with their values; the never-assigned name ``c`` must not be
    found.
    """
    script = [
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    ]
    cmd = '\n'.join(script)

    with setup_kernel(cmd) as km:
        channel = km.shell_channel

        # oinfo a (int), then oinfo b (str)
        for name, expected in (('a', u'5'), ('b', u'hi there')):
            channel.object_info(name)
            reply = channel.get_msg(block=True, timeout=2)
            info = reply['content']
            nt.assert_true(info['found'])
            nt.assert_equals(info['string_form'], expected)

        # oinfo c (undefined)
        channel.object_info('c')
        reply = channel.get_msg(block=True, timeout=2)
        info = reply['content']
        nt.assert_false(info['found'])
def test_embed_kernel_reentrant():
    """IPython.embed_kernel() can be called multiple times

    The subprocess calls ``embed_kernel()`` in an endless loop, incrementing a
    global ``count`` each time the embedded kernel returns.  Each iteration
    here checks the current ``count`` and then asks the kernel to exit, so the
    next iteration talks to a fresh ``embed_kernel()`` call.
    """
    cmd = '\n'.join([
        'from IPython import embed_kernel',
        'count = 0',
        'def go():',
        ' global count',
        ' embed_kernel()',
        ' count = count + 1',
        '',
        # each literal is its own element: the loop header and its body are
        # separate lines of the script, not one implicitly concatenated string
        'while True:',
        ' go()',
        '',
    ])

    with setup_kernel(cmd) as km:
        shell = km.shell_channel
        for i in range(5):
            shell.object_info('count')
            reply = shell.get_msg(block=True, timeout=2)
            content = reply['content']
            nt.assert_true(content['found'])
            # u'%i' formatting gives a text string without the py2-only
            # ``unicode`` builtin
            nt.assert_equals(content['string_form'], u'%i' % i)

            # exit from embed_kernel
            shell.execute("get_ipython().exit_now = True")
            # consume the reply to the exit request
            shell.get_msg(block=True, timeout=2)
            # short pause before querying again (gives the subprocess time to
            # loop back into embed_kernel)
            time.sleep(0.2)