test_kernelmanager.py
80 lines
| 2.7 KiB
| text/x-python
|
PythonLexer
Brian E. Granger
|
r4344 | """Tests for the notebook kernel and session manager.""" | ||
Brian E. Granger
|
r4343 | |||
Brian E. Granger
|
r9132 | from subprocess import PIPE | ||
Brian E. Granger
|
r9131 | import time | ||
Brian E. Granger
|
r4343 | from unittest import TestCase | ||
MinRK
|
r9156 | from IPython.testing import decorators as dec | ||
Brian E. Granger
|
r9116 | from IPython.config.loader import Config | ||
MinRK
|
r4960 | from IPython.frontend.html.notebook.kernelmanager import MultiKernelManager | ||
W. Trevor King
|
r9252 | from IPython.utils.localinterfaces import LOCALHOST | ||
Brian E. Granger
|
r9116 | from IPython.zmq.kernelmanager import KernelManager | ||
Brian E. Granger
|
r4343 | |||
class TestKernelManager(TestCase):
    """Exercise MultiKernelManager over both the 'tcp' and 'ipc' transports.

    The ``_get_*`` helpers build a manager for one transport, the ``_run_*``
    helpers drive a single kernel through a scenario, and the ``test_*``
    methods combine the two.
    """

    def _get_tcp_km(self):
        """Return a MultiKernelManager created with no explicit config."""
        return MultiKernelManager()

    def _get_ipc_km(self):
        """Return a MultiKernelManager configured with transport 'ipc', ip 'test'."""
        c = Config()
        c.KernelManager.transport = 'ipc'
        c.KernelManager.ip = 'test'
        return MultiKernelManager(config=c)

    def _run_lifecycle(self, km):
        """Start, restart, interrupt and shut down one kernel managed by ``km``.

        The kernel is shut down even when an assertion fails part-way, so a
        failing run does not leave a kernel subprocess behind.
        """
        kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
        try:
            self.assertTrue(kid in km)
            self.assertTrue(kid in km.list_kernel_ids())
            self.assertEqual(len(km), 1)
            km.restart_kernel(kid)
            self.assertTrue(kid in km.list_kernel_ids())
            # Give the restarting kernel time to come back up before it is
            # interrupted; otherwise the interrupt kills it and the test
            # suite hangs.  It hangs because the shutdown message issued by
            # the restart has sometimes not yet been sent to the kernel, and
            # with infinite linger on the shell channel the context cannot
            # close until that message is delivered.
            time.sleep(1.0)
            km.interrupt_kernel(kid)
            k = km.get_kernel(kid)
            self.assertTrue(isinstance(k, KernelManager))
            km.shutdown_kernel(kid)
            self.assertTrue(kid not in km)
        finally:
            # Only reached with the kernel still registered when something
            # above failed before (or during) the regular shutdown.
            if kid in km:
                km.shutdown_kernel(kid)

    def _run_cinfo(self, km, transport, ip):
        """Check the connection info and channel streams of a fresh kernel.

        ``transport`` and ``ip`` are the values expected in the connection
        info reported by ``km`` for the kernel it starts.  The kernel is
        always shut down, whether or not the checks pass.
        """
        kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
        try:
            # Look the kernel up by id; the returned object is not needed.
            km.get_kernel(kid)
            cinfo = km.get_connection_info(kid)
            self.assertEqual(transport, cinfo['transport'])
            self.assertEqual(ip, cinfo['ip'])
            self.assertTrue('stdin_port' in cinfo)
            self.assertTrue('iopub_port' in cinfo)
            stream = km.create_iopub_stream(kid)
            stream.close()
            self.assertTrue('shell_port' in cinfo)
            stream = km.create_shell_stream(kid)
            stream.close()
            self.assertTrue('hb_port' in cinfo)
            stream = km.create_hb_stream(kid)
            stream.close()
        finally:
            km.shutdown_kernel(kid)

    def test_tcp_lifecycle(self):
        """Run the lifecycle scenario over the default (TCP) manager."""
        km = self._get_tcp_km()
        self._run_lifecycle(km)

    @dec.skip_win32
    def test_tcp_cinfo(self):
        """Run the connection-info scenario over TCP, expecting LOCALHOST."""
        km = self._get_tcp_km()
        self._run_cinfo(km, 'tcp', LOCALHOST)

    # ZeroMQ's ipc transport relies on UNIX domain sockets, so the IPC
    # scenarios are skipped on Windows.
    @dec.skip_win32
    def test_ipc_lifecycle(self):
        """Run the lifecycle scenario over the IPC manager."""
        km = self._get_ipc_km()
        self._run_lifecycle(km)

    @dec.skip_win32
    def test_ipc_cinfo(self):
        """Run the connection-info scenario over IPC, expecting ip 'test'."""
        km = self._get_ipc_km()
        self._run_cinfo(km, 'ipc', 'test')
Brian E. Granger
|
r9131 | |||