test_kernelmanager.py
108 lines
| 3.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r16580 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
epatters
|
r8475 | |||
from __future__ import print_function | ||||
import unittest | ||||
MinRK
|
r10298 | from IPython.kernel.inprocess.blocking import BlockingInProcessKernelClient | ||
from IPython.kernel.inprocess.manager import InProcessKernelManager | ||||
epatters
|
r8475 | |||
#----------------------------------------------------------------------------- | ||||
# Test case | ||||
#----------------------------------------------------------------------------- | ||||
class InProcessKernelManagerTestCase(unittest.TestCase): | ||||
MinRK
|
r10298 | def test_interface(self): | ||
epatters
|
r8475 | """ Does the in-process kernel manager implement the basic KM interface? | ||
""" | ||||
MinRK
|
r10298 | km = InProcessKernelManager() | ||
epatters
|
r8475 | self.assert_(not km.has_kernel) | ||
km.start_kernel() | ||||
self.assert_(km.has_kernel) | ||||
self.assert_(km.kernel is not None) | ||||
MinRK
|
r10298 | kc = BlockingInProcessKernelClient(kernel=km.kernel) | ||
self.assert_(not kc.channels_running) | ||||
kc.start_channels() | ||||
self.assert_(kc.channels_running) | ||||
epatters
|
r8475 | old_kernel = km.kernel | ||
km.restart_kernel() | ||||
Thomas Kluyver
|
r18013 | self.assertIsNotNone(km.kernel) | ||
epatters
|
r8475 | self.assertNotEquals(km.kernel, old_kernel) | ||
km.shutdown_kernel() | ||||
self.assert_(not km.has_kernel) | ||||
self.assertRaises(NotImplementedError, km.interrupt_kernel) | ||||
self.assertRaises(NotImplementedError, km.signal_kernel, 9) | ||||
MinRK
|
r10298 | kc.stop_channels() | ||
self.assert_(not kc.channels_running) | ||||
epatters
|
r8475 | |||
def test_execute(self): | ||||
""" Does executing code in an in-process kernel work? | ||||
""" | ||||
MinRK
|
r10298 | km = InProcessKernelManager() | ||
epatters
|
r8475 | km.start_kernel() | ||
MinRK
|
r10298 | kc = BlockingInProcessKernelClient(kernel=km.kernel) | ||
kc.start_channels() | ||||
Thomas Kluyver
|
r19223 | kc.wait_for_ready() | ||
MinRK
|
r10298 | kc.execute('foo = 1') | ||
epatters
|
r8475 | self.assertEquals(km.kernel.shell.user_ns['foo'], 1) | ||
epatters
|
r8481 | def test_complete(self): | ||
""" Does requesting completion from an in-process kernel work? | ||||
""" | ||||
MinRK
|
r10298 | km = InProcessKernelManager() | ||
epatters
|
r8481 | km.start_kernel() | ||
MinRK
|
r10298 | kc = BlockingInProcessKernelClient(kernel=km.kernel) | ||
kc.start_channels() | ||||
Thomas Kluyver
|
r19223 | kc.wait_for_ready() | ||
epatters
|
r8481 | km.kernel.shell.push({'my_bar': 0, 'my_baz': 1}) | ||
MinRK
|
r16580 | kc.complete('my_ba', 5) | ||
MinRK
|
r10298 | msg = kc.get_shell_msg() | ||
self.assertEqual(msg['header']['msg_type'], 'complete_reply') | ||||
self.assertEqual(sorted(msg['content']['matches']), | ||||
epatters
|
r8481 | ['my_bar', 'my_baz']) | ||
MinRK
|
r16587 | def test_inspect(self): | ||
epatters
|
r8475 | """ Does requesting object information from an in-process kernel work? | ||
""" | ||||
MinRK
|
r10298 | km = InProcessKernelManager() | ||
epatters
|
r8475 | km.start_kernel() | ||
MinRK
|
r10298 | kc = BlockingInProcessKernelClient(kernel=km.kernel) | ||
kc.start_channels() | ||||
Thomas Kluyver
|
r19223 | kc.wait_for_ready() | ||
epatters
|
r8475 | km.kernel.shell.user_ns['foo'] = 1 | ||
MinRK
|
r16587 | kc.inspect('foo') | ||
MinRK
|
r10298 | msg = kc.get_shell_msg() | ||
MinRK
|
r16587 | self.assertEqual(msg['header']['msg_type'], 'inspect_reply') | ||
MinRK
|
r16580 | content = msg['content'] | ||
assert content['found'] | ||||
text = content['data']['text/plain'] | ||||
self.assertIn('int', text) | ||||
epatters
|
r8475 | |||
epatters
|
r8481 | def test_history(self): | ||
""" Does requesting history from an in-process kernel work? | ||||
""" | ||||
MinRK
|
r10298 | km = InProcessKernelManager() | ||
epatters
|
r8481 | km.start_kernel() | ||
MinRK
|
r10298 | kc = BlockingInProcessKernelClient(kernel=km.kernel) | ||
kc.start_channels() | ||||
Thomas Kluyver
|
r19223 | kc.wait_for_ready() | ||
MinRK
|
r10298 | kc.execute('%who') | ||
kc.history(hist_access_type='tail', n=1) | ||||
msg = kc.shell_channel.get_msgs()[-1] | ||||
epatters
|
r8481 | self.assertEquals(msg['header']['msg_type'], 'history_reply') | ||
history = msg['content']['history'] | ||||
self.assertEquals(len(history), 1) | ||||
self.assertEquals(history[0][2], '%who') | ||||
epatters
|
r8475 | |||
if __name__ == '__main__': | ||||
unittest.main() | ||||