##// END OF EJS Templates
Backport PR #2924: safe_run_module: Silence SystemExit codes 0 and None....
Backport PR #2924: safe_run_module: Silence SystemExit codes 0 and None. In `safe_execfile` we ignore SystemExit exceptions with codes 0 and 1. We don't do this for `safe_run_module` which leads to the following mismatch of tracebacks between Python and IPython: ``` $ cat > exit0.py import sys sys.exit(0) $ python -m exit0 $ ipython -m exit0 --------------------------------------------------------------------------- SystemExit Traceback (most recent call last) /usr/lib/python2.7/runpy.pyc in run_module(mod_name, init_globals, run_name, alter_sys) 174 if alter_sys: 175 return _run_module_code(code, init_globals, run_name, --> 176 fname, loader, pkg_name) 177 else: 178 # Leave the sys module alone /usr/lib/python2.7/runpy.pyc in _run_module_code(code, init_globals, mod_name, mod_fname, mod_loader, pkg_name) 80 mod_globals = temp_module.module.__dict__ 81 _run_code(code, mod_globals, init_globals, ---> 82 mod_name, mod_fname, mod_loader, pkg_name) 83 # Copy the globals of the temporary module, as they 84 # may be cleared when the temporary module goes away /usr/lib/python2.7/runpy.pyc in _run_code(code, run_globals, init_globals, mod_name, mod_fname, mod_loader, pkg_name) 70 __loader__ = mod_loader, 71 __package__ = pkg_name) ---> 72 exec code in run_globals 73 return run_globals 74 /tmp/exit0.py in <module>() 1 import sys ----> 2 sys.exit(0) SystemExit: 0 WARNING: Unknown failure executing module: <exit0> ``` The attached pull request silences SystemExit exceptions with codes 0 and None.

File last commit:

r7120:18f8b27c
r9972:8ab632a4
Show More
test_embed_kernel.py
193 lines | 5.6 KiB | text/x-python | PythonLexer
/ IPython / zmq / tests / test_embed_kernel.py
"""test IPython.embed_kernel()"""
#-------------------------------------------------------------------------------
# Copyright (C) 2012 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import os
import shutil
import sys
import tempfile
import time
from contextlib import contextmanager
from subprocess import Popen, PIPE
import nose.tools as nt
from IPython.zmq.blockingkernelmanager import BlockingKernelManager
from IPython.utils import path, py3compat
#-------------------------------------------------------------------------------
# Tests
#-------------------------------------------------------------------------------
def setup():
"""setup temporary IPYTHONDIR for tests"""
global IPYTHONDIR
global env
global save_get_ipython_dir
IPYTHONDIR = tempfile.mkdtemp()
env = os.environ.copy()
env["IPYTHONDIR"] = IPYTHONDIR
save_get_ipython_dir = path.get_ipython_dir
path.get_ipython_dir = lambda : IPYTHONDIR
def teardown():
path.get_ipython_dir = save_get_ipython_dir
try:
shutil.rmtree(IPYTHONDIR)
except (OSError, IOError):
# no such file
pass
@contextmanager
def setup_kernel(cmd):
"""start an embedded kernel in a subprocess, and wait for it to be ready
Returns
-------
kernel_manager: connected KernelManager instance
"""
kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
connection_file = os.path.join(IPYTHONDIR,
'profile_default',
'security',
'kernel-%i.json' % kernel.pid
)
# wait for connection file to exist, timeout after 5s
tic = time.time()
while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
time.sleep(0.1)
if kernel.poll() is not None:
o,e = kernel.communicate()
e = py3compat.cast_unicode(e)
raise IOError("Kernel failed to start:\n%s" % e)
if not os.path.exists(connection_file):
if kernel.poll() is None:
kernel.terminate()
raise IOError("Connection file %r never arrived" % connection_file)
km = BlockingKernelManager(connection_file=connection_file)
km.load_connection_file()
km.start_channels()
try:
yield km
finally:
km.stop_channels()
kernel.terminate()
def test_embed_kernel_basic():
"""IPython.embed_kernel() is basically functional"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'def go():',
' a=5',
' b="hi there"',
' embed_kernel()',
'go()',
'',
])
with setup_kernel(cmd) as km:
shell = km.shell_channel
# oinfo a (int)
msg_id = shell.object_info('a')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
msg_id = shell.execute("c=a*2")
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_equals(content['status'], u'ok')
# oinfo c (should be 10)
msg_id = shell.object_info('c')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
nt.assert_equals(content['string_form'], u'10')
def test_embed_kernel_namespace():
"""IPython.embed_kernel() inherits calling namespace"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'def go():',
' a=5',
' b="hi there"',
' embed_kernel()',
'go()',
'',
])
with setup_kernel(cmd) as km:
shell = km.shell_channel
# oinfo a (int)
msg_id = shell.object_info('a')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
nt.assert_equals(content['string_form'], u'5')
# oinfo b (str)
msg_id = shell.object_info('b')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
nt.assert_equals(content['string_form'], u'hi there')
# oinfo c (undefined)
msg_id = shell.object_info('c')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_false(content['found'])
def test_embed_kernel_reentrant():
"""IPython.embed_kernel() can be called multiple times"""
cmd = '\n'.join([
'from IPython import embed_kernel',
'count = 0',
'def go():',
' global count',
' embed_kernel()',
' count = count + 1',
'',
'while True:'
' go()',
'',
])
with setup_kernel(cmd) as km:
shell = km.shell_channel
for i in range(5):
msg_id = shell.object_info('count')
msg = shell.get_msg(block=True, timeout=2)
content = msg['content']
nt.assert_true(content['found'])
nt.assert_equals(content['string_form'], unicode(i))
# exit from embed_kernel
shell.execute("get_ipython().exit_now = True")
msg = shell.get_msg(block=True, timeout=2)
time.sleep(0.2)