##// END OF EJS Templates
Merge pull request #2775 from ellisonbg/kernelid...
Min RK -
r9155:ef8974d5 merge
parent child Browse files
Show More
@@ -0,0 +1,70 b''
1 """Tests for the notebook kernel and session manager."""
2
3 from subprocess import PIPE
4 import time
5 from unittest import TestCase
6
7 from IPython.config.loader import Config
8 from IPython.frontend.html.notebook.kernelmanager import MultiKernelManager
9 from IPython.zmq.kernelmanager import KernelManager
10
11 class TestKernelManager(TestCase):
12
13 def _get_tcp_km(self):
14 return MultiKernelManager()
15
16 def _get_ipc_km(self):
17 c = Config()
18 c.KernelManager.transport = 'ipc'
19 c.KernelManager.ip = 'test'
20 km = MultiKernelManager(config=c)
21 return km
22
23 def _run_lifecycle(self, km):
24 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
25 self.assertTrue(kid in km)
26 self.assertTrue(kid in km.list_kernel_ids())
27 self.assertEqual(len(km),1)
28 km.restart_kernel(kid)
29 self.assertTrue(kid in km.list_kernel_ids())
30 # We need a delay here to give the restarting kernel a chance to
31 # restart. Otherwise, the interrupt will kill it, causing the test
32 # suite to hang. The reason it *hangs* is that the shutdown
33 # message for the restart sometimes hasn't been sent to the kernel.
34 # Because linger is oo on the shell channel, the context can't
35 # close until the message is sent to the kernel, which is not dead.
36 time.sleep(1.0)
37 km.interrupt_kernel(kid)
38 k = km.get_kernel(kid)
39 self.assertTrue(isinstance(k, KernelManager))
40 km.shutdown_kernel(kid)
41 self.assertTrue(not kid in km)
42
43 def _run_cinfo(self, km, transport, ip):
44 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
45 k = km.get_kernel(kid)
46 cinfo = km.get_connection_info(kid)
47 self.assertEqual(transport, cinfo['transport'])
48 self.assertEqual(ip, cinfo['ip'])
49 self.assertTrue('stdin_port' in cinfo)
50 self.assertTrue('iopub_port' in cinfo)
51 self.assertTrue('shell_port' in cinfo)
52 self.assertTrue('hb_port' in cinfo)
53 km.shutdown_kernel(kid)
54
55 def test_tcp_lifecycle(self):
56 km = self._get_tcp_km()
57 self._run_lifecycle(km)
58
59 def test_tcp_cinfo(self):
60 km = self._get_tcp_km()
61 self._run_cinfo(km, 'tcp', '127.0.0.1')
62
63 def test_ipc_lifecycle(self):
64 km = self._get_ipc_km()
65 self._run_lifecycle(km)
66
67 def test_ipc_cinfo(self):
68 km = self._get_ipc_km()
69 self._run_cinfo(km, 'ipc', 'test')
70
@@ -0,0 +1,226 b''
1 """Abstract base classes for kernel manager and channels."""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Standard library imports.
15 import abc
16
17 #-----------------------------------------------------------------------------
18 # Channels
19 #-----------------------------------------------------------------------------
20
21
22 class ChannelABC(object):
23 """A base class for all channel ABCs."""
24
25 __metaclass__ = abc.ABCMeta
26
27 @abc.abstractmethod
28 def start(self):
29 pass
30
31 @abc.abstractmethod
32 def stop(self):
33 pass
34
35 @abc.abstractmethod
36 def is_alive(self):
37 pass
38
39
40 class ShellChannelABC(ChannelABC):
41 """ShellChannel ABC.
42
43 The docstrings for this class can be found in the base implementation:
44
45 `IPython.zmq.kernelmanager.ShellChannel`
46 """
47
48 @abc.abstractproperty
49 def allow_stdin(self):
50 pass
51
52 @abc.abstractmethod
53 def execute(self, code, silent=False, store_history=True,
54 user_variables=None, user_expressions=None, allow_stdin=None):
55 pass
56
57 @abc.abstractmethod
58 def complete(self, text, line, cursor_pos, block=None):
59 pass
60
61 @abc.abstractmethod
62 def object_info(self, oname, detail_level=0):
63 pass
64
65 @abc.abstractmethod
66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 pass
68
69 @abc.abstractmethod
70 def kernel_info(self):
71 pass
72
73 @abc.abstractmethod
74 def shutdown(self, restart=False):
75 pass
76
77
78 class IOPubChannelABC(ChannelABC):
79 """IOPubChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.zmq.kernelmanager.IOPubChannel`
84 """
85
86 @abc.abstractmethod
87 def flush(self, timeout=1.0):
88 pass
89
90
91 class StdInChannelABC(ChannelABC):
92 """StdInChannel ABC.
93
94 The docstrings for this class can be found in the base implementation:
95
96 `IPython.zmq.kernelmanager.StdInChannel`
97 """
98
99 @abc.abstractmethod
100 def input(self, string):
101 pass
102
103
104 class HBChannelABC(ChannelABC):
105 """HBChannel ABC.
106
107 The docstrings for this class can be found in the base implementation:
108
109 `IPython.zmq.kernelmanager.HBChannel`
110 """
111
112 @abc.abstractproperty
113 def time_to_dead(self):
114 pass
115
116 @abc.abstractmethod
117 def pause(self):
118 pass
119
120 @abc.abstractmethod
121 def unpause(self):
122 pass
123
124 @abc.abstractmethod
125 def is_beating(self):
126 pass
127
128
129 #-----------------------------------------------------------------------------
130 # Main kernel manager class
131 #-----------------------------------------------------------------------------
132
133 class KernelManagerABC(object):
134 """KernelManager ABC.
135
136 The docstrings for this class can be found in the base implementation:
137
138 `IPython.zmq.kernelmanager.KernelManager`
139 """
140
141 __metaclass__ = abc.ABCMeta
142
143 @abc.abstractproperty
144 def kernel(self):
145 pass
146
147 @abc.abstractproperty
148 def shell_channel_class(self):
149 pass
150
151 @abc.abstractproperty
152 def iopub_channel_class(self):
153 pass
154
155 @abc.abstractproperty
156 def hb_channel_class(self):
157 pass
158
159 @abc.abstractproperty
160 def stdin_channel_class(self):
161 pass
162
163 #--------------------------------------------------------------------------
164 # Channel management methods
165 #--------------------------------------------------------------------------
166
167 @abc.abstractmethod
168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 pass
170
171 @abc.abstractmethod
172 def stop_channels(self):
173 pass
174
175 @abc.abstractproperty
176 def channels_running(self):
177 pass
178
179 @abc.abstractproperty
180 def shell_channel(self):
181 pass
182
183 @abc.abstractproperty
184 def iopub_channel(self):
185 pass
186
187 @abc.abstractproperty
188 def stdin_channel(self):
189 pass
190
191 @abc.abstractproperty
192 def hb_channel(self):
193 pass
194
195 #--------------------------------------------------------------------------
196 # Kernel management
197 #--------------------------------------------------------------------------
198
199 @abc.abstractmethod
200 def start_kernel(self, **kw):
201 pass
202
203 @abc.abstractmethod
204 def shutdown_kernel(self, now=False, restart=False):
205 pass
206
207 @abc.abstractmethod
208 def restart_kernel(self, now=False, **kw):
209 pass
210
211 @abc.abstractproperty
212 def has_kernel(self):
213 pass
214
215 @abc.abstractmethod
216 def interrupt_kernel(self):
217 pass
218
219 @abc.abstractmethod
220 def signal_kernel(self, signum):
221 pass
222
223 @abc.abstractproperty
224 def is_alive(self):
225 pass
226
@@ -0,0 +1,45 b''
1 """Tests for the notebook kernel and session manager"""
2
3 from subprocess import PIPE
4 import time
5 from unittest import TestCase
6
7 from IPython.config.loader import Config
8 from IPython.zmq.kernelmanager import KernelManager
9
10 class TestKernelManager(TestCase):
11
12 def _get_tcp_km(self):
13 return KernelManager()
14
15 def _get_ipc_km(self):
16 c = Config()
17 c.KernelManager.transport = 'ipc'
18 c.KernelManager.ip = 'test'
19 km = KernelManager(config=c)
20 return km
21
22 def _run_lifecycle(self, km):
23 km.start_kernel(stdout=PIPE, stderr=PIPE)
24 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
25 km.restart_kernel()
26 # We need a delay here to give the restarting kernel a chance to
27 # restart. Otherwise, the interrupt will kill it, causing the test
28 # suite to hang. The reason it *hangs* is that the shutdown
29 # message for the restart sometimes hasn't been sent to the kernel.
30 # Because linger is oo on the shell channel, the context can't
31 # close until the message is sent to the kernel, which is not dead.
32 time.sleep(1.0)
33 km.interrupt_kernel()
34 self.assertTrue(isinstance(km, KernelManager))
35 km.shutdown_kernel()
36 km.shell_channel.stop()
37
38 def test_tcp_lifecycle(self):
39 km = self._get_tcp_km()
40 self._run_lifecycle(km)
41
42 def testipc_lifecycle(self):
43 km = self._get_ipc_km()
44 self._run_lifecycle(km)
45
@@ -330,7 +330,7 b' class MainKernelHandler(AuthenticatedHandler):'
330 330 @web.authenticated
331 331 def get(self):
332 332 km = self.application.kernel_manager
333 self.finish(jsonapi.dumps(km.kernel_ids))
333 self.finish(jsonapi.dumps(km.list_kernel_ids()))
334 334
335 335 @web.authenticated
336 336 def post(self):
@@ -364,9 +364,9 b' class KernelActionHandler(AuthenticatedHandler):'
364 364 km.interrupt_kernel(kernel_id)
365 365 self.set_status(204)
366 366 if action == 'restart':
367 new_kernel_id = km.restart_kernel(kernel_id)
368 data = {'ws_url':self.ws_url,'kernel_id':new_kernel_id}
369 self.set_header('Location', '/'+new_kernel_id)
367 km.restart_kernel(kernel_id)
368 data = {'ws_url':self.ws_url, 'kernel_id':kernel_id}
369 self.set_header('Location', '/'+kernel_id)
370 370 self.write(jsonapi.dumps(data))
371 371 self.finish()
372 372
@@ -416,7 +416,7 b' class AuthenticatedZMQStreamHandler(ZMQStreamHandler):'
416 416 def open(self, kernel_id):
417 417 self.kernel_id = kernel_id.decode('ascii')
418 418 try:
419 cfg = self.application.ipython_app.config
419 cfg = self.application.config
420 420 except AttributeError:
421 421 # protect from the case where this is run from something other than
422 422 # the notebook app:
@@ -541,9 +541,13 b' class IOPubHandler(AuthenticatedZMQStreamHandler):'
541 541 if not self.hb_stream.closed():
542 542 self.hb_stream.on_recv(None)
543 543
544 def kernel_died(self):
544 def _delete_kernel_data(self):
545 """Remove the kernel data and notebook mapping."""
545 546 self.application.kernel_manager.delete_mapping_for_kernel(self.kernel_id)
546 self.application.log.error("Kernel %s failed to respond to heartbeat", self.kernel_id)
547
548 def kernel_died(self):
549 self._delete_kernel_data()
550 self.application.log.error("Kernel died: %s" % self.kernel_id)
547 551 self.write_message(
548 552 {'header': {'msg_type': 'status'},
549 553 'parent_header': {},
@@ -17,8 +17,6 b' Authors:'
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import os
20 import signal
21 import sys
22 20 import uuid
23 21
24 22 import zmq
@@ -63,56 +61,65 b' class MultiKernelManager(LoggingConfigurable):'
63 61
64 62 _kernels = Dict()
65 63
66 @property
67 def kernel_ids(self):
64 def list_kernel_ids(self):
68 65 """Return a list of the kernel ids of the active kernels."""
69 return self._kernels.keys()
66 # Create a copy so we can iterate over kernels in operations
67 # that delete keys.
68 return list(self._kernels.keys())
70 69
71 70 def __len__(self):
72 71 """Return the number of running kernels."""
73 return len(self.kernel_ids)
72 return len(self.list_kernel_ids())
74 73
75 74 def __contains__(self, kernel_id):
76 if kernel_id in self.kernel_ids:
77 return True
78 else:
79 return False
75 return kernel_id in self._kernels
80 76
81 77 def start_kernel(self, **kwargs):
82 """Start a new kernel."""
83 kernel_id = unicode(uuid.uuid4())
84 # use base KernelManager for each Kernel
78 """Start a new kernel.
79
80 The caller can pick a kernel_id by passing one in as a keyword arg,
81 otherwise one will be picked using a uuid.
82
83 To silence the kernel's stdout/stderr, call this using::
84
85 km.start_kernel(stdout=PIPE, stderr=PIPE)
86
87 """
88 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
89 if kernel_id in self:
90 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
91 # kernel_manager_factory is the constructor for the KernelManager
92 # subclass we are using. It can be configured as any Configurable,
93 # including things like its transport and ip.
85 94 km = self.kernel_manager_factory(connection_file=os.path.join(
86 95 self.connection_dir, "kernel-%s.json" % kernel_id),
87 96 config=self.config,
88 97 )
89 98 km.start_kernel(**kwargs)
90 99 # start just the shell channel, needed for graceful restart
91 km.start_channels(shell=True, sub=False, stdin=False, hb=False)
100 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
92 101 self._kernels[kernel_id] = km
93 102 return kernel_id
94 103
95 def shutdown_kernel(self, kernel_id):
104 def shutdown_kernel(self, kernel_id, now=False):
96 105 """Shutdown a kernel by its kernel uuid.
97 106
98 107 Parameters
99 108 ==========
100 109 kernel_id : uuid
101 110 The id of the kernel to shutdown.
111 now : bool
112 Should the kernel be shutdown forcibly using a signal.
102 113 """
103 self.get_kernel(kernel_id).shutdown_kernel()
114 k = self.get_kernel(kernel_id)
115 k.shutdown_kernel(now=now)
116 k.shell_channel.stop()
104 117 del self._kernels[kernel_id]
105 118
106 def kill_kernel(self, kernel_id):
107 """Kill a kernel by its kernel uuid.
108
109 Parameters
110 ==========
111 kernel_id : uuid
112 The id of the kernel to kill.
113 """
114 self.get_kernel(kernel_id).kill_kernel()
115 del self._kernels[kernel_id]
119 def shutdown_all(self, now=False):
120 """Shutdown all kernels."""
121 for kid in self.list_kernel_ids():
122 self.shutdown_kernel(kid, now=now)
116 123
117 124 def interrupt_kernel(self, kernel_id):
118 125 """Interrupt (SIGINT) the kernel by its uuid.
@@ -125,7 +132,7 b' class MultiKernelManager(LoggingConfigurable):'
125 132 return self.get_kernel(kernel_id).interrupt_kernel()
126 133
127 134 def signal_kernel(self, kernel_id, signum):
128 """ Sends a signal to the kernel by its uuid.
135 """Sends a signal to the kernel by its uuid.
129 136
130 137 Note that since only SIGTERM is supported on Windows, this function
131 138 is only useful on Unix systems.
@@ -161,8 +168,8 b' class MultiKernelManager(LoggingConfigurable):'
161 168 else:
162 169 raise KeyError("Kernel with id not found: %s" % kernel_id)
163 170
164 def get_kernel_ports(self, kernel_id):
165 """Return a dictionary of ports for a kernel.
171 def get_connection_info(self, kernel_id):
172 """Return a dictionary of connection data for a kernel.
166 173
167 174 Parameters
168 175 ==========
@@ -171,21 +178,39 b' class MultiKernelManager(LoggingConfigurable):'
171 178
172 179 Returns
173 180 =======
174 port_dict : dict
175 A dict of key, value pairs where the keys are the names
176 (stdin_port,iopub_port,shell_port) and the values are the
177 integer port numbers for those channels.
181 connection_dict : dict
182 A dict of the information needed to connect to a kernel.
183 This includes the ip address and the integer port
184 numbers of the different channels (stdin_port, iopub_port,
185 shell_port, hb_port).
178 186 """
179 # this will raise a KeyError if not found:
180 187 km = self.get_kernel(kernel_id)
181 return dict(shell_port=km.shell_port,
188 return dict(transport=km.transport,
189 ip=km.ip,
190 shell_port=km.shell_port,
182 191 iopub_port=km.iopub_port,
183 192 stdin_port=km.stdin_port,
184 193 hb_port=km.hb_port,
185 )
194 )
195
196 def _make_url(self, transport, ip, port):
197 """Make a ZeroMQ URL for a given transport, ip and port."""
198 if transport == 'tcp':
199 return "tcp://%s:%i" % (ip, port)
200 else:
201 return "%s://%s-%s" % (transport, ip, port)
202
203 def _create_connected_stream(self, kernel_id, socket_type):
204 """Create a connected ZMQStream for a kernel."""
205 cinfo = self.get_connection_info(kernel_id)
206 url = self._make_url(cinfo['transport'], cinfo['ip'], cinfo['port'])
207 sock = self.context.socket(socket_type)
208 self.log.info("Connecting to: %s" % url)
209 sock.connect(url)
210 return ZMQStream(sock)
186 211
187 def get_kernel_ip(self, kernel_id):
188 """Return ip address for a kernel.
212 def create_iopub_stream(self, kernel_id):
213 """Return a ZMQStream object connected to the iopub channel.
189 214
190 215 Parameters
191 216 ==========
@@ -194,35 +219,40 b' class MultiKernelManager(LoggingConfigurable):'
194 219
195 220 Returns
196 221 =======
197 ip : str
198 The ip address of the kernel.
222 stream : ZMQStream
199 223 """
200 return self.get_kernel(kernel_id).ip
201
202 def create_connected_stream(self, ip, port, socket_type):
203 sock = self.context.socket(socket_type)
204 addr = "tcp://%s:%i" % (ip, port)
205 self.log.info("Connecting to: %s" % addr)
206 sock.connect(addr)
207 return ZMQStream(sock)
208
209 def create_iopub_stream(self, kernel_id):
210 ip = self.get_kernel_ip(kernel_id)
211 ports = self.get_kernel_ports(kernel_id)
212 iopub_stream = self.create_connected_stream(ip, ports['iopub_port'], zmq.SUB)
224 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB)
213 225 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
214 226 return iopub_stream
215 227
216 228 def create_shell_stream(self, kernel_id):
217 ip = self.get_kernel_ip(kernel_id)
218 ports = self.get_kernel_ports(kernel_id)
219 shell_stream = self.create_connected_stream(ip, ports['shell_port'], zmq.DEALER)
229 """Return a ZMQStream object connected to the shell channel.
230
231 Parameters
232 ==========
233 kernel_id : uuid
234 The id of the kernel.
235
236 Returns
237 =======
238 stream : ZMQStream
239 """
240 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER)
220 241 return shell_stream
221 242
222 243 def create_hb_stream(self, kernel_id):
223 ip = self.get_kernel_ip(kernel_id)
224 ports = self.get_kernel_ports(kernel_id)
225 hb_stream = self.create_connected_stream(ip, ports['hb_port'], zmq.REQ)
244 """Return a ZMQStream object connected to the hb channel.
245
246 Parameters
247 ==========
248 kernel_id : uuid
249 The id of the kernel.
250
251 Returns
252 =======
253 stream : ZMQStream
254 """
255 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ)
226 256 return hb_stream
227 257
228 258
@@ -289,20 +319,15 b' class MappingKernelManager(MultiKernelManager):'
289 319 self.log.info("Using existing kernel: %s" % kernel_id)
290 320 return kernel_id
291 321
292 def shutdown_kernel(self, kernel_id):
322 def shutdown_kernel(self, kernel_id, now=False):
293 323 """Shutdown a kernel and remove its notebook association."""
294 324 self._check_kernel_id(kernel_id)
295 super(MappingKernelManager, self).shutdown_kernel(kernel_id)
325 super(MappingKernelManager, self).shutdown_kernel(
326 kernel_id, now=now
327 )
296 328 self.delete_mapping_for_kernel(kernel_id)
297 329 self.log.info("Kernel shutdown: %s" % kernel_id)
298 330
299 def kill_kernel(self, kernel_id):
300 """Kill a kernel and remove its notebook association."""
301 self._check_kernel_id(kernel_id)
302 super(MappingKernelManager, self).kill_kernel(kernel_id)
303 self.delete_mapping_for_kernel(kernel_id)
304 self.log.info("Kernel killed: %s" % kernel_id)
305
306 331 def interrupt_kernel(self, kernel_id):
307 332 """Interrupt a kernel."""
308 333 self._check_kernel_id(kernel_id)
@@ -314,7 +339,6 b' class MappingKernelManager(MultiKernelManager):'
314 339 self._check_kernel_id(kernel_id)
315 340 super(MappingKernelManager, self).restart_kernel(kernel_id)
316 341 self.log.info("Kernel restarted: %s" % kernel_id)
317 return kernel_id
318 342
319 343 def create_iopub_stream(self, kernel_id):
320 344 """Create a new iopub stream."""
@@ -187,6 +187,7 b' class NotebookWebApplication(web.Application):'
187 187 self.cluster_manager = cluster_manager
188 188 self.ipython_app = ipython_app
189 189 self.read_only = self.ipython_app.read_only
190 self.config = self.ipython_app.config
190 191 self.log = log
191 192 self.jinja2_env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), "templates")))
192 193
@@ -591,16 +592,13 b' class NotebookApp(BaseIPythonApplication):'
591 592 self.init_signal()
592 593
593 594 def cleanup_kernels(self):
594 """shutdown all kernels
595 """Shutdown all kernels.
595 596
596 597 The kernels will shutdown themselves when this process no longer exists,
597 598 but explicit shutdown allows the KernelManagers to cleanup the connection files.
598 599 """
599 600 self.log.info('Shutting down kernels')
600 km = self.kernel_manager
601 # copy list, since shutdown_kernel deletes keys
602 for kid in list(km.kernel_ids):
603 km.shutdown_kernel(kid)
601 self.kernel_manager.shutdown_all()
604 602
605 603 def start(self):
606 604 ip = self.ip if self.ip else '[all ip addresses on your system]'
@@ -30,7 +30,7 b' class BaseFrontendMixin(object):'
30 30 old_manager.stopped_channels.disconnect(self._stopped_channels)
31 31
32 32 # Disconnect the old kernel manager's channels.
33 old_manager.sub_channel.message_received.disconnect(self._dispatch)
33 old_manager.iopub_channel.message_received.disconnect(self._dispatch)
34 34 old_manager.shell_channel.message_received.disconnect(self._dispatch)
35 35 old_manager.stdin_channel.message_received.disconnect(self._dispatch)
36 36 old_manager.hb_channel.kernel_died.disconnect(
@@ -51,7 +51,7 b' class BaseFrontendMixin(object):'
51 51 kernel_manager.stopped_channels.connect(self._stopped_channels)
52 52
53 53 # Connect the new kernel manager's channels.
54 kernel_manager.sub_channel.message_received.connect(self._dispatch)
54 kernel_manager.iopub_channel.message_received.connect(self._dispatch)
55 55 kernel_manager.shell_channel.message_received.connect(self._dispatch)
56 56 kernel_manager.stdin_channel.message_received.connect(self._dispatch)
57 57 kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died)
@@ -69,7 +69,7 b' class QtShellChannelMixin(ChannelQObject):'
69 69 _handlers_called = False
70 70
71 71 #---------------------------------------------------------------------------
72 # 'ShellSocketChannel' interface
72 # 'ShellChannel' interface
73 73 #---------------------------------------------------------------------------
74 74
75 75 def call_handlers(self, msg):
@@ -98,7 +98,7 b' class QtShellChannelMixin(ChannelQObject):'
98 98 self._handlers_called = False
99 99
100 100
101 class QtSubChannelMixin(ChannelQObject):
101 class QtIOPubChannelMixin(ChannelQObject):
102 102
103 103 # Emitted when any message is received.
104 104 message_received = QtCore.Signal(object)
@@ -126,7 +126,7 b' class QtSubChannelMixin(ChannelQObject):'
126 126 shutdown_reply_received = QtCore.Signal(object)
127 127
128 128 #---------------------------------------------------------------------------
129 # 'SubSocketChannel' interface
129 # 'IOPubChannel' interface
130 130 #---------------------------------------------------------------------------
131 131
132 132 def call_handlers(self, msg):
@@ -145,7 +145,7 b' class QtSubChannelMixin(ChannelQObject):'
145 145 def flush(self):
146 146 """ Reimplemented to ensure that signals are dispatched immediately.
147 147 """
148 super(QtSubChannelMixin, self).flush()
148 super(QtIOPubChannelMixin, self).flush()
149 149 QtCore.QCoreApplication.instance().processEvents()
150 150
151 151
@@ -158,7 +158,7 b' class QtStdInChannelMixin(ChannelQObject):'
158 158 input_requested = QtCore.Signal(object)
159 159
160 160 #---------------------------------------------------------------------------
161 # 'StdInSocketChannel' interface
161 # 'StdInChannel' interface
162 162 #---------------------------------------------------------------------------
163 163
164 164 def call_handlers(self, msg):
@@ -179,7 +179,7 b' class QtHBChannelMixin(ChannelQObject):'
179 179 kernel_died = QtCore.Signal(object)
180 180
181 181 #---------------------------------------------------------------------------
182 # 'HBSocketChannel' interface
182 # 'HBChannel' interface
183 183 #---------------------------------------------------------------------------
184 184
185 185 def call_handlers(self, since_last_heartbeat):
@@ -205,7 +205,7 b' class QtKernelManagerMixin(HasTraits, SuperQObject):'
205 205 stopped_channels = QtCore.Signal()
206 206
207 207 # Use Qt-specific channel classes that emit signals.
208 sub_channel_class = Type(QtSubChannelMixin)
208 iopub_channel_class = Type(QtIOPubChannelMixin)
209 209 shell_channel_class = Type(QtShellChannelMixin)
210 210 stdin_channel_class = Type(QtStdInChannelMixin)
211 211 hb_channel_class = Type(QtHBChannelMixin)
@@ -396,7 +396,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
396 396 if info and info.kind == 'user' and not self._hidden:
397 397 # Make sure that all output from the SUB channel has been processed
398 398 # before writing a new prompt.
399 self.kernel_manager.sub_channel.flush()
399 self.kernel_manager.iopub_channel.flush()
400 400
401 401 # Reset the ANSI style information to prevent bad text in stdout
402 402 # from messing up our colors. We're not a true terminal so we're
@@ -431,7 +431,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
431 431
432 432 # Make sure that all output from the SUB channel has been processed
433 433 # before entering readline mode.
434 self.kernel_manager.sub_channel.flush()
434 self.kernel_manager.iopub_channel.flush()
435 435
436 436 def callback(line):
437 437 self.kernel_manager.stdin_channel.input(line)
@@ -3,23 +3,23 b''
3 3
4 4 # Local imports.
5 5 from IPython.inprocess.kernelmanager import \
6 ShellInProcessChannel, SubInProcessChannel, StdInInProcessChannel, \
7 HBInProcessChannel, InProcessKernelManager
6 InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel, \
7 InProcessHBChannel, InProcessKernelManager
8 8 from IPython.utils.traitlets import Type
9 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
9 from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \
10 10 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
11 11
12 12
13 class QtShellInProcessChannel(QtShellChannelMixin, ShellInProcessChannel):
13 class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel):
14 14 pass
15 15
16 class QtSubInProcessChannel(QtSubChannelMixin, SubInProcessChannel):
16 class QtInProcessIOPubChannel(QtIOPubChannelMixin, InProcessIOPubChannel):
17 17 pass
18 18
19 class QtStdInInProcessChannel(QtStdInChannelMixin, StdInInProcessChannel):
19 class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel):
20 20 pass
21 21
22 class QtHBInProcessChannel(QtHBChannelMixin, HBInProcessChannel):
22 class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel):
23 23 pass
24 24
25 25
@@ -27,7 +27,7 b' class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):'
27 27 """ An in-process KernelManager with signals and slots.
28 28 """
29 29
30 sub_channel_class = Type(QtSubInProcessChannel)
31 shell_channel_class = Type(QtShellInProcessChannel)
32 stdin_channel_class = Type(QtStdInInProcessChannel)
33 hb_channel_class = Type(QtHBInProcessChannel)
30 iopub_channel_class = Type(QtInProcessIOPubChannel)
31 shell_channel_class = Type(QtInProcessShellChannel)
32 stdin_channel_class = Type(QtInProcessStdInChannel)
33 hb_channel_class = Type(QtInProcessHBChannel)
@@ -3,22 +3,22 b''
3 3
4 4 # Local imports.
5 5 from IPython.utils.traitlets import Type
6 from IPython.zmq.kernelmanager import ShellSocketChannel, SubSocketChannel, \
7 StdInSocketChannel, HBSocketChannel, KernelManager
8 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
6 from IPython.zmq.kernelmanager import ShellChannel, IOPubChannel, \
7 StdInChannel, HBChannel, KernelManager
8 from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \
9 9 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
10 10
11 11
12 class QtShellSocketChannel(QtShellChannelMixin, ShellSocketChannel):
12 class QtShellChannel(QtShellChannelMixin, ShellChannel):
13 13 pass
14 14
15 class QtSubSocketChannel(QtSubChannelMixin, SubSocketChannel):
15 class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel):
16 16 pass
17 17
18 class QtStdInSocketChannel(QtStdInChannelMixin, StdInSocketChannel):
18 class QtStdInChannel(QtStdInChannelMixin, StdInChannel):
19 19 pass
20 20
21 class QtHBSocketChannel(QtHBChannelMixin, HBSocketChannel):
21 class QtHBChannel(QtHBChannelMixin, HBChannel):
22 22 pass
23 23
24 24
@@ -26,7 +26,7 b' class QtKernelManager(QtKernelManagerMixin, KernelManager):'
26 26 """ A KernelManager that provides signals and slots.
27 27 """
28 28
29 sub_channel_class = Type(QtSubSocketChannel)
30 shell_channel_class = Type(QtShellSocketChannel)
31 stdin_channel_class = Type(QtStdInSocketChannel)
32 hb_channel_class = Type(QtHBSocketChannel)
29 iopub_channel_class = Type(QtIOPubChannel)
30 shell_channel_class = Type(QtShellChannel)
31 stdin_channel_class = Type(QtStdInChannel)
32 hb_channel_class = Type(QtHBChannel)
@@ -211,8 +211,8 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
211 211 sub_msg: message receive from kernel in the sub socket channel
212 212 capture by kernel manager.
213 213 """
214 while self.km.sub_channel.msg_ready():
215 sub_msg = self.km.sub_channel.get_msg()
214 while self.km.iopub_channel.msg_ready():
215 sub_msg = self.km.iopub_channel.get_msg()
216 216 msg_type = sub_msg['header']['msg_type']
217 217 parent = sub_msg["parent_header"]
218 218 if (not parent) or self.session_id == parent['session']:
@@ -14,58 +14,25 b' Useful for test suites and blocking terminal interfaces.'
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 # Standard library imports.
18 import Queue
19 from threading import Event
20
21 17 # Local imports.
22 18 from IPython.utils.io import raw_print
23 19 from IPython.utils.traitlets import Type
24 from kernelmanager import InProcessKernelManager, ShellInProcessChannel, \
25 SubInProcessChannel, StdInInProcessChannel
26
27 #-----------------------------------------------------------------------------
28 # Utility classes
29 #-----------------------------------------------------------------------------
20 from kernelmanager import InProcessKernelManager, InProcessShellChannel, \
21 InProcessIOPubChannel, InProcessStdInChannel
22 from IPython.zmq.blockingkernelmanager import BlockingChannelMixin
30 23
31 class BlockingChannelMixin(object):
32
33 def __init__(self, *args, **kwds):
34 super(BlockingChannelMixin, self).__init__(*args, **kwds)
35 self._in_queue = Queue.Queue()
36
37 def call_handlers(self, msg):
38 self._in_queue.put(msg)
39
40 def get_msg(self, block=True, timeout=None):
41 """ Gets a message if there is one that is ready. """
42 return self._in_queue.get(block, timeout)
43
44 def get_msgs(self):
45 """ Get all messages that are currently ready. """
46 msgs = []
47 while True:
48 try:
49 msgs.append(self.get_msg(block=False))
50 except Queue.Empty:
51 break
52 return msgs
53
54 def msg_ready(self):
55 """ Is there a message that has been received? """
56 return not self._in_queue.empty()
57 24
58 25 #-----------------------------------------------------------------------------
59 26 # Blocking kernel manager
60 27 #-----------------------------------------------------------------------------
61 28
62 class BlockingShellInProcessChannel(BlockingChannelMixin, ShellInProcessChannel):
29 class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel):
63 30 pass
64 31
65 class BlockingSubInProcessChannel(BlockingChannelMixin, SubInProcessChannel):
32 class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel):
66 33 pass
67 34
68 class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel):
35 class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel):
69 36
70 37 def call_handlers(self, msg):
71 38 """ Overridden for the in-process channel.
@@ -82,6 +49,6 b' class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel)'
82 49 class BlockingInProcessKernelManager(InProcessKernelManager):
83 50
84 51 # The classes to use for the various channels.
85 shell_channel_class = Type(BlockingShellInProcessChannel)
86 sub_channel_class = Type(BlockingSubInProcessChannel)
87 stdin_channel_class = Type(BlockingStdInInProcessChannel)
52 shell_channel_class = Type(BlockingInProcessShellChannel)
53 iopub_channel_class = Type(BlockingInProcessIOPubChannel)
54 stdin_channel_class = Type(BlockingInProcessStdInChannel)
@@ -123,7 +123,7 b' class InProcessKernel(Kernel):'
123 123 """
124 124 ident, msg = self.session.recv(self.iopub_socket, copy=False)
125 125 for frontend in self.frontends:
126 frontend.sub_channel.call_handlers(msg)
126 frontend.iopub_channel.call_handlers(msg)
127 127
128 128 #------ Trait initializers -----------------------------------------------
129 129
@@ -12,17 +12,21 b''
12 12 #-----------------------------------------------------------------------------
13 13
14 14 # Local imports.
15 from IPython.config.loader import Config
15 from IPython.config.configurable import Configurable
16 16 from IPython.inprocess.socket import DummySocket
17 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
17 from IPython.utils.traitlets import Any, Instance, Type
18 from IPython.zmq.kernelmanagerabc import (
19 ShellChannelABC, IOPubChannelABC,
20 HBChannelABC, StdInChannelABC,
21 KernelManagerABC
22 )
18 23
19 24 #-----------------------------------------------------------------------------
20 25 # Channel classes
21 26 #-----------------------------------------------------------------------------
22 27
23 28 class InProcessChannel(object):
24 """ Base class for in-process channels.
25 """
29 """Base class for in-process channels."""
26 30
27 31 def __init__(self, manager):
28 32 super(InProcessChannel, self).__init__()
@@ -71,9 +75,8 b' class InProcessChannel(object):'
71 75 raise NotImplementedError
72 76
73 77
74 class ShellInProcessChannel(InProcessChannel):
75 """The DEALER channel for issues request/replies to the kernel.
76 """
78 class InProcessShellChannel(InProcessChannel):
79 """See `IPython.zmq.kernelmanager.ShellChannel` for docstrings."""
77 80
78 81 # flag for whether execute requests should be allowed to call raw_input
79 82 allow_stdin = True
@@ -84,42 +87,6 b' class ShellInProcessChannel(InProcessChannel):'
84 87
85 88 def execute(self, code, silent=False, store_history=True,
86 89 user_variables=[], user_expressions={}, allow_stdin=None):
87 """Execute code in the kernel.
88
89 Parameters
90 ----------
91 code : str
92 A string of Python code.
93
94 silent : bool, optional (default False)
95 If set, the kernel will execute the code as quietly possible, and
96 will force store_history to be False.
97
98 store_history : bool, optional (default True)
99 If set, the kernel will store command history. This is forced
100 to be False if silent is True.
101
102 user_variables : list, optional
103 A list of variable names to pull from the user's namespace. They
104 will come back as a dict with these names as keys and their
105 :func:`repr` as values.
106
107 user_expressions : dict, optional
108 A dict mapping names to expressions to be evaluated in the user's
109 dict. The expression values are returned as strings formatted using
110 :func:`repr`.
111
112 allow_stdin : bool, optional (default self.allow_stdin)
113 Flag for whether the kernel can send stdin requests to frontends.
114
115 Some frontends (e.g. the Notebook) do not support stdin requests.
116 If raw_input is called from code executed from such a frontend, a
117 StdinNotImplementedError will be raised.
118
119 Returns
120 -------
121 The msg_id of the message sent.
122 """
123 90 if allow_stdin is None:
124 91 allow_stdin = self.allow_stdin
125 92 content = dict(code=code, silent=silent, store_history=store_history,
@@ -131,81 +98,18 b' class ShellInProcessChannel(InProcessChannel):'
131 98 return msg['header']['msg_id']
132 99
133 100 def complete(self, text, line, cursor_pos, block=None):
134 """Tab complete text in the kernel's namespace.
135
136 Parameters
137 ----------
138 text : str
139 The text to complete.
140 line : str
141 The full line of text that is the surrounding context for the
142 text to complete.
143 cursor_pos : int
144 The position of the cursor in the line where the completion was
145 requested.
146 block : str, optional
147 The full block of code in which the completion is being requested.
148
149 Returns
150 -------
151 The msg_id of the message sent.
152 """
153 101 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
154 102 msg = self.manager.session.msg('complete_request', content)
155 103 self._dispatch_to_kernel(msg)
156 104 return msg['header']['msg_id']
157 105
158 106 def object_info(self, oname, detail_level=0):
159 """Get metadata information about an object.
160
161 Parameters
162 ----------
163 oname : str
164 A string specifying the object name.
165 detail_level : int, optional
166 The level of detail for the introspection (0-2)
167
168 Returns
169 -------
170 The msg_id of the message sent.
171 """
172 107 content = dict(oname=oname, detail_level=detail_level)
173 108 msg = self.manager.session.msg('object_info_request', content)
174 109 self._dispatch_to_kernel(msg)
175 110 return msg['header']['msg_id']
176 111
177 112 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
178 """Get entries from the history list.
179
180 Parameters
181 ----------
182 raw : bool
183 If True, return the raw input.
184 output : bool
185 If True, then return the output as well.
186 hist_access_type : str
187 'range' (fill in session, start and stop params), 'tail' (fill in n)
188 or 'search' (fill in pattern param).
189
190 session : int
191 For a range request, the session from which to get lines. Session
192 numbers are positive integers; negative ones count back from the
193 current session.
194 start : int
195 The first line number of a history range.
196 stop : int
197 The final (excluded) line number of a history range.
198
199 n : int
200 The number of lines of history to get for a tail request.
201
202 pattern : str
203 The glob-syntax pattern for a search request.
204
205 Returns
206 -------
207 The msg_id of the message sent.
208 """
209 113 content = dict(raw=raw, output=output,
210 114 hist_access_type=hist_access_type, **kwds)
211 115 msg = self.manager.session.msg('history_request', content)
@@ -213,10 +117,6 b' class ShellInProcessChannel(InProcessChannel):'
213 117 return msg['header']['msg_id']
214 118
215 119 def shutdown(self, restart=False):
216 """ Request an immediate kernel shutdown.
217
218 A dummy method for the in-process kernel.
219 """
220 120 # FIXME: What to do here?
221 121 raise NotImplementedError('Cannot shutdown in-process kernel')
222 122
@@ -240,49 +140,39 b' class ShellInProcessChannel(InProcessChannel):'
240 140 self.call_handlers_later(reply_msg)
241 141
242 142
243 class SubInProcessChannel(InProcessChannel):
244 """The SUB channel which listens for messages that the kernel publishes.
245 """
143 class InProcessIOPubChannel(InProcessChannel):
144 """See `IPython.zmq.kernelmanager.IOPubChannel` for docstrings."""
246 145
247 146 def flush(self, timeout=1.0):
248 """ Immediately processes all pending messages on the SUB channel.
249
250 A dummy method for the in-process kernel.
251 """
252 147 pass
253 148
254 149
255 class StdInInProcessChannel(InProcessChannel):
256 """ A reply channel to handle raw_input requests that the kernel makes. """
150 class InProcessStdInChannel(InProcessChannel):
151 """See `IPython.zmq.kernelmanager.StdInChannel` for docstrings."""
257 152
258 153 def input(self, string):
259 """ Send a string of raw input to the kernel.
260 """
261 154 kernel = self.manager.kernel
262 155 if kernel is None:
263 156 raise RuntimeError('Cannot send input reply. No kernel exists.')
264 157 kernel.raw_input_str = string
265 158
266 159
267 class HBInProcessChannel(InProcessChannel):
268 """ A dummy heartbeat channel. """
160 class InProcessHBChannel(InProcessChannel):
161 """See `IPython.zmq.kernelmanager.HBChannel` for docstrings."""
269 162
270 163 time_to_dead = 3.0
271 164
272 165 def __init__(self, *args, **kwds):
273 super(HBInProcessChannel, self).__init__(*args, **kwds)
166 super(InProcessHBChannel, self).__init__(*args, **kwds)
274 167 self._pause = True
275 168
276 169 def pause(self):
277 """ Pause the heartbeat. """
278 170 self._pause = True
279 171
280 172 def unpause(self):
281 """ Unpause the heartbeat. """
282 173 self._pause = False
283 174
284 175 def is_beating(self):
285 """ Is the heartbeat running and responsive (and not paused). """
286 176 return not self._pause
287 177
288 178
@@ -290,15 +180,15 b' class HBInProcessChannel(InProcessChannel):'
290 180 # Main kernel manager class
291 181 #-----------------------------------------------------------------------------
292 182
293 class InProcessKernelManager(HasTraits):
294 """ A manager for an in-process kernel.
183 class InProcessKernelManager(Configurable):
184 """A manager for an in-process kernel.
295 185
296 This class implements most of the interface of
297 ``IPython.zmq.kernelmanager.KernelManager`` and allows (asynchronous)
298 frontends to be used seamlessly with an in-process kernel.
186 This class implements the interface of
187 `IPython.zmq.kernelmanagerabc.KernelManagerABC` and allows
188 (asynchronous) frontends to be used seamlessly with an in-process kernel.
189
190 See `IPython.zmq.kernelmanager.KernelManager` for docstrings.
299 191 """
300 # Config object for passing to child configurables
301 config = Instance(Config)
302 192
303 193 # The Session to use for building messages.
304 194 session = Instance('IPython.zmq.session.Session')
@@ -310,28 +200,26 b' class InProcessKernelManager(HasTraits):'
310 200 kernel = Instance('IPython.inprocess.ipkernel.InProcessKernel')
311 201
312 202 # The classes to use for the various channels.
313 shell_channel_class = Type(ShellInProcessChannel)
314 sub_channel_class = Type(SubInProcessChannel)
315 stdin_channel_class = Type(StdInInProcessChannel)
316 hb_channel_class = Type(HBInProcessChannel)
203 shell_channel_class = Type(InProcessShellChannel)
204 iopub_channel_class = Type(InProcessIOPubChannel)
205 stdin_channel_class = Type(InProcessStdInChannel)
206 hb_channel_class = Type(InProcessHBChannel)
317 207
318 208 # Protected traits.
319 209 _shell_channel = Any
320 _sub_channel = Any
210 _iopub_channel = Any
321 211 _stdin_channel = Any
322 212 _hb_channel = Any
323 213
324 214 #--------------------------------------------------------------------------
325 # Channel management methods:
215 # Channel management methods.
326 216 #--------------------------------------------------------------------------
327 217
328 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
329 """ Starts the channels for this kernel.
330 """
218 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
331 219 if shell:
332 220 self.shell_channel.start()
333 if sub:
334 self.sub_channel.start()
221 if iopub:
222 self.iopub_channel.start()
335 223 if stdin:
336 224 self.stdin_channel.start()
337 225 self.shell_channel.allow_stdin = True
@@ -341,12 +229,10 b' class InProcessKernelManager(HasTraits):'
341 229 self.hb_channel.start()
342 230
343 231 def stop_channels(self):
344 """ Stops all the running channels for this kernel.
345 """
346 232 if self.shell_channel.is_alive():
347 233 self.shell_channel.stop()
348 if self.sub_channel.is_alive():
349 self.sub_channel.stop()
234 if self.iopub_channel.is_alive():
235 self.iopub_channel.stop()
350 236 if self.stdin_channel.is_alive():
351 237 self.stdin_channel.stop()
352 238 if self.hb_channel.is_alive():
@@ -354,90 +240,74 b' class InProcessKernelManager(HasTraits):'
354 240
355 241 @property
356 242 def channels_running(self):
357 """ Are any of the channels created and running? """
358 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
243 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
359 244 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
360 245
246 @property
247 def shell_channel(self):
248 if self._shell_channel is None:
249 self._shell_channel = self.shell_channel_class(self)
250 return self._shell_channel
251
252 @property
253 def iopub_channel(self):
254 if self._iopub_channel is None:
255 self._iopub_channel = self.iopub_channel_class(self)
256 return self._iopub_channel
257
258 @property
259 def stdin_channel(self):
260 if self._stdin_channel is None:
261 self._stdin_channel = self.stdin_channel_class(self)
262 return self._stdin_channel
263
264 @property
265 def hb_channel(self):
266 if self._hb_channel is None:
267 self._hb_channel = self.hb_channel_class(self)
268 return self._hb_channel
269
361 270 #--------------------------------------------------------------------------
362 271 # Kernel management methods:
363 272 #--------------------------------------------------------------------------
364 273
365 274 def start_kernel(self, **kwds):
366 """ Starts a kernel process and configures the manager to use it.
367 """
368 275 from IPython.inprocess.ipkernel import InProcessKernel
369 276 self.kernel = InProcessKernel()
370 277 self.kernel.frontends.append(self)
371 278
372 279 def shutdown_kernel(self):
373 """ Attempts to the stop the kernel process cleanly. If the kernel
374 cannot be stopped and the kernel is local, it is killed.
375 """
376 self.kill_kernel()
280 self._kill_kernel()
377 281
378 282 def restart_kernel(self, now=False, **kwds):
379 """ Restarts a kernel with the arguments that were used to launch it.
380
381 The 'now' parameter is ignored.
382 """
383 283 self.shutdown_kernel()
384 284 self.start_kernel(**kwds)
385 285
386 286 @property
387 287 def has_kernel(self):
388 """ Returns whether a kernel process has been specified for the kernel
389 manager.
390 """
391 288 return self.kernel is not None
392 289
393 def kill_kernel(self):
394 """ Kill the running kernel.
395 """
290 def _kill_kernel(self):
396 291 self.kernel.frontends.remove(self)
397 292 self.kernel = None
398 293
399 294 def interrupt_kernel(self):
400 """ Interrupts the kernel. """
401 295 raise NotImplementedError("Cannot interrupt in-process kernel.")
402 296
403 297 def signal_kernel(self, signum):
404 """ Sends a signal to the kernel. """
405 298 raise NotImplementedError("Cannot signal in-process kernel.")
406 299
407 300 @property
408 301 def is_alive(self):
409 """ Is the kernel process still running? """
410 302 return True
411 303
412 #--------------------------------------------------------------------------
413 # Channels used for communication with the kernel:
414 #--------------------------------------------------------------------------
415 304
416 @property
417 def shell_channel(self):
418 """Get the REQ socket channel object to make requests of the kernel."""
419 if self._shell_channel is None:
420 self._shell_channel = self.shell_channel_class(self)
421 return self._shell_channel
422
423 @property
424 def sub_channel(self):
425 """Get the SUB socket channel object."""
426 if self._sub_channel is None:
427 self._sub_channel = self.sub_channel_class(self)
428 return self._sub_channel
429
430 @property
431 def stdin_channel(self):
432 """Get the REP socket channel object to handle stdin (raw_input)."""
433 if self._stdin_channel is None:
434 self._stdin_channel = self.stdin_channel_class(self)
435 return self._stdin_channel
305 #-----------------------------------------------------------------------------
306 # ABC Registration
307 #-----------------------------------------------------------------------------
436 308
437 @property
438 def hb_channel(self):
439 """Get the heartbeat socket channel object to check that the
440 kernel is alive."""
441 if self._hb_channel is None:
442 self._hb_channel = self.hb_channel_class(self)
443 return self._hb_channel
309 ShellChannelABC.register(InProcessShellChannel)
310 IOPubChannelABC.register(InProcessIOPubChannel)
311 HBChannelABC.register(InProcessHBChannel)
312 StdInChannelABC.register(InProcessStdInChannel)
313 KernelManagerABC.register(InProcessKernelManager)
@@ -80,7 +80,7 b' def get_stream_message(kernel_manager, timeout=5):'
80 80 """ Gets a single stream message synchronously from the sub channel.
81 81 """
82 82 while True:
83 msg = kernel_manager.sub_channel.get_msg(timeout=timeout)
83 msg = kernel_manager.iopub_channel.get_msg(timeout=timeout)
84 84 if msg['header']['msg_type'] == 'stream':
85 85 return msg
86 86
@@ -13,26 +13,58 b' Useful for test suites and blocking terminal interfaces.'
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15
16 # Local imports.
17 from IPython.inprocess.blockingkernelmanager import BlockingChannelMixin
16 import Queue
17
18 18 from IPython.utils.traitlets import Type
19 from kernelmanager import KernelManager, SubSocketChannel, HBSocketChannel, \
20 ShellSocketChannel, StdInSocketChannel
19 from kernelmanager import KernelManager, IOPubChannel, HBChannel, \
20 ShellChannel, StdInChannel
21 21
22 22 #-----------------------------------------------------------------------------
23 23 # Blocking kernel manager
24 24 #-----------------------------------------------------------------------------
25 25
26 class BlockingSubSocketChannel(BlockingChannelMixin, SubSocketChannel):
26
27 class BlockingChannelMixin(object):
28
29 def __init__(self, *args, **kwds):
30 super(BlockingChannelMixin, self).__init__(*args, **kwds)
31 self._in_queue = Queue.Queue()
32
33 def call_handlers(self, msg):
34 self._in_queue.put(msg)
35
36 def get_msg(self, block=True, timeout=None):
37 """ Gets a message if there is one that is ready. """
38 return self._in_queue.get(block, timeout)
39
40 def get_msgs(self):
41 """ Get all messages that are currently ready. """
42 msgs = []
43 while True:
44 try:
45 msgs.append(self.get_msg(block=False))
46 except Queue.Empty:
47 break
48 return msgs
49
50 def msg_ready(self):
51 """ Is there a message that has been received? """
52 return not self._in_queue.empty()
53
54
55 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
27 56 pass
28 57
29 class BlockingShellSocketChannel(BlockingChannelMixin, ShellSocketChannel):
58
59 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
30 60 pass
31 61
32 class BlockingStdInSocketChannel(BlockingChannelMixin, StdInSocketChannel):
62
63 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
33 64 pass
34 65
35 class BlockingHBSocketChannel(HBSocketChannel):
66
67 class BlockingHBChannel(HBChannel):
36 68
37 69 # This kernel needs quicker monitoring, shorten to 1 sec.
38 70 # less than 0.5s is unreliable, and will get occasional
@@ -43,11 +75,12 b' class BlockingHBSocketChannel(HBSocketChannel):'
43 75 """ Pause beating on missed heartbeat. """
44 76 pass
45 77
78
46 79 class BlockingKernelManager(KernelManager):
47 80
48 81 # The classes to use for the various channels.
49 shell_channel_class = Type(BlockingShellSocketChannel)
50 sub_channel_class = Type(BlockingSubSocketChannel)
51 stdin_channel_class = Type(BlockingStdInSocketChannel)
52 hb_channel_class = Type(BlockingHBSocketChannel)
53
82 shell_channel_class = Type(BlockingShellChannel)
83 iopub_channel_class = Type(BlockingIOPubChannel)
84 stdin_channel_class = Type(BlockingStdInChannel)
85 hb_channel_class = Type(BlockingHBChannel)
86
@@ -210,9 +210,9 b' class KernelApp(BaseIPythonApplication):'
210 210 except (IOError, OSError):
211 211 pass
212 212
213 self._cleanup_ipc_files()
213 self.cleanup_ipc_files()
214 214
215 def _cleanup_ipc_files(self):
215 def cleanup_ipc_files(self):
216 216 """cleanup ipc files if we wrote them"""
217 217 if self.transport != 'ipc':
218 218 return
@@ -34,14 +34,20 b' from zmq import ZMQError'
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 from IPython.config.loader import Config
37 from IPython.config.configurable import Configurable
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 from IPython.zmq.kernelmanagerabc import (
46 ShellChannelABC, IOPubChannelABC,
47 HBChannelABC, StdInChannelABC,
48 KernelManagerABC
49 )
50
45 51
46 52 #-----------------------------------------------------------------------------
47 53 # Constants and exceptions
@@ -84,8 +90,7 b' def validate_string_dict(dct):'
84 90 #-----------------------------------------------------------------------------
85 91
86 92 class ZMQSocketChannel(Thread):
87 """The base class for the channels that use ZMQ sockets.
88 """
93 """The base class for the channels that use ZMQ sockets."""
89 94 context = None
90 95 session = None
91 96 socket = None
@@ -95,7 +100,7 b' class ZMQSocketChannel(Thread):'
95 100 _exiting = False
96 101
97 102 def __init__(self, context, session, address):
98 """Create a channel
103 """Create a channel.
99 104
100 105 Parameters
101 106 ----------
@@ -141,7 +146,7 b' class ZMQSocketChannel(Thread):'
141 146 break
142 147
143 148 def stop(self):
144 """Stop the channel's activity.
149 """Stop the channel's event loop and join its thread.
145 150
146 151 This calls :method:`Thread.join` and returns when the thread
147 152 terminates. :class:`RuntimeError` will be raised if
@@ -151,7 +156,9 b' class ZMQSocketChannel(Thread):'
151 156
152 157 @property
153 158 def address(self):
154 """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555').
159 """Get the channel's address as a zmq url string.
160
161 These URLS have the form: 'tcp://127.0.0.1:5555'.
155 162 """
156 163 return self._address
157 164
@@ -170,25 +177,24 b' class ZMQSocketChannel(Thread):'
170 177 self.ioloop.add_callback(thread_send)
171 178
172 179 def _handle_recv(self, msg):
173 """callback for stream.on_recv
174
175 unpacks message, and calls handlers with it.
180 """Callback for stream.on_recv.
181
182 Unpacks message, and calls handlers with it.
176 183 """
177 184 ident,smsg = self.session.feed_identities(msg)
178 185 self.call_handlers(self.session.unserialize(smsg))
179 186
180 187
181 188
182 class ShellSocketChannel(ZMQSocketChannel):
183 """The DEALER channel for issues request/replies to the kernel.
184 """
189 class ShellChannel(ZMQSocketChannel):
190 """The shell channel for issuing request/replies to the kernel."""
185 191
186 192 command_queue = None
187 193 # flag for whether execute requests should be allowed to call raw_input:
188 194 allow_stdin = True
189 195
190 196 def __init__(self, context, session, address):
191 super(ShellSocketChannel, self).__init__(context, session, address)
197 super(ShellChannel, self).__init__(context, session, address)
192 198 self.ioloop = ioloop.IOLoop()
193 199
194 200 def run(self):
@@ -205,8 +211,9 b' class ShellSocketChannel(ZMQSocketChannel):'
205 211 pass
206 212
207 213 def stop(self):
214 """Stop the channel's event loop and join its thread."""
208 215 self.ioloop.stop()
209 super(ShellSocketChannel, self).stop()
216 super(ShellChannel, self).stop()
210 217
211 218 def call_handlers(self, msg):
212 219 """This method is called in the ioloop thread when a message arrives.
@@ -307,7 +314,7 b' class ShellSocketChannel(ZMQSocketChannel):'
307 314 return msg['header']['msg_id']
308 315
309 316 def object_info(self, oname, detail_level=0):
310 """Get metadata information about an object.
317 """Get metadata information about an object in the kernel's namespace.
311 318
312 319 Parameters
313 320 ----------
@@ -326,7 +333,7 b' class ShellSocketChannel(ZMQSocketChannel):'
326 333 return msg['header']['msg_id']
327 334
328 335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
329 """Get entries from the history list.
336 """Get entries from the kernel's history list.
330 337
331 338 Parameters
332 339 ----------
@@ -388,12 +395,14 b' class ShellSocketChannel(ZMQSocketChannel):'
388 395
389 396
390 397
391 class SubSocketChannel(ZMQSocketChannel):
392 """The SUB channel which listens for messages that the kernel publishes.
398 class IOPubChannel(ZMQSocketChannel):
399 """The iopub channel which listens for messages that the kernel publishes.
400
401 This channel is where all output is published to frontends.
393 402 """
394 403
395 404 def __init__(self, context, session, address):
396 super(SubSocketChannel, self).__init__(context, session, address)
405 super(IOPubChannel, self).__init__(context, session, address)
397 406 self.ioloop = ioloop.IOLoop()
398 407
399 408 def run(self):
@@ -411,8 +420,9 b' class SubSocketChannel(ZMQSocketChannel):'
411 420 pass
412 421
413 422 def stop(self):
423 """Stop the channel's event loop and join its thread."""
414 424 self.ioloop.stop()
415 super(SubSocketChannel, self).stop()
425 super(IOPubChannel, self).stop()
416 426
417 427 def call_handlers(self, msg):
418 428 """This method is called in the ioloop thread when a message arrives.
@@ -425,7 +435,7 b' class SubSocketChannel(ZMQSocketChannel):'
425 435 raise NotImplementedError('call_handlers must be defined in a subclass.')
426 436
427 437 def flush(self, timeout=1.0):
428 """Immediately processes all pending messages on the SUB channel.
438 """Immediately processes all pending messages on the iopub channel.
429 439
430 440 Callers should use this method to ensure that :method:`call_handlers`
431 441 has been called for all messages that have been received on the
@@ -454,13 +464,13 b' class SubSocketChannel(ZMQSocketChannel):'
454 464 self._flushed = True
455 465
456 466
457 class StdInSocketChannel(ZMQSocketChannel):
458 """A reply channel to handle raw_input requests that the kernel makes."""
467 class StdInChannel(ZMQSocketChannel):
468 """The stdin channel to handle raw_input requests that the kernel makes."""
459 469
460 470 msg_queue = None
461 471
462 472 def __init__(self, context, session, address):
463 super(StdInSocketChannel, self).__init__(context, session, address)
473 super(StdInChannel, self).__init__(context, session, address)
464 474 self.ioloop = ioloop.IOLoop()
465 475
466 476 def run(self):
@@ -475,11 +485,11 b' class StdInSocketChannel(ZMQSocketChannel):'
475 485 self.socket.close()
476 486 except:
477 487 pass
478
479 488
480 489 def stop(self):
490 """Stop the channel's event loop and join its thread."""
481 491 self.ioloop.stop()
482 super(StdInSocketChannel, self).stop()
492 super(StdInChannel, self).stop()
483 493
484 494 def call_handlers(self, msg):
485 495 """This method is called in the ioloop thread when a message arrives.
@@ -498,7 +508,7 b' class StdInSocketChannel(ZMQSocketChannel):'
498 508 self._queue_send(msg)
499 509
500 510
501 class HBSocketChannel(ZMQSocketChannel):
511 class HBChannel(ZMQSocketChannel):
502 512 """The heartbeat channel which monitors the kernel heartbeat.
503 513
504 514 Note that the heartbeat channel is paused by default. As long as you start
@@ -514,7 +524,7 b' class HBSocketChannel(ZMQSocketChannel):'
514 524 _beating = None
515 525
516 526 def __init__(self, context, session, address):
517 super(HBSocketChannel, self).__init__(context, session, address)
527 super(HBChannel, self).__init__(context, session, address)
518 528 self._running = False
519 529 self._pause =True
520 530 self.poller = zmq.Poller()
@@ -531,7 +541,7 b' class HBSocketChannel(ZMQSocketChannel):'
531 541 self.poller.register(self.socket, zmq.POLLIN)
532 542
533 543 def _poll(self, start_time):
534 """poll for heartbeat replies until we reach self.time_to_dead
544 """poll for heartbeat replies until we reach self.time_to_dead.
535 545
536 546 Ignores interrupts, and returns the result of poll(), which
537 547 will be an empty list if no messages arrived before the timeout,
@@ -620,8 +630,9 b' class HBSocketChannel(ZMQSocketChannel):'
620 630 return False
621 631
622 632 def stop(self):
633 """Stop the channel's event loop and join its thread."""
623 634 self._running = False
624 super(HBSocketChannel, self).stop()
635 super(HBChannel, self).stop()
625 636
626 637 def call_handlers(self, since_last_heartbeat):
627 638 """This method is called in the ioloop thread when a message arrives.
@@ -638,20 +649,23 b' class HBSocketChannel(ZMQSocketChannel):'
638 649 # Main kernel manager class
639 650 #-----------------------------------------------------------------------------
640 651
641 class KernelManager(HasTraits):
642 """ Manages a kernel for a frontend.
652 class KernelManager(Configurable):
653 """Manages a single kernel on this host along with its channels.
643 654
644 The SUB channel is for the frontend to receive messages published by the
645 kernel.
655 There are four channels associated with each kernel:
646 656
647 The REQ channel is for the frontend to make requests of the kernel.
657 * shell: for request/reply calls to the kernel.
658 * iopub: for the kernel to publish results to frontends.
659 * hb: for monitoring the kernel's heartbeat.
660 * stdin: for frontends to reply to raw_input calls in the kernel.
648 661
649 The REP channel is for the kernel to request stdin (raw_input) from the
650 frontend.
651 """
652 # config object for passing to child configurables
653 config = Instance(Config)
662 The usage of the channels that this class manages is optional. It is
663 entirely possible to connect to the kernels directly using ZeroMQ
664 sockets. These channels are useful primarily for talking to a kernel
665 whose :class:`KernelManager` is in the same process.
654 666
667 This version manages kernels started using Popen.
668 """
655 669 # The PyZMQ Context to use for communication with the kernel.
656 670 context = Instance(zmq.Context)
657 671 def _context_default(self):
@@ -668,10 +682,9 b' class KernelManager(HasTraits):'
668 682 # The addresses for the communication channels.
669 683 connection_file = Unicode('')
670 684
671 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp')
672
673
674 ip = Unicode(LOCALHOST)
685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
686
687 ip = Unicode(LOCALHOST, config=True)
675 688 def _ip_changed(self, name, old, new):
676 689 if new == '*':
677 690 self.ip = '0.0.0.0'
@@ -681,38 +694,39 b' class KernelManager(HasTraits):'
681 694 hb_port = Integer(0)
682 695
683 696 # The classes to use for the various channels.
684 shell_channel_class = Type(ShellSocketChannel)
685 sub_channel_class = Type(SubSocketChannel)
686 stdin_channel_class = Type(StdInSocketChannel)
687 hb_channel_class = Type(HBSocketChannel)
697 shell_channel_class = Type(ShellChannel)
698 iopub_channel_class = Type(IOPubChannel)
699 stdin_channel_class = Type(StdInChannel)
700 hb_channel_class = Type(HBChannel)
688 701
689 702 # Protected traits.
690 703 _launch_args = Any
691 704 _shell_channel = Any
692 _sub_channel = Any
705 _iopub_channel = Any
693 706 _stdin_channel = Any
694 707 _hb_channel = Any
695 708 _connection_file_written=Bool(False)
696
697 def __del__(self):
709
710 def __del__(self):
698 711 self.cleanup_connection_file()
699
712
700 713 #--------------------------------------------------------------------------
701 714 # Channel management methods:
702 715 #--------------------------------------------------------------------------
703 716
704 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
717 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
705 718 """Starts the channels for this kernel.
706 719
707 720 This will create the channels if they do not exist and then start
708 them. If port numbers of 0 are being used (random ports) then you
709 must first call :method:`start_kernel`. If the channels have been
710 stopped and you call this, :class:`RuntimeError` will be raised.
721 them (their activity runs in a thread). If port numbers of 0 are
722 being used (random ports) then you must first call
723 :method:`start_kernel`. If the channels have been stopped and you
724 call this, :class:`RuntimeError` will be raised.
711 725 """
712 726 if shell:
713 727 self.shell_channel.start()
714 if sub:
715 self.sub_channel.start()
728 if iopub:
729 self.iopub_channel.start()
716 730 if stdin:
717 731 self.stdin_channel.start()
718 732 self.shell_channel.allow_stdin = True
@@ -723,11 +737,13 b' class KernelManager(HasTraits):'
723 737
724 738 def stop_channels(self):
725 739 """Stops all the running channels for this kernel.
740
741 This stops their event loops and joins their threads.
726 742 """
727 743 if self.shell_channel.is_alive():
728 744 self.shell_channel.stop()
729 if self.sub_channel.is_alive():
730 self.sub_channel.stop()
745 if self.iopub_channel.is_alive():
746 self.iopub_channel.stop()
731 747 if self.stdin_channel.is_alive():
732 748 self.stdin_channel.stop()
733 749 if self.hb_channel.is_alive():
@@ -736,15 +752,64 b' class KernelManager(HasTraits):'
736 752 @property
737 753 def channels_running(self):
738 754 """Are any of the channels created and running?"""
739 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
755 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
740 756 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
741 757
758 def _make_url(self, port):
759 """Make a zmq url with a port.
760
761 There are two cases that this handles:
762
763 * tcp: tcp://ip:port
764 * ipc: ipc://ip-port
765 """
766 if self.transport == 'tcp':
767 return "tcp://%s:%i" % (self.ip, port)
768 else:
769 return "%s://%s-%s" % (self.transport, self.ip, port)
770
771 @property
772 def shell_channel(self):
773 """Get the shell channel object for this kernel."""
774 if self._shell_channel is None:
775 self._shell_channel = self.shell_channel_class(
776 self.context, self.session, self._make_url(self.shell_port)
777 )
778 return self._shell_channel
779
780 @property
781 def iopub_channel(self):
782 """Get the iopub channel object for this kernel."""
783 if self._iopub_channel is None:
784 self._iopub_channel = self.iopub_channel_class(
785 self.context, self.session, self._make_url(self.iopub_port)
786 )
787 return self._iopub_channel
788
789 @property
790 def stdin_channel(self):
791 """Get the stdin channel object for this kernel."""
792 if self._stdin_channel is None:
793 self._stdin_channel = self.stdin_channel_class(
794 self.context, self.session, self._make_url(self.stdin_port)
795 )
796 return self._stdin_channel
797
798 @property
799 def hb_channel(self):
800 """Get the hb channel object for this kernel."""
801 if self._hb_channel is None:
802 self._hb_channel = self.hb_channel_class(
803 self.context, self.session, self._make_url(self.hb_port)
804 )
805 return self._hb_channel
806
742 807 #--------------------------------------------------------------------------
743 # Kernel process management methods:
808 # Connection and ipc file management
744 809 #--------------------------------------------------------------------------
745 810
746 811 def cleanup_connection_file(self):
747 """cleanup connection file *if we wrote it*
812 """Cleanup connection file *if we wrote it*
748 813
749 814 Will not raise if the connection file was already removed somehow.
750 815 """
@@ -755,11 +820,9 b' class KernelManager(HasTraits):'
755 820 os.remove(self.connection_file)
756 821 except (IOError, OSError):
757 822 pass
758
759 self._cleanup_ipc_files()
760 823
761 def _cleanup_ipc_files(self):
762 """cleanup ipc files if we wrote them"""
824 def cleanup_ipc_files(self):
825 """Cleanup ipc files if we wrote them."""
763 826 if self.transport != 'ipc':
764 827 return
765 828 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
@@ -768,12 +831,12 b' class KernelManager(HasTraits):'
768 831 os.remove(ipcfile)
769 832 except (IOError, OSError):
770 833 pass
771
834
772 835 def load_connection_file(self):
773 """load connection info from JSON dict in self.connection_file"""
836 """Load connection info from JSON dict in self.connection_file."""
774 837 with open(self.connection_file) as f:
775 838 cfg = json.loads(f.read())
776
839
777 840 from pprint import pprint
778 841 pprint(cfg)
779 842 self.transport = cfg.get('transport', 'tcp')
@@ -785,7 +848,7 b' class KernelManager(HasTraits):'
785 848 self.session.key = str_to_bytes(cfg['key'])
786 849
787 850 def write_connection_file(self):
788 """write connection info to JSON dict in self.connection_file"""
851 """Write connection info to JSON dict in self.connection_file."""
789 852 if self._connection_file_written:
790 853 return
791 854 self.connection_file,cfg = write_connection_file(self.connection_file,
@@ -799,9 +862,13 b' class KernelManager(HasTraits):'
799 862 self.hb_port = cfg['hb_port']
800 863
801 864 self._connection_file_written = True
802
865
866 #--------------------------------------------------------------------------
867 # Kernel management
868 #--------------------------------------------------------------------------
869
803 870 def start_kernel(self, **kw):
804 """Starts a kernel process and configures the manager to use it.
871 """Starts a kernel on this host in a separate process.
805 872
806 873 If random ports (port=0) are being used, this method must be called
807 874 before the channels are created.
@@ -814,7 +881,8 b' class KernelManager(HasTraits):'
814 881 it should not be necessary to use this parameter.
815 882
816 883 **kw : optional
817 See respective options for IPython and Python kernels.
884 keyword arguments that are passed down into the launcher
885 callable.
818 886 """
819 887 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
820 888 raise RuntimeError("Can only launch a kernel on a local interface. "
@@ -832,47 +900,62 b' class KernelManager(HasTraits):'
832 900 from ipkernel import launch_kernel
833 901 self.kernel = launch_kernel(fname=self.connection_file, **kw)
834 902
835 def shutdown_kernel(self, restart=False):
836 """ Attempts to the stop the kernel process cleanly.
903 def shutdown_kernel(self, now=False, restart=False):
904 """Attempts to the stop the kernel process cleanly.
905
906 This attempts to shutdown the kernels cleanly by:
907
908 1. Sending it a shutdown message over the shell channel.
909 2. If that fails, the kernel is shutdown forcibly by sending it
910 a signal.
837 911
838 If the kernel cannot be stopped and the kernel is local, it is killed.
912 Parameters:
913 -----------
914 now : bool
915 Should the kernel be forcible killed *now*. This skips the
916 first, nice shutdown attempt.
917 restart: bool
918 Will this kernel be restarted after it is shutdown. When this
919 is True, connection files will not be cleaned up.
839 920 """
840 921 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
841 922 if sys.platform == 'win32':
842 self.kill_kernel()
923 self._kill_kernel()
843 924 return
844 925
845 926 # Pause the heart beat channel if it exists.
846 927 if self._hb_channel is not None:
847 928 self._hb_channel.pause()
848 929
849 # Don't send any additional kernel kill messages immediately, to give
850 # the kernel a chance to properly execute shutdown actions. Wait for at
851 # most 1s, checking every 0.1s.
852 self.shell_channel.shutdown(restart=restart)
853 for i in range(10):
854 if self.is_alive:
855 time.sleep(0.1)
856 else:
857 break
858 else:
859 # OK, we've waited long enough.
930 if now:
860 931 if self.has_kernel:
861 self.kill_kernel()
932 self._kill_kernel()
933 else:
934 # Don't send any additional kernel kill messages immediately, to give
935 # the kernel a chance to properly execute shutdown actions. Wait for at
936 # most 1s, checking every 0.1s.
937 self.shell_channel.shutdown(restart=restart)
938 for i in range(10):
939 if self.is_alive:
940 time.sleep(0.1)
941 else:
942 break
943 else:
944 # OK, we've waited long enough.
945 if self.has_kernel:
946 self._kill_kernel()
862 947
863 if not restart and self._connection_file_written:
864 # cleanup connection files on full shutdown of kernel we started
865 self._connection_file_written = False
866 try:
867 os.remove(self.connection_file)
868 except IOError:
869 pass
948 if not restart:
949 self.cleanup_connection_file()
950 self.cleanup_ipc_files()
951 else:
952 self.cleanup_ipc_files()
870 953
871 954 def restart_kernel(self, now=False, **kw):
872 955 """Restarts a kernel with the arguments that were used to launch it.
873 956
874 957 If the old kernel was launched with random ports, the same ports will be
875 used for the new kernel.
958 used for the new kernel. The same connection file is used again.
876 959
877 960 Parameters
878 961 ----------
@@ -885,7 +968,7 b' class KernelManager(HasTraits):'
885 968 it is given a chance to perform a clean shutdown or not.
886 969
887 970 **kw : optional
888 Any options specified here will replace those used to launch the
971 Any options specified here will overwrite those used to launch the
889 972 kernel.
890 973 """
891 974 if self._launch_args is None:
@@ -893,11 +976,7 b' class KernelManager(HasTraits):'
893 976 "No previous call to 'start_kernel'.")
894 977 else:
895 978 # Stop currently running kernel.
896 if self.has_kernel:
897 if now:
898 self.kill_kernel()
899 else:
900 self.shutdown_kernel(restart=True)
979 self.shutdown_kernel(now=now, restart=True)
901 980
902 981 # Start new kernel.
903 982 self._launch_args.update(kw)
@@ -910,15 +989,13 b' class KernelManager(HasTraits):'
910 989
911 990 @property
912 991 def has_kernel(self):
913 """Returns whether a kernel process has been specified for the kernel
914 manager.
915 """
992 """Has a kernel been started that we are managing."""
916 993 return self.kernel is not None
917 994
918 def kill_kernel(self):
919 """ Kill the running kernel.
995 def _kill_kernel(self):
996 """Kill the running kernel.
920 997
921 This method blocks until the kernel process has terminated.
998 This is a private method, callers should use shutdown_kernel(now=True).
922 999 """
923 1000 if self.has_kernel:
924 1001 # Pause the heart beat channel if it exists.
@@ -949,7 +1026,7 b' class KernelManager(HasTraits):'
949 1026 raise RuntimeError("Cannot kill kernel. No kernel is running!")
950 1027
951 1028 def interrupt_kernel(self):
952 """ Interrupts the kernel.
1029 """Interrupts the kernel by sending it a signal.
953 1030
954 1031 Unlike ``signal_kernel``, this operation is well supported on all
955 1032 platforms.
@@ -964,7 +1041,7 b' class KernelManager(HasTraits):'
964 1041 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
965 1042
966 1043 def signal_kernel(self, signum):
967 """ Sends a signal to the kernel.
1044 """Sends a signal to the kernel.
968 1045
969 1046 Note that since only SIGTERM is supported on Windows, this function is
970 1047 only useful on Unix systems.
@@ -991,54 +1068,14 b' class KernelManager(HasTraits):'
991 1068 # so naively return True
992 1069 return True
993 1070
994 #--------------------------------------------------------------------------
995 # Channels used for communication with the kernel:
996 #--------------------------------------------------------------------------
997 1071
998 def _make_url(self, port):
999 """make a zmq url with a port"""
1000 if self.transport == 'tcp':
1001 return "tcp://%s:%i" % (self.ip, port)
1002 else:
1003 return "%s://%s-%s" % (self.transport, self.ip, port)
1004
1005 @property
1006 def shell_channel(self):
1007 """Get the REQ socket channel object to make requests of the kernel."""
1008 if self._shell_channel is None:
1009 self._shell_channel = self.shell_channel_class(self.context,
1010 self.session,
1011 self._make_url(self.shell_port),
1012 )
1013 return self._shell_channel
1014
1015 @property
1016 def sub_channel(self):
1017 """Get the SUB socket channel object."""
1018 if self._sub_channel is None:
1019 self._sub_channel = self.sub_channel_class(self.context,
1020 self.session,
1021 self._make_url(self.iopub_port),
1022 )
1023 return self._sub_channel
1072 #-----------------------------------------------------------------------------
1073 # ABC Registration
1074 #-----------------------------------------------------------------------------
1024 1075
1025 @property
1026 def stdin_channel(self):
1027 """Get the REP socket channel object to handle stdin (raw_input)."""
1028 if self._stdin_channel is None:
1029 self._stdin_channel = self.stdin_channel_class(self.context,
1030 self.session,
1031 self._make_url(self.stdin_port),
1032 )
1033 return self._stdin_channel
1076 ShellChannelABC.register(ShellChannel)
1077 IOPubChannelABC.register(IOPubChannel)
1078 HBChannelABC.register(HBChannel)
1079 StdInChannelABC.register(StdInChannel)
1080 KernelManagerABC.register(KernelManager)
1034 1081
1035 @property
1036 def hb_channel(self):
1037 """Get the heartbeat socket channel object to check that the
1038 kernel is alive."""
1039 if self._hb_channel is None:
1040 self._hb_channel = self.hb_channel_class(self.context,
1041 self.session,
1042 self._make_url(self.hb_port),
1043 )
1044 return self._hb_channel
@@ -48,7 +48,7 b' def teardown():'
48 48
49 49 def flush_channels():
50 50 """flush any messages waiting on the queue"""
51 for channel in (KM.shell_channel, KM.sub_channel):
51 for channel in (KM.shell_channel, KM.iopub_channel):
52 52 while True:
53 53 try:
54 54 msg = channel.get_msg(block=True, timeout=0.1)
@@ -61,7 +61,7 b' def flush_channels():'
61 61 def execute(code='', **kwargs):
62 62 """wrapper for doing common steps for validating an execution request"""
63 63 shell = KM.shell_channel
64 sub = KM.sub_channel
64 sub = KM.iopub_channel
65 65
66 66 msg_id = shell.execute(code=code, **kwargs)
67 67 reply = shell.get_msg(timeout=2)
@@ -310,23 +310,23 b' def test_execute_silent():'
310 310 msg_id, reply = execute(code='x=1', silent=True)
311 311
312 312 # flush status=idle
313 status = KM.sub_channel.get_msg(timeout=2)
313 status = KM.iopub_channel.get_msg(timeout=2)
314 314 for tst in validate_message(status, 'status', msg_id):
315 315 yield tst
316 316 nt.assert_equal(status['content']['execution_state'], 'idle')
317 317
318 yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
318 yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1)
319 319 count = reply['execution_count']
320 320
321 321 msg_id, reply = execute(code='x=2', silent=True)
322 322
323 323 # flush status=idle
324 status = KM.sub_channel.get_msg(timeout=2)
324 status = KM.iopub_channel.get_msg(timeout=2)
325 325 for tst in validate_message(status, 'status', msg_id):
326 326 yield tst
327 327 yield nt.assert_equal(status['content']['execution_state'], 'idle')
328 328
329 yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
329 yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1)
330 330 count_2 = reply['execution_count']
331 331 yield nt.assert_equal(count_2, count)
332 332
@@ -339,7 +339,7 b' def test_execute_error():'
339 339 yield nt.assert_equal(reply['status'], 'error')
340 340 yield nt.assert_equal(reply['ename'], 'ZeroDivisionError')
341 341
342 pyerr = KM.sub_channel.get_msg(timeout=2)
342 pyerr = KM.iopub_channel.get_msg(timeout=2)
343 343 for tst in validate_message(pyerr, 'pyerr', msg_id):
344 344 yield tst
345 345
@@ -475,7 +475,7 b' def test_stream():'
475 475
476 476 msg_id, reply = execute("print('hi')")
477 477
478 stdout = KM.sub_channel.get_msg(timeout=2)
478 stdout = KM.iopub_channel.get_msg(timeout=2)
479 479 for tst in validate_message(stdout, 'stream', msg_id):
480 480 yield tst
481 481 content = stdout['content']
@@ -489,7 +489,7 b' def test_display_data():'
489 489
490 490 msg_id, reply = execute("from IPython.core.display import display; display(1)")
491 491
492 display = KM.sub_channel.get_msg(timeout=2)
492 display = KM.iopub_channel.get_msg(timeout=2)
493 493 for tst in validate_message(display, 'display_data', parent=msg_id):
494 494 yield tst
495 495 data = display['content']['data']
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now