##// END OF EJS Templates
Load the link models
Load the link models

File last commit:

r16591:6a91fb65
r18056:f59d9a16
Show More
test_kernelmanager.py
104 lines | 3.5 KiB | text/x-python | PythonLexer
MinRK
update completion_ and objection_info_request...
r16580 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475
from __future__ import print_function
import unittest
MinRK
update inprocess kernel to new layout...
r10298 from IPython.kernel.inprocess.blocking import BlockingInProcessKernelClient
from IPython.kernel.inprocess.manager import InProcessKernelManager
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475
#-----------------------------------------------------------------------------
# Test case
#-----------------------------------------------------------------------------
class InProcessKernelManagerTestCase(unittest.TestCase):
MinRK
update inprocess kernel to new layout...
r10298 def test_interface(self):
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 """ Does the in-process kernel manager implement the basic KM interface?
"""
MinRK
update inprocess kernel to new layout...
r10298 km = InProcessKernelManager()
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 self.assert_(not km.has_kernel)
km.start_kernel()
self.assert_(km.has_kernel)
self.assert_(km.kernel is not None)
MinRK
update inprocess kernel to new layout...
r10298 kc = BlockingInProcessKernelClient(kernel=km.kernel)
self.assert_(not kc.channels_running)
kc.start_channels()
self.assert_(kc.channels_running)
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 old_kernel = km.kernel
km.restart_kernel()
self.assert_(km.kernel is not None)
self.assertNotEquals(km.kernel, old_kernel)
km.shutdown_kernel()
self.assert_(not km.has_kernel)
self.assertRaises(NotImplementedError, km.interrupt_kernel)
self.assertRaises(NotImplementedError, km.signal_kernel, 9)
MinRK
update inprocess kernel to new layout...
r10298 kc.stop_channels()
self.assert_(not kc.channels_running)
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475
def test_execute(self):
""" Does executing code in an in-process kernel work?
"""
MinRK
update inprocess kernel to new layout...
r10298 km = InProcessKernelManager()
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 km.start_kernel()
MinRK
update inprocess kernel to new layout...
r10298 kc = BlockingInProcessKernelClient(kernel=km.kernel)
kc.start_channels()
kc.execute('foo = 1')
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 self.assertEquals(km.kernel.shell.user_ns['foo'], 1)
epatters
TST: Add more unit tests for in-process kernel manager.
r8481 def test_complete(self):
""" Does requesting completion from an in-process kernel work?
"""
MinRK
update inprocess kernel to new layout...
r10298 km = InProcessKernelManager()
epatters
TST: Add more unit tests for in-process kernel manager.
r8481 km.start_kernel()
MinRK
update inprocess kernel to new layout...
r10298 kc = BlockingInProcessKernelClient(kernel=km.kernel)
kc.start_channels()
epatters
TST: Add more unit tests for in-process kernel manager.
r8481 km.kernel.shell.push({'my_bar': 0, 'my_baz': 1})
MinRK
update completion_ and objection_info_request...
r16580 kc.complete('my_ba', 5)
MinRK
update inprocess kernel to new layout...
r10298 msg = kc.get_shell_msg()
self.assertEqual(msg['header']['msg_type'], 'complete_reply')
self.assertEqual(sorted(msg['content']['matches']),
epatters
TST: Add more unit tests for in-process kernel manager.
r8481 ['my_bar', 'my_baz'])
MinRK
s/object_info_request/inspect_request
r16587 def test_inspect(self):
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 """ Does requesting object information from an in-process kernel work?
"""
MinRK
update inprocess kernel to new layout...
r10298 km = InProcessKernelManager()
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 km.start_kernel()
MinRK
update inprocess kernel to new layout...
r10298 kc = BlockingInProcessKernelClient(kernel=km.kernel)
kc.start_channels()
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475 km.kernel.shell.user_ns['foo'] = 1
MinRK
s/object_info_request/inspect_request
r16587 kc.inspect('foo')
MinRK
update inprocess kernel to new layout...
r10298 msg = kc.get_shell_msg()
MinRK
s/object_info_request/inspect_request
r16587 self.assertEqual(msg['header']['msg_type'], 'inspect_reply')
MinRK
update completion_ and objection_info_request...
r16580 content = msg['content']
assert content['found']
text = content['data']['text/plain']
self.assertIn('int', text)
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475
epatters
TST: Add more unit tests for in-process kernel manager.
r8481 def test_history(self):
""" Does requesting history from an in-process kernel work?
"""
MinRK
update inprocess kernel to new layout...
r10298 km = InProcessKernelManager()
epatters
TST: Add more unit tests for in-process kernel manager.
r8481 km.start_kernel()
MinRK
update inprocess kernel to new layout...
r10298 kc = BlockingInProcessKernelClient(kernel=km.kernel)
kc.start_channels()
kc.execute('%who')
kc.history(hist_access_type='tail', n=1)
msg = kc.shell_channel.get_msgs()[-1]
epatters
TST: Add more unit tests for in-process kernel manager.
r8481 self.assertEquals(msg['header']['msg_type'], 'history_reply')
history = msg['content']['history']
self.assertEquals(len(history), 1)
self.assertEquals(history[0][2], '%who')
epatters
TST: Add some unit tests for in-process kernel and kernel manager.
r8475
if __name__ == '__main__':
unittest.main()