test_kernelmanager.py
54 lines
| 1.7 KiB
| text/x-python
|
PythonLexer
Brian E. Granger
|
r9151 | """Tests for the notebook kernel and session manager""" | ||
Brian Granger
|
r9126 | |||
Brian E. Granger
|
r9132 | from subprocess import PIPE | ||
Brian E. Granger
|
r9131 | import time | ||
Brian Granger
|
r9126 | from unittest import TestCase | ||
MinRK
|
r9156 | from IPython.testing import decorators as dec | ||
Brian Granger
|
r9126 | from IPython.config.loader import Config | ||
MinRK
|
r9370 | from IPython.kernel.kernelmanager import KernelManager | ||
Brian Granger
|
r9126 | |||
class TestKernelManager(TestCase):
    """Exercise a start / restart / interrupt / shutdown cycle of KernelManager.

    The same lifecycle is run against two differently configured managers:
    one built from an empty ``Config`` and one configured with
    ``transport = 'ipc'``.
    """

    def _get_tcp_km(self):
        """Return a ``KernelManager`` built from an otherwise empty ``Config``.

        No transport is set here, so the manager uses whatever its default
        transport is (presumably TCP, going by the method name).
        """
        c = Config()
        # c.KernelManager.autorestart=False
        km = KernelManager(config=c)
        return km

    def _get_ipc_km(self):
        """Return a ``KernelManager`` configured with the ``'ipc'`` transport.

        ``ip`` is set to ``'test'``.
        """
        c = Config()
        c.KernelManager.transport = 'ipc'
        c.KernelManager.ip = 'test'
        # c.KernelManager.autorestart=False
        km = KernelManager(config=c)
        return km

    def _run_lifecycle(self, km):
        """Start, restart, interrupt and shut down the kernel managed by *km*.

        Once the kernel and its shell channel have been started, the
        remaining steps run inside ``try``/``finally`` so that the kernel is
        shut down and the shell channel stopped even when a step or an
        assertion fails part-way through.
        """
        km.start_kernel(stdout=PIPE, stderr=PIPE)
        self.assertTrue(km.is_alive())
        km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
        try:
            km.restart_kernel()
            self.assertTrue(km.is_alive())
            # We need a delay here to give the restarting kernel a chance to
            # restart. Otherwise, the interrupt will kill it, causing the test
            # suite to hang. The reason it *hangs* is that the shutdown
            # message for the restart sometimes hasn't been sent to the kernel.
            # Because linger is infinite on the shell channel, the context
            # can't close until the message is sent to the kernel, which is
            # not dead.
            time.sleep(1.0)
            km.interrupt_kernel()
            self.assertTrue(isinstance(km, KernelManager))
        finally:
            # Same teardown order as the success path: shut the kernel down
            # first, then stop the shell channel. The inner ``finally`` makes
            # sure the channel is stopped even if the shutdown itself raises.
            try:
                km.shutdown_kernel()
            finally:
                km.shell_channel.stop()

    def test_tcp_lifecycle(self):
        # Full lifecycle against the default-configured manager.
        km = self._get_tcp_km()
        self._run_lifecycle(km)

    # Going by the decorator's name, this test is skipped on win32.
    @dec.skip_win32
    def test_ipc_lifecycle(self):
        # Full lifecycle against the manager configured for 'ipc' transport.
        km = self._get_ipc_km()
        self._run_lifecycle(km)
Brian E. Granger
|
r9131 | |||