##// END OF EJS Templates
Refactoring kernel restarting.
Brian Granger -
Show More
@@ -0,0 +1,50 b''
1 """A kernel manager with ioloop based logic."""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 from __future__ import absolute_import
15
16 import zmq
17 from zmq.eventloop import ioloop
18
19 from IPython.utils.traitlets import (
20 Instance
21 )
22
23 from .blockingkernelmanager import BlockingKernelManager
24 from .ioloopkernelrestarter import IOLoopKernelRestarter
25
26 #-----------------------------------------------------------------------------
27 # Code
28 #-----------------------------------------------------------------------------
29
30 class IOLoopKernelManager(BlockingKernelManager):
31
32 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
33 def _loop_default(self):
34 return ioloop.IOLoop.instance()
35
36 _restarter = Instance('IPython.kernel.ioloopkernelrestarter.IOLoopKernelRestarter')
37
38 def start_restarter(self):
39 if self.autorestart and self.has_kernel:
40 if self._restarter is None:
41 self._restarter = IOLoopKernelRestarter(
42 kernel_manager=self, loop=self.loop,
43 config=self.config, log=self.log
44 )
45 self._restarter.start()
46
47 def stop_restarter(self):
48 if self.autorestart:
49 if self._restarter is not None:
50 self._restarter.stop()
@@ -407,6 +407,9 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
407 return jsonapi.dumps(msg, default=date_default)
407 return jsonapi.dumps(msg, default=date_default)
408
408
409 def _on_zmq_reply(self, msg_list):
409 def _on_zmq_reply(self, msg_list):
410 # Sometimes this gets triggered when the on_close method is scheduled in the
411 # eventloop but hasn't been called.
412 if self.stream.closed(): return
410 try:
413 try:
411 msg = self._reserialize_reply(msg_list)
414 msg = self._reserialize_reply(msg_list)
412 except Exception:
415 except Exception:
@@ -81,7 +81,7 b' class BlockingHBChannel(HBChannel):'
81
81
82
82
83 class BlockingKernelManager(KernelManager):
83 class BlockingKernelManager(KernelManager):
84
84
85 # The classes to use for the various channels.
85 # The classes to use for the various channels.
86 shell_channel_class = Type(BlockingShellChannel)
86 shell_channel_class = Type(BlockingShellChannel)
87 iopub_channel_class = Type(BlockingIOPubChannel)
87 iopub_channel_class = Type(BlockingIOPubChannel)
@@ -30,7 +30,7 b' from IPython.utils.traitlets import ('
30 # Code
30 # Code
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33 class KernelRestarter(LoggingConfigurable):
33 class IOLoopKernelRestarter(LoggingConfigurable):
34 """Monitor and autorestart a kernel."""
34 """Monitor and autorestart a kernel."""
35
35
36 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
36 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
@@ -45,14 +45,11 b' class KernelRestarter(LoggingConfigurable):'
45
45
46 _pcallback = None
46 _pcallback = None
47
47
48 def __init__(self, **kwargs):
49 super(KernelRestarter, self).__init__(**kwargs)
50
51 def start(self):
48 def start(self):
52 """Start the polling of the kernel."""
49 """Start the polling of the kernel."""
53 if self._pcallback is None:
50 if self._pcallback is None:
54 self._pcallback = ioloop.PeriodicCallback(
51 self._pcallback = ioloop.PeriodicCallback(
55 self._poll, 1000*self.time_to_dead, self.ioloop
52 self._poll, 1000*self.time_to_dead, self.loop
56 )
53 )
57 self._pcallback.start()
54 self._pcallback.start()
58
55
@@ -68,12 +65,10 b' class KernelRestarter(LoggingConfigurable):'
68 self._pcallback = None
65 self._pcallback = None
69
66
70 def _poll(self):
67 def _poll(self):
68 self.log.info('Polling kernel...')
71 if not self.kernel_manager.is_alive():
69 if not self.kernel_manager.is_alive():
72 self.stop()
73 # This restart event should leave the connection file in place so
70 # This restart event should leave the connection file in place so
74 # the ports are the same. Because this takes place below the
71 # the ports are the same. Because this takes place below the
75 # MappingKernelManager, the kernel_id will also remain the same.
72 # MappingKernelManager, the kernel_id will also remain the same.
76 self.log('KernelRestarter: restarting kernel')
73 self.log.info('KernelRestarter: restarting kernel')
77 self.kernel_manager.restart_kernel(now=True);
74 self.kernel_manager.restart_kernel(now=True);
78 self.start()
79
@@ -21,14 +21,12 b' from __future__ import absolute_import'
21 import atexit
21 import atexit
22 import errno
22 import errno
23 import json
23 import json
24 from subprocess import Popen
25 import os
24 import os
26 import signal
25 import signal
27 import sys
26 import sys
28 from threading import Thread
27 from threading import Thread
29 import time
28 import time
30
29
31 # System library imports
32 import zmq
30 import zmq
33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
34 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
@@ -37,9 +35,11 b' from zmq.eventloop import ioloop, zmqstream'
37
35
38 # Local imports
36 # Local imports
39 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.utils.importstring import import_item
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 from IPython.utils.traitlets import (
40 from IPython.utils.traitlets import (
42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
41 Any, Instance, Type, Unicode, List, Integer, Bool,
42 CaselessStrEnum, DottedObjectName
43 )
43 )
44 from IPython.utils.py3compat import str_to_bytes
44 from IPython.utils.py3compat import str_to_bytes
45 from IPython.kernel import (
45 from IPython.kernel import (
@@ -54,7 +54,6 b' from .kernelmanagerabc import ('
54 KernelManagerABC
54 KernelManagerABC
55 )
55 )
56
56
57
58 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
59 # Constants and exceptions
58 # Constants and exceptions
60 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
@@ -691,11 +690,11 b' class KernelManager(Configurable):'
691 Override this if you have a custom
690 Override this if you have a custom
692 """
691 """
693 )
692 )
693
694 def _kernel_cmd_changed(self, name, old, new):
694 def _kernel_cmd_changed(self, name, old, new):
695 self.ipython_kernel = False
695 self.ipython_kernel = False
696
696
697 ipython_kernel = Bool(True)
697 ipython_kernel = Bool(True)
698
699
698
700 # The addresses for the communication channels.
699 # The addresses for the communication channels.
701 connection_file = Unicode('')
700 connection_file = Unicode('')
@@ -708,6 +707,7 b' class KernelManager(Configurable):'
708 Consoles on other machines will be able to connect
707 Consoles on other machines will be able to connect
709 to the Kernel, so be careful!"""
708 to the Kernel, so be careful!"""
710 )
709 )
710
711 def _ip_default(self):
711 def _ip_default(self):
712 if self.transport == 'ipc':
712 if self.transport == 'ipc':
713 if self.connection_file:
713 if self.connection_file:
@@ -716,9 +716,11 b' class KernelManager(Configurable):'
716 return 'kernel-ipc'
716 return 'kernel-ipc'
717 else:
717 else:
718 return LOCALHOST
718 return LOCALHOST
719
719 def _ip_changed(self, name, old, new):
720 def _ip_changed(self, name, old, new):
720 if new == '*':
721 if new == '*':
721 self.ip = '0.0.0.0'
722 self.ip = '0.0.0.0'
723
722 shell_port = Integer(0)
724 shell_port = Integer(0)
723 iopub_port = Integer(0)
725 iopub_port = Integer(0)
724 stdin_port = Integer(0)
726 stdin_port = Integer(0)
@@ -738,6 +740,10 b' class KernelManager(Configurable):'
738 _hb_channel = Any
740 _hb_channel = Any
739 _connection_file_written=Bool(False)
741 _connection_file_written=Bool(False)
740
742
743 autorestart = Bool(False, config=True,
744 help="""Should we autorestart the kernel if it dies."""
745 )
746
741 def __del__(self):
747 def __del__(self):
742 self.cleanup_connection_file()
748 self.cleanup_connection_file()
743
749
@@ -895,9 +901,19 b' class KernelManager(Configurable):'
895 self._connection_file_written = True
901 self._connection_file_written = True
896
902
897 #--------------------------------------------------------------------------
903 #--------------------------------------------------------------------------
904 # Kernel restarter
905 #--------------------------------------------------------------------------
906
907 def start_restarter(self):
908 pass
909
910 def stop_restarter(self):
911 pass
912
913 #--------------------------------------------------------------------------
898 # Kernel management
914 # Kernel management
899 #--------------------------------------------------------------------------
915 #--------------------------------------------------------------------------
900
916
901 def format_kernel_cmd(self, **kw):
917 def format_kernel_cmd(self, **kw):
902 """format templated args (e.g. {connection_file})"""
918 """format templated args (e.g. {connection_file})"""
903 if self.kernel_cmd:
919 if self.kernel_cmd:
@@ -948,6 +964,7 b' class KernelManager(Configurable):'
948 self.kernel = self._launch_kernel(kernel_cmd,
964 self.kernel = self._launch_kernel(kernel_cmd,
949 ipython_kernel=self.ipython_kernel,
965 ipython_kernel=self.ipython_kernel,
950 **kw)
966 **kw)
967 self.start_restarter()
951
968
952 def shutdown_kernel(self, now=False, restart=False):
969 def shutdown_kernel(self, now=False, restart=False):
953 """Attempts to the stop the kernel process cleanly.
970 """Attempts to the stop the kernel process cleanly.
@@ -967,15 +984,19 b' class KernelManager(Configurable):'
967 Will this kernel be restarted after it is shutdown. When this
984 Will this kernel be restarted after it is shutdown. When this
968 is True, connection files will not be cleaned up.
985 is True, connection files will not be cleaned up.
969 """
986 """
970 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
971 if sys.platform == 'win32':
972 self._kill_kernel()
973 return
974
987
975 # Pause the heart beat channel if it exists.
988 # Pause the heart beat channel if it exists.
976 if self._hb_channel is not None:
989 if self._hb_channel is not None:
977 self._hb_channel.pause()
990 self._hb_channel.pause()
978
991
992 # Stop monitoring for restarting while we shutdown.
993 self.stop_restarter()
994
995 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
996 if sys.platform == 'win32':
997 self._kill_kernel()
998 return
999
979 if now:
1000 if now:
980 if self.has_kernel:
1001 if self.has_kernel:
981 self._kill_kernel()
1002 self._kill_kernel()
@@ -1047,9 +1068,6 b' class KernelManager(Configurable):'
1047 This is a private method, callers should use shutdown_kernel(now=True).
1068 This is a private method, callers should use shutdown_kernel(now=True).
1048 """
1069 """
1049 if self.has_kernel:
1070 if self.has_kernel:
1050 # Pause the heart beat channel if it exists.
1051 if self._hb_channel is not None:
1052 self._hb_channel.pause()
1053
1071
1054 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1072 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1055 # TerminateProcess() on Win32).
1073 # TerminateProcess() on Win32).
@@ -23,14 +23,12 b' import uuid'
23
23
24 import zmq
24 import zmq
25 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.eventloop.zmqstream import ZMQStream
26 from zmq.eventloop import ioloop
27
26
28 from IPython.config.configurable import LoggingConfigurable
27 from IPython.config.configurable import LoggingConfigurable
29 from IPython.utils.importstring import import_item
28 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import (
29 from IPython.utils.traitlets import (
31 Instance, Dict, Unicode, Any, DottedObjectName, Bool
30 Instance, Dict, Unicode, Any, DottedObjectName, Bool
32 )
31 )
33 # from IPython.kernel.kernelrestarter import KernelRestarter
34
32
35 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
36 # Classes
34 # Classes
@@ -44,7 +42,7 b' class MultiKernelManager(LoggingConfigurable):'
44 """A class for managing multiple kernels."""
42 """A class for managing multiple kernels."""
45
43
46 kernel_manager_class = DottedObjectName(
44 kernel_manager_class = DottedObjectName(
47 "IPython.kernel.blockingkernelmanager.BlockingKernelManager", config=True,
45 "IPython.kernel.ioloopkernelmanager.IOLoopKernelManager", config=True,
48 help="""The kernel manager class. This is configurable to allow
46 help="""The kernel manager class. This is configurable to allow
49 subclassing of the KernelManager for customized behavior.
47 subclassing of the KernelManager for customized behavior.
50 """
48 """
@@ -60,18 +58,9 b' class MultiKernelManager(LoggingConfigurable):'
60 def _context_default(self):
58 def _context_default(self):
61 return zmq.Context.instance()
59 return zmq.Context.instance()
62
60
63 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
64 def _loop_default(self):
65 return ioloop.IOLoop.instance()
66
67 autorestart = Bool(False, config=True,
68 help="""Should we autorestart kernels that die."""
69 )
70
71 connection_dir = Unicode('')
61 connection_dir = Unicode('')
72
62
73 _kernels = Dict()
63 _kernels = Dict()
74 _restarters = Dict()
75
64
76 def list_kernel_ids(self):
65 def list_kernel_ids(self):
77 """Return a list of the kernel ids of the active kernels."""
66 """Return a list of the kernel ids of the active kernels."""
@@ -86,30 +75,6 b' class MultiKernelManager(LoggingConfigurable):'
86 def __contains__(self, kernel_id):
75 def __contains__(self, kernel_id):
87 return kernel_id in self._kernels
76 return kernel_id in self._kernels
88
77
89 def start_restarter(self, kernel_id):
90 km = self.get_kernel(kernel_id)
91 if self.autorestart:
92 kr = self._restarters.get(kernel_id, None)
93 if kr is None:
94 kr = KernelRestarter(
95 kernel_manager=km, loop=self.loop,
96 config=self.config, log=self.log
97 )
98 self._restarters[kernel_id] = kr
99 kr.start()
100
101 def stop_restarter(self, kernel_id):
102 if self.autorestart:
103 kr = self._restarters.get(kernel_id, None)
104 if kr is not None:
105 kr.stop()
106
107 def clear_restarter(self, kernel_id):
108 if self.autorestart:
109 kr = self._restarters.pop(kernel_id, None)
110 if kr is not None:
111 kr.stop()
112
113 def start_kernel(self, **kwargs):
78 def start_kernel(self, **kwargs):
114 """Start a new kernel.
79 """Start a new kernel.
115
80
@@ -129,13 +94,12 b' class MultiKernelManager(LoggingConfigurable):'
129 # including things like its transport and ip.
94 # including things like its transport and ip.
130 km = self.kernel_manager_factory(connection_file=os.path.join(
95 km = self.kernel_manager_factory(connection_file=os.path.join(
131 self.connection_dir, "kernel-%s.json" % kernel_id),
96 self.connection_dir, "kernel-%s.json" % kernel_id),
132 config=self.config,
97 config=self.config, autorestart=True, log=self.log
133 )
98 )
134 km.start_kernel(**kwargs)
99 km.start_kernel(**kwargs)
135 # start just the shell channel, needed for graceful restart
100 # start just the shell channel, needed for graceful restart
136 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
101 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
137 self._kernels[kernel_id] = km
102 self._kernels[kernel_id] = km
138 self.start_restarter(kernel_id)
139 return kernel_id
103 return kernel_id
140
104
141 def shutdown_kernel(self, kernel_id, now=False):
105 def shutdown_kernel(self, kernel_id, now=False):
@@ -149,11 +113,9 b' class MultiKernelManager(LoggingConfigurable):'
149 Should the kernel be shutdown forcibly using a signal.
113 Should the kernel be shutdown forcibly using a signal.
150 """
114 """
151 k = self.get_kernel(kernel_id)
115 k = self.get_kernel(kernel_id)
152 self.stop_restarter(kernel_id)
153 k.shutdown_kernel(now=now)
116 k.shutdown_kernel(now=now)
154 k.shell_channel.stop()
117 k.shell_channel.stop()
155 del self._kernels[kernel_id]
118 del self._kernels[kernel_id]
156 self.clear_restarter(kernel_id)
157
119
158 def shutdown_all(self, now=False):
120 def shutdown_all(self, now=False):
159 """Shutdown all kernels."""
121 """Shutdown all kernels."""
@@ -192,9 +154,7 b' class MultiKernelManager(LoggingConfigurable):'
192 The id of the kernel to interrupt.
154 The id of the kernel to interrupt.
193 """
155 """
194 km = self.get_kernel(kernel_id)
156 km = self.get_kernel(kernel_id)
195 self.stop_restarter(kernel_id)
196 km.restart_kernel()
157 km.restart_kernel()
197 self.start_restarter(kernel_id)
198
158
199 def is_alive(self, kernel_id):
159 def is_alive(self, kernel_id):
200 """Is the kernel alive.
160 """Is the kernel alive.
@@ -12,12 +12,16 b' from IPython.kernel.kernelmanager import KernelManager'
12 class TestKernelManager(TestCase):
12 class TestKernelManager(TestCase):
13
13
14 def _get_tcp_km(self):
14 def _get_tcp_km(self):
15 return KernelManager()
15 c = Config()
16 # c.KernelManager.autorestart=False
17 km = KernelManager(config=c)
18 return km
16
19
17 def _get_ipc_km(self):
20 def _get_ipc_km(self):
18 c = Config()
21 c = Config()
19 c.KernelManager.transport = 'ipc'
22 c.KernelManager.transport = 'ipc'
20 c.KernelManager.ip = 'test'
23 c.KernelManager.ip = 'test'
24 # c.KernelManager.autorestart=False
21 km = KernelManager(config=c)
25 km = KernelManager(config=c)
22 return km
26 return km
23
27
@@ -14,12 +14,16 b' from IPython.kernel.multikernelmanager import MultiKernelManager'
14 class TestKernelManager(TestCase):
14 class TestKernelManager(TestCase):
15
15
16 def _get_tcp_km(self):
16 def _get_tcp_km(self):
17 return MultiKernelManager()
17 c = Config()
18 # c.KernelManager.autorestart=False
19 km = MultiKernelManager(config=c)
20 return km
18
21
19 def _get_ipc_km(self):
22 def _get_ipc_km(self):
20 c = Config()
23 c = Config()
21 c.KernelManager.transport = 'ipc'
24 c.KernelManager.transport = 'ipc'
22 c.KernelManager.ip = 'test'
25 c.KernelManager.ip = 'test'
26 # c.KernelManager.autorestart=False
23 km = MultiKernelManager(config=c)
27 km = MultiKernelManager(config=c)
24 return km
28 return km
25
29
General Comments 0
You need to be logged in to leave comments. Login now