test_kernelmanager.py
61 lines
| 1.9 KiB
| text/x-python
|
PythonLexer
Brian E. Granger
|
r9151 | """Tests for the notebook kernel and session manager""" | ||
Brian Granger
|
r9126 | |||
Brian E. Granger
|
r9132 | from subprocess import PIPE | ||
Brian E. Granger
|
r9131 | import time | ||
Brian Granger
|
r9126 | from unittest import TestCase | ||
MinRK
|
r9156 | from IPython.testing import decorators as dec | ||
Brian Granger
|
r9126 | from IPython.config.loader import Config | ||
MinRK
|
r10284 | from IPython.kernel import KernelManager | ||
Brian Granger
|
r9126 | |||
class TestKernelManager(TestCase):
    """Lifecycle and connection-info tests for ``KernelManager``."""

    def _get_tcp_km(self):
        """Return a KernelManager built from an empty ``Config``.

        No transport is set here, so the manager uses whatever default
        ``KernelManager`` defines (presumably tcp, per the method name).
        """
        c = Config()
        km = KernelManager(config=c)
        return km

    def _get_ipc_km(self):
        """Return a KernelManager configured with transport 'ipc' and ip 'test'."""
        c = Config()
        c.KernelManager.transport = 'ipc'
        c.KernelManager.ip = 'test'
        km = KernelManager(config=c)
        return km

    def _run_lifecycle(self, km):
        """Start, restart, interrupt and finally shut down the kernel of *km*.

        The kernel is started with stdout/stderr attached to pipes.  Once it
        has been started, ``shutdown_kernel`` is always called, even when one
        of the intermediate checks fails, so a failing test does not leave a
        kernel process behind.
        """
        km.start_kernel(stdout=PIPE, stderr=PIPE)
        try:
            self.assertTrue(km.is_alive())
            km.restart_kernel()
            self.assertTrue(km.is_alive())
            # We need a delay here to give the restarting kernel a chance to
            # restart. Otherwise, the interrupt will kill it, causing the
            # test suite to hang. The reason it *hangs* is that the shutdown
            # message for the restart sometimes hasn't been sent to the
            # kernel. Because linger is infinite on the shell channel, the
            # context can't close until the message is sent to the kernel,
            # which is not dead.
            time.sleep(1.0)
            km.interrupt_kernel()
            self.assertTrue(isinstance(km, KernelManager))
        finally:
            # Runs on both the success and the failure path.
            km.shutdown_kernel()

    def test_tcp_lifecycle(self):
        """Run the full kernel lifecycle with the default-config manager."""
        km = self._get_tcp_km()
        self._run_lifecycle(km)

    @dec.skip_win32
    def test_ipc_lifecycle(self):
        """Run the full kernel lifecycle over the 'ipc' transport."""
        km = self._get_ipc_km()
        self._run_lifecycle(km)

    def test_get_connect_info(self):
        """Check that ``get_connection_info`` returns exactly the expected keys."""
        km = self._get_tcp_km()
        cinfo = km.get_connection_info()
        keys = sorted(cinfo.keys())
        expected = sorted([
            'ip', 'transport',
            'hb_port', 'shell_port', 'stdin_port', 'iopub_port', 'control_port',
            'key', 'signature_scheme',
        ])
        self.assertEqual(keys, expected)
Brian E. Granger
|
r9131 | |||