##// END OF EJS Templates
Refactor kernel managers in preparation for the EmbeddedKernel.
epatters -
Show More
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
@@ -0,0 +1,75 b''
1 """ Implements a fully blocking kernel manager.
2
3 Useful for test suites and blocking terminal interfaces.
4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2012 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16
17 # Standard library imports.
18 import Queue
19 from threading import Event
20
21 # Local imports.
22 from IPython.utils.traitlets import Type
23 from kernelmanager import EmbeddedKernelManager, ShellEmbeddedChannel, \
24 SubEmbeddedChannel, StdInEmbeddedChannel
25
26 #-----------------------------------------------------------------------------
27 # Utility classes
28 #-----------------------------------------------------------------------------
29
30 class BlockingChannelMixin(object):
31
32 def __init__(self, *args, **kwds):
33 super(BlockingChannelMixin, self).__init__(*args, **kwds)
34 self._in_queue = Queue.Queue()
35
36 def call_handlers(self, msg):
37 self._in_queue.put(msg)
38
39 def get_msg(self, block=True, timeout=None):
40 """ Gets a message if there is one that is ready. """
41 return self._in_queue.get(block, timeout)
42
43 def get_msgs(self):
44 """ Get all messages that are currently ready. """
45 msgs = []
46 while True:
47 try:
48 msgs.append(self.get_msg(block=False))
49 except Queue.Empty:
50 break
51 return msgs
52
53 def msg_ready(self):
54 """ Is there a message that has been received? """
55 return not self._in_queue.empty()
56
57 #-----------------------------------------------------------------------------
58 # Blocking kernel manager
59 #-----------------------------------------------------------------------------
60
61 class BlockingShellEmbeddedChannel(BlockingChannelMixin, ShellEmbeddedChannel):
62 pass
63
64 class BlockingSubEmbeddedChannel(BlockingChannelMixin, SubEmbeddedChannel):
65 pass
66
67 class BlockingStdInEmbeddedChannel(BlockingChannelMixin, StdInEmbeddedChannel):
68 pass
69
70 class BlockingEmbeddedKernelManager(EmbeddedKernelManager):
71
72 # The classes to use for the various channels.
73 shell_channel_class = Type(BlockingShellEmbeddedChannel)
74 sub_channel_class = Type(BlockingSubEmbeddedChannel)
75 stdin_channel_class = Type(BlockingStdInEmbeddedChannel)
@@ -0,0 +1,22 b''
1 """ An embedded (in-process) kernel. """
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Local imports.
15 from IPython.zmq.ipkernel import Kernel
16
17 #-----------------------------------------------------------------------------
18 # Main kernel class
19 #-----------------------------------------------------------------------------
20
21 class EmbeddedKernel(Kernel):
22 pass
@@ -0,0 +1,398 b''
1 """ A kernel manager for embedded (in-process) kernels. """
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Local imports.
15 from IPython.config.loader import Config
16 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
17
18 #-----------------------------------------------------------------------------
19 # Channel classes
20 #-----------------------------------------------------------------------------
21
22 class EmbeddedChannel(object):
23 """ Base class for embedded channels.
24 """
25
26 def __init__(self, manager):
27 super(EmbeddedChannel, self).__init__()
28 self.manager = manager
29 self._is_alive = False
30
31 #--------------------------------------------------------------------------
32 # Channel interface
33 #--------------------------------------------------------------------------
34
35 def is_alive(self):
36 return self._is_alive
37
38 def start(self):
39 self._is_alive = True
40
41 def stop(self):
42 self._is_alive = False
43
44 def call_handlers(self, msg):
45 """ This method is called in the main thread when a message arrives.
46
47 Subclasses should override this method to handle incoming messages.
48 """
49 raise NotImplementedError('call_handlers must be defined in a subclass.')
50
51 #--------------------------------------------------------------------------
52 # EmbeddedChannel interface
53 #--------------------------------------------------------------------------
54
55 def call_handlers_later(self, *args, **kwds):
56 """ Call the message handlers later.
57
58 The default implementation just calls the handlers immediately, but this
59 method exists so that GUI toolkits can defer calling the handlers until
60 after the event loop has run, as expected by GUI frontends.
61 """
62 self.call_handlers(*args, **kwds)
63
64 def process_events(self):
65 """ Process any pending GUI events.
66
67 This method will be never be called from a frontend without an event
68 loop (e.g., a terminal frontend).
69 """
70 raise NotImplementedError
71
72
73 class ShellEmbeddedChannel(EmbeddedChannel):
74 """The DEALER channel for issues request/replies to the kernel.
75 """
76
77 # flag for whether execute requests should be allowed to call raw_input
78 allow_stdin = True
79
80 def execute(self, code, silent=False, store_history=True,
81 user_variables=[], user_expressions={}, allow_stdin=None):
82 """Execute code in the kernel.
83
84 Parameters
85 ----------
86 code : str
87 A string of Python code.
88
89 silent : bool, optional (default False)
90 If set, the kernel will execute the code as quietly possible, and
91 will force store_history to be False.
92
93 store_history : bool, optional (default True)
94 If set, the kernel will store command history. This is forced
95 to be False if silent is True.
96
97 user_variables : list, optional
98 A list of variable names to pull from the user's namespace. They
99 will come back as a dict with these names as keys and their
100 :func:`repr` as values.
101
102 user_expressions : dict, optional
103 A dict mapping names to expressions to be evaluated in the user's
104 dict. The expression values are returned as strings formatted using
105 :func:`repr`.
106
107 allow_stdin : bool, optional (default self.allow_stdin)
108 Flag for whether the kernel can send stdin requests to frontends.
109
110 Some frontends (e.g. the Notebook) do not support stdin requests.
111 If raw_input is called from code executed from such a frontend, a
112 StdinNotImplementedError will be raised.
113
114 Returns
115 -------
116 The msg_id of the message sent.
117 """
118 raise NotImplementedError
119
120 def complete(self, text, line, cursor_pos, block=None):
121 """Tab complete text in the kernel's namespace.
122
123 Parameters
124 ----------
125 text : str
126 The text to complete.
127 line : str
128 The full line of text that is the surrounding context for the
129 text to complete.
130 cursor_pos : int
131 The position of the cursor in the line where the completion was
132 requested.
133 block : str, optional
134 The full block of code in which the completion is being requested.
135
136 Returns
137 -------
138 The msg_id of the message sent.
139 """
140 raise NotImplementedError
141
142 def object_info(self, oname, detail_level=0):
143 """Get metadata information about an object.
144
145 Parameters
146 ----------
147 oname : str
148 A string specifying the object name.
149 detail_level : int, optional
150 The level of detail for the introspection (0-2)
151
152 Returns
153 -------
154 The msg_id of the message sent.
155 """
156 raise NotImplementedError
157
158 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
159 """Get entries from the history list.
160
161 Parameters
162 ----------
163 raw : bool
164 If True, return the raw input.
165 output : bool
166 If True, then return the output as well.
167 hist_access_type : str
168 'range' (fill in session, start and stop params), 'tail' (fill in n)
169 or 'search' (fill in pattern param).
170
171 session : int
172 For a range request, the session from which to get lines. Session
173 numbers are positive integers; negative ones count back from the
174 current session.
175 start : int
176 The first line number of a history range.
177 stop : int
178 The final (excluded) line number of a history range.
179
180 n : int
181 The number of lines of history to get for a tail request.
182
183 pattern : str
184 The glob-syntax pattern for a search request.
185
186 Returns
187 -------
188 The msg_id of the message sent.
189 """
190 raise NotImplementedError
191
192 def shutdown(self, restart=False):
193 """ Request an immediate kernel shutdown.
194
195 A dummy method for the embedded kernel.
196 """
197 # FIXME: What to do here?
198 raise NotImplementedError('Shutdown not supported for embedded kernel')
199
200
201 class SubEmbeddedChannel(EmbeddedChannel):
202 """The SUB channel which listens for messages that the kernel publishes.
203 """
204
205 def flush(self, timeout=1.0):
206 """ Immediately processes all pending messages on the SUB channel.
207
208 A dummy method for the embedded kernel.
209 """
210 pass
211
212
213 class StdInEmbeddedChannel(EmbeddedChannel):
214 """ A reply channel to handle raw_input requests that the kernel makes. """
215
216 def input(self, string):
217 """ Send a string of raw input to the kernel.
218 """
219 raise NotImplementedError
220
221
222 class HBEmbeddedChannel(EmbeddedChannel):
223 """ A dummy heartbeat channel. """
224
225 time_to_dead = 3.0
226
227 def __init__(self, *args, **kwds):
228 super(HBEmbeddedChannel, self).__init__(*args, **kwds)
229 self._pause = True
230
231 def pause(self):
232 """ Pause the heartbeat. """
233 self._pause = True
234
235 def unpause(self):
236 """ Unpause the heartbeat. """
237 self._pause = False
238
239 def is_beating(self):
240 """ Is the heartbeat running and responsive (and not paused). """
241 return not self._pause
242
243
244 #-----------------------------------------------------------------------------
245 # Main kernel manager class
246 #-----------------------------------------------------------------------------
247
248 class EmbeddedKernelManager(HasTraits):
249 """ A manager for an embedded kernel.
250
251 This class implements most of the interface of
252 ``IPython.zmq.kernelmanager.KernelManager`` and allows (asynchronous)
253 frontends to be used seamlessly with an in-process kernel.
254 """
255 # Config object for passing to child configurables
256 config = Instance(Config)
257
258 # The Session to use for building messages.
259 session = Instance('IPython.zmq.session.Session')
260 def _session_default(self):
261 from IPython.zmq.session import Session
262 return Session(config=self.config)
263
264 # The kernel process with which the KernelManager is communicating.
265 kernel = Instance('IPython.embedded.ipkernel.EmbeddedKernel')
266
267 # The classes to use for the various channels.
268 shell_channel_class = Type(ShellEmbeddedChannel)
269 sub_channel_class = Type(SubEmbeddedChannel)
270 stdin_channel_class = Type(StdInEmbeddedChannel)
271 hb_channel_class = Type(HBEmbeddedChannel)
272
273 # Protected traits.
274 _shell_channel = Any
275 _sub_channel = Any
276 _stdin_channel = Any
277 _hb_channel = Any
278
279 #--------------------------------------------------------------------------
280 # Channel management methods:
281 #--------------------------------------------------------------------------
282
283 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
284 """ Starts the channels for this kernel.
285 """
286 if shell:
287 self.shell_channel.start()
288 if sub:
289 self.sub_channel.start()
290 if stdin:
291 self.stdin_channel.start()
292 self.shell_channel.allow_stdin = True
293 else:
294 self.shell_channel.allow_stdin = False
295 if hb:
296 self.hb_channel.start()
297
298 def stop_channels(self):
299 """ Stops all the running channels for this kernel.
300 """
301 if self.shell_channel.is_alive():
302 self.shell_channel.stop()
303 if self.sub_channel.is_alive():
304 self.sub_channel.stop()
305 if self.stdin_channel.is_alive():
306 self.stdin_channel.stop()
307 if self.hb_channel.is_alive():
308 self.hb_channel.stop()
309
310 @property
311 def channels_running(self):
312 """ Are any of the channels created and running? """
313 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
314 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
315
316 #--------------------------------------------------------------------------
317 # Kernel management methods:
318 #--------------------------------------------------------------------------
319
320 def start_kernel(self, **kwds):
321 """ Starts a kernel process and configures the manager to use it.
322 """
323 from IPython.embedded.ipkernel import EmbeddedKernel
324 self.kernel = EmbeddedKernel()
325 self.kernel.frontends.append(self)
326
327 def shutdown_kernel(self):
328 """ Attempts to the stop the kernel process cleanly. If the kernel
329 cannot be stopped and the kernel is local, it is killed.
330 """
331 self.kill_kernel()
332
333 def restart_kernel(self, now=False, **kwds):
334 """ Restarts a kernel with the arguments that were used to launch it.
335
336 The 'now' parameter is ignored.
337 """
338 self.shutdown_kernel()
339 self.start_kernel(**kwds)
340
341 @property
342 def has_kernel(self):
343 """ Returns whether a kernel process has been specified for the kernel
344 manager.
345 """
346 return self.kernel is not None
347
348 def kill_kernel(self):
349 """ Kill the running kernel.
350 """
351 self.kernel.frontends.remove(self)
352 self.kernel = None
353
354 def interrupt_kernel(self):
355 """ Interrupts the kernel. """
356 raise NotImplementedError("Cannot interrupt embedded kernel.")
357
358 def signal_kernel(self, signum):
359 """ Sends a signal to the kernel. """
360 raise NotImplementedError("Cannot signal embedded kernel.")
361
362 @property
363 def is_alive(self):
364 """ Is the kernel process still running? """
365 return True
366
367 #--------------------------------------------------------------------------
368 # Channels used for communication with the kernel:
369 #--------------------------------------------------------------------------
370
371 @property
372 def shell_channel(self):
373 """Get the REQ socket channel object to make requests of the kernel."""
374 if self._shell_channel is None:
375 self._shell_channel = self.shell_channel_class(self)
376 return self._shell_channel
377
378 @property
379 def sub_channel(self):
380 """Get the SUB socket channel object."""
381 if self._sub_channel is None:
382 self._sub_channel = self.sub_channel_class(self)
383 return self._sub_channel
384
385 @property
386 def stdin_channel(self):
387 """Get the REP socket channel object to handle stdin (raw_input)."""
388 if self._stdin_channel is None:
389 self._stdin_channel = self.stdin_channel_class(self)
390 return self._stdin_channel
391
392 @property
393 def hb_channel(self):
394 """Get the heartbeat socket channel object to check that the
395 kernel is alive."""
396 if self._hb_channel is None:
397 self._hb_channel = self.hb_channel_class(self)
398 return self._hb_channel
@@ -0,0 +1,258 b''
1 """ Defines a KernelManager that provides signals and slots.
2 """
3
4 # System library imports.
5 from IPython.external.qt import QtCore
6
7 # IPython imports.
8 from IPython.utils.traitlets import Type
9 from util import SuperQObject
10
11
12 class ChannelQObject(SuperQObject):
13
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
16
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
19
20 #---------------------------------------------------------------------------
21 # Channel interface
22 #---------------------------------------------------------------------------
23
24 def start(self):
25 """ Reimplemented to emit signal.
26 """
27 super(ChannelQObject, self).start()
28 self.started.emit()
29
30 def stop(self):
31 """ Reimplemented to emit signal.
32 """
33 super(ChannelQObject, self).stop()
34 self.stopped.emit()
35
36 #---------------------------------------------------------------------------
37 # EmbeddedChannel interface
38 #---------------------------------------------------------------------------
39
40 def call_handlers_later(self, *args, **kwds):
41 """ Call the message handlers later.
42 """
43 do_later = lambda: self.call_handlers(*args, **kwds)
44 QtCore.QTimer.singleShot(0, do_later)
45
46 def process_events(self):
47 """ Process any pending GUI events.
48 """
49 QtCore.QCoreApplication.instance().processEvents()
50
51
52 class QtShellChannelMixin(ChannelQObject):
53
54 # Emitted when any message is received.
55 message_received = QtCore.Signal(object)
56
57 # Emitted when a reply has been received for the corresponding request
58 # type.
59 execute_reply = QtCore.Signal(object)
60 complete_reply = QtCore.Signal(object)
61 object_info_reply = QtCore.Signal(object)
62 history_reply = QtCore.Signal(object)
63
64 # Emitted when the first reply comes back.
65 first_reply = QtCore.Signal()
66
67 # Used by the first_reply signal logic to determine if a reply is the
68 # first.
69 _handlers_called = False
70
71 #---------------------------------------------------------------------------
72 # 'ShellSocketChannel' interface
73 #---------------------------------------------------------------------------
74
75 def call_handlers(self, msg):
76 """ Reimplemented to emit signals instead of making callbacks.
77 """
78 # Emit the generic signal.
79 self.message_received.emit(msg)
80
81 # Emit signals for specialized message types.
82 msg_type = msg['header']['msg_type']
83 signal = getattr(self, msg_type, None)
84 if signal:
85 signal.emit(msg)
86
87 if not self._handlers_called:
88 self.first_reply.emit()
89 self._handlers_called = True
90
91 #---------------------------------------------------------------------------
92 # 'QtShellChannelMixin' interface
93 #---------------------------------------------------------------------------
94
95 def reset_first_reply(self):
96 """ Reset the first_reply signal to fire again on the next reply.
97 """
98 self._handlers_called = False
99
100
101 class QtSubChannelMixin(ChannelQObject):
102
103 # Emitted when any message is received.
104 message_received = QtCore.Signal(object)
105
106 # Emitted when a message of type 'stream' is received.
107 stream_received = QtCore.Signal(object)
108
109 # Emitted when a message of type 'pyin' is received.
110 pyin_received = QtCore.Signal(object)
111
112 # Emitted when a message of type 'pyout' is received.
113 pyout_received = QtCore.Signal(object)
114
115 # Emitted when a message of type 'pyerr' is received.
116 pyerr_received = QtCore.Signal(object)
117
118 # Emitted when a message of type 'display_data' is received
119 display_data_received = QtCore.Signal(object)
120
121 # Emitted when a crash report message is received from the kernel's
122 # last-resort sys.excepthook.
123 crash_received = QtCore.Signal(object)
124
125 # Emitted when a shutdown is noticed.
126 shutdown_reply_received = QtCore.Signal(object)
127
128 #---------------------------------------------------------------------------
129 # 'SubSocketChannel' interface
130 #---------------------------------------------------------------------------
131
132 def call_handlers(self, msg):
133 """ Reimplemented to emit signals instead of making callbacks.
134 """
135 # Emit the generic signal.
136 self.message_received.emit(msg)
137 # Emit signals for specialized message types.
138 msg_type = msg['header']['msg_type']
139 signal = getattr(self, msg_type + '_received', None)
140 if signal:
141 signal.emit(msg)
142 elif msg_type in ('stdout', 'stderr'):
143 self.stream_received.emit(msg)
144
145 def flush(self):
146 """ Reimplemented to ensure that signals are dispatched immediately.
147 """
148 super(QtSubChannelMixin, self).flush()
149 QtCore.QCoreApplication.instance().processEvents()
150
151
152 class QtStdInChannelMixin(ChannelQObject):
153
154 # Emitted when any message is received.
155 message_received = QtCore.Signal(object)
156
157 # Emitted when an input request is received.
158 input_requested = QtCore.Signal(object)
159
160 #---------------------------------------------------------------------------
161 # 'StdInSocketChannel' interface
162 #---------------------------------------------------------------------------
163
164 def call_handlers(self, msg):
165 """ Reimplemented to emit signals instead of making callbacks.
166 """
167 # Emit the generic signal.
168 self.message_received.emit(msg)
169
170 # Emit signals for specialized message types.
171 msg_type = msg['header']['msg_type']
172 if msg_type == 'input_request':
173 self.input_requested.emit(msg)
174
175
176 class QtHBChannelMixin(ChannelQObject):
177
178 # Emitted when the kernel has died.
179 kernel_died = QtCore.Signal(object)
180
181 #---------------------------------------------------------------------------
182 # 'HBSocketChannel' interface
183 #---------------------------------------------------------------------------
184
185 def call_handlers(self, since_last_heartbeat):
186 """ Reimplemented to emit signals instead of making callbacks.
187 """
188 # Emit the generic signal.
189 self.kernel_died.emit(since_last_heartbeat)
190
191
192 class QtKernelManagerMixin(object):
193 """ A KernelManager that provides signals and slots.
194 """
195
196 # Emitted when the kernel manager has started listening.
197 started_kernel = QtCore.Signal()
198
199 # Emitted when the kernel manager has started listening.
200 started_channels = QtCore.Signal()
201
202 # Emitted when the kernel manager has stopped listening.
203 stopped_channels = QtCore.Signal()
204
205 # Use Qt-specific channel classes that emit signals.
206 sub_channel_class = Type(QtSubChannelMixin)
207 shell_channel_class = Type(QtShellChannelMixin)
208 stdin_channel_class = Type(QtStdInChannelMixin)
209 hb_channel_class = Type(QtHBChannelMixin)
210
211 #---------------------------------------------------------------------------
212 # 'KernelManager' interface
213 #---------------------------------------------------------------------------
214
215 #------ Kernel process management ------------------------------------------
216
217 def start_kernel(self, *args, **kw):
218 """ Reimplemented for proper heartbeat management.
219 """
220 if self._shell_channel is not None:
221 self._shell_channel.reset_first_reply()
222 super(QtKernelManagerMixin, self).start_kernel(*args, **kw)
223 self.started_kernel.emit()
224
225 #------ Channel management -------------------------------------------------
226
227 def start_channels(self, *args, **kw):
228 """ Reimplemented to emit signal.
229 """
230 super(QtKernelManagerMixin, self).start_channels(*args, **kw)
231 self.started_channels.emit()
232
233 def stop_channels(self):
234 """ Reimplemented to emit signal.
235 """
236 super(QtKernelManagerMixin, self).stop_channels()
237 self.stopped_channels.emit()
238
239 @property
240 def shell_channel(self):
241 """ Reimplemented for proper heartbeat management.
242 """
243 if self._shell_channel is None:
244 self._shell_channel = super(QtKernelManagerMixin,self).shell_channel
245 self._shell_channel.first_reply.connect(self._first_reply)
246 return self._shell_channel
247
248 #---------------------------------------------------------------------------
249 # Protected interface
250 #---------------------------------------------------------------------------
251
252 def _first_reply(self):
253 """ Unpauses the heartbeat channel when the first reply is received on
254 the execute channel. Note that this will *not* start the heartbeat
255 channel if it is not already running!
256 """
257 if self._hb_channel is not None:
258 self._hb_channel.unpause()
@@ -0,0 +1,37 b''
1 """ Defines an embedded KernelManager that provides signals and slots.
2 """
3
4 # Local imports.
5 from IPython.embedded.kernelmanager import \
6 ShellEmbeddedChannel, SubEmbeddedChannel, StdInEmbeddedChannel, \
7 HBEmbeddedChannel, EmbeddedKernelManager
8 from IPython.utils.traitlets import Type
9 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
10 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
11 from util import MetaQObjectHasTraits, SuperQObject
12
13
14 class QtShellEmbeddedChannel(QtShellChannelMixin, ShellEmbeddedChannel):
15 pass
16
17 class QtSubEmbeddedChannel(QtSubChannelMixin, SubEmbeddedChannel):
18 pass
19
20 class QtStdInEmbeddedChannel(QtStdInChannelMixin, StdInEmbeddedChannel):
21 pass
22
23 class QtHBEmbeddedChannel(QtHBChannelMixin, HBEmbeddedChannel):
24 pass
25
26
27 class QtEmbeddedKernelManager(QtKernelManagerMixin,
28 EmbeddedKernelManager, SuperQObject):
29 """ An embedded KernelManager that provides signals and slots.
30 """
31
32 __metaclass__ = MetaQObjectHasTraits
33
34 sub_channel_class = Type(QtSubEmbeddedChannel)
35 shell_channel_class = Type(QtShellEmbeddedChannel)
36 stdin_channel_class = Type(QtStdInEmbeddedChannel)
37 hb_channel_class = Type(QtHBEmbeddedChannel)
@@ -1,247 +1,35 b''
1 """ Defines a KernelManager that provides signals and slots.
1 """ Defines a KernelManager that provides signals and slots.
2 """
2 """
3
3
4 # System library imports.
4 # Local imports.
5 from IPython.external.qt import QtCore
6
7 # IPython imports.
8 from IPython.utils.traitlets import Type
5 from IPython.utils.traitlets import Type
9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
6 from IPython.zmq.kernelmanager import ShellSocketChannel, SubSocketChannel, \
10 ShellSocketChannel, StdInSocketChannel, HBSocketChannel
7 StdInSocketChannel, HBSocketChannel, KernelManager
8 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
9 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
11 from util import MetaQObjectHasTraits, SuperQObject
10 from util import MetaQObjectHasTraits, SuperQObject
12
11
13
12
14 class SocketChannelQObject(SuperQObject):
13 class QtShellSocketChannel(QtShellChannelMixin, ShellSocketChannel):
15
14 pass
16 # Emitted when the channel is started.
17 started = QtCore.Signal()
18
19 # Emitted when the channel is stopped.
20 stopped = QtCore.Signal()
21
22 #---------------------------------------------------------------------------
23 # 'ZMQSocketChannel' interface
24 #---------------------------------------------------------------------------
25
26 def start(self):
27 """ Reimplemented to emit signal.
28 """
29 super(SocketChannelQObject, self).start()
30 self.started.emit()
31
32 def stop(self):
33 """ Reimplemented to emit signal.
34 """
35 super(SocketChannelQObject, self).stop()
36 self.stopped.emit()
37
38
39 class QtShellSocketChannel(SocketChannelQObject, ShellSocketChannel):
40
41 # Emitted when any message is received.
42 message_received = QtCore.Signal(object)
43
44 # Emitted when a reply has been received for the corresponding request
45 # type.
46 execute_reply = QtCore.Signal(object)
47 complete_reply = QtCore.Signal(object)
48 object_info_reply = QtCore.Signal(object)
49 history_reply = QtCore.Signal(object)
50
51 # Emitted when the first reply comes back.
52 first_reply = QtCore.Signal()
53
54 # Used by the first_reply signal logic to determine if a reply is the
55 # first.
56 _handlers_called = False
57
58 #---------------------------------------------------------------------------
59 # 'ShellSocketChannel' interface
60 #---------------------------------------------------------------------------
61
62 def call_handlers(self, msg):
63 """ Reimplemented to emit signals instead of making callbacks.
64 """
65 # Emit the generic signal.
66 self.message_received.emit(msg)
67
68 # Emit signals for specialized message types.
69 msg_type = msg['header']['msg_type']
70 signal = getattr(self, msg_type, None)
71 if signal:
72 signal.emit(msg)
73
74 if not self._handlers_called:
75 self.first_reply.emit()
76 self._handlers_called = True
77
78 #---------------------------------------------------------------------------
79 # 'QtShellSocketChannel' interface
80 #---------------------------------------------------------------------------
81
82 def reset_first_reply(self):
83 """ Reset the first_reply signal to fire again on the next reply.
84 """
85 self._handlers_called = False
86
87
88 class QtSubSocketChannel(SocketChannelQObject, SubSocketChannel):
89
90 # Emitted when any message is received.
91 message_received = QtCore.Signal(object)
92
93 # Emitted when a message of type 'stream' is received.
94 stream_received = QtCore.Signal(object)
95
96 # Emitted when a message of type 'pyin' is received.
97 pyin_received = QtCore.Signal(object)
98
99 # Emitted when a message of type 'pyout' is received.
100 pyout_received = QtCore.Signal(object)
101
102 # Emitted when a message of type 'pyerr' is received.
103 pyerr_received = QtCore.Signal(object)
104
105 # Emitted when a message of type 'display_data' is received
106 display_data_received = QtCore.Signal(object)
107
15
108 # Emitted when a crash report message is received from the kernel's
16 class QtSubSocketChannel(QtSubChannelMixin, SubSocketChannel):
109 # last-resort sys.excepthook.
17 pass
110 crash_received = QtCore.Signal(object)
111
18
112 # Emitted when a shutdown is noticed.
19 class QtStdInSocketChannel(QtStdInChannelMixin, StdInSocketChannel):
113 shutdown_reply_received = QtCore.Signal(object)
20 pass
114
21
115 #---------------------------------------------------------------------------
22 class QtHBSocketChannel(QtHBChannelMixin, HBSocketChannel):
116 # 'SubSocketChannel' interface
23 pass
117 #---------------------------------------------------------------------------
118
24
119 def call_handlers(self, msg):
120 """ Reimplemented to emit signals instead of making callbacks.
121 """
122 # Emit the generic signal.
123 self.message_received.emit(msg)
124 # Emit signals for specialized message types.
125 msg_type = msg['header']['msg_type']
126 signal = getattr(self, msg_type + '_received', None)
127 if signal:
128 signal.emit(msg)
129 elif msg_type in ('stdout', 'stderr'):
130 self.stream_received.emit(msg)
131
25
132 def flush(self):
26 class QtKernelManager(QtKernelManagerMixin, KernelManager, SuperQObject):
133 """ Reimplemented to ensure that signals are dispatched immediately.
134 """
135 super(QtSubSocketChannel, self).flush()
136 QtCore.QCoreApplication.instance().processEvents()
137
138
139 class QtStdInSocketChannel(SocketChannelQObject, StdInSocketChannel):
140
141 # Emitted when any message is received.
142 message_received = QtCore.Signal(object)
143
144 # Emitted when an input request is received.
145 input_requested = QtCore.Signal(object)
146
147 #---------------------------------------------------------------------------
148 # 'StdInSocketChannel' interface
149 #---------------------------------------------------------------------------
150
151 def call_handlers(self, msg):
152 """ Reimplemented to emit signals instead of making callbacks.
153 """
154 # Emit the generic signal.
155 self.message_received.emit(msg)
156
157 # Emit signals for specialized message types.
158 msg_type = msg['header']['msg_type']
159 if msg_type == 'input_request':
160 self.input_requested.emit(msg)
161
162
163 class QtHBSocketChannel(SocketChannelQObject, HBSocketChannel):
164
165 # Emitted when the kernel has died.
166 kernel_died = QtCore.Signal(object)
167
168 #---------------------------------------------------------------------------
169 # 'HBSocketChannel' interface
170 #---------------------------------------------------------------------------
171
172 def call_handlers(self, since_last_heartbeat):
173 """ Reimplemented to emit signals instead of making callbacks.
174 """
175 # Emit the generic signal.
176 self.kernel_died.emit(since_last_heartbeat)
177
178
179 class QtKernelManager(KernelManager, SuperQObject):
180 """ A KernelManager that provides signals and slots.
27 """ A KernelManager that provides signals and slots.
181 """
28 """
182
29
183 __metaclass__ = MetaQObjectHasTraits
30 __metaclass__ = MetaQObjectHasTraits
184
31
185 # Emitted when the kernel manager has started listening.
186 started_kernel = QtCore.Signal()
187
188 # Emitted when the kernel manager has started listening.
189 started_channels = QtCore.Signal()
190
191 # Emitted when the kernel manager has stopped listening.
192 stopped_channels = QtCore.Signal()
193
194 # Use Qt-specific channel classes that emit signals.
195 sub_channel_class = Type(QtSubSocketChannel)
32 sub_channel_class = Type(QtSubSocketChannel)
196 shell_channel_class = Type(QtShellSocketChannel)
33 shell_channel_class = Type(QtShellSocketChannel)
197 stdin_channel_class = Type(QtStdInSocketChannel)
34 stdin_channel_class = Type(QtStdInSocketChannel)
198 hb_channel_class = Type(QtHBSocketChannel)
35 hb_channel_class = Type(QtHBSocketChannel)
199
200 #---------------------------------------------------------------------------
201 # 'KernelManager' interface
202 #---------------------------------------------------------------------------
203
204 #------ Kernel process management ------------------------------------------
205
206 def start_kernel(self, *args, **kw):
207 """ Reimplemented for proper heartbeat management.
208 """
209 if self._shell_channel is not None:
210 self._shell_channel.reset_first_reply()
211 super(QtKernelManager, self).start_kernel(*args, **kw)
212 self.started_kernel.emit()
213
214 #------ Channel management -------------------------------------------------
215
216 def start_channels(self, *args, **kw):
217 """ Reimplemented to emit signal.
218 """
219 super(QtKernelManager, self).start_channels(*args, **kw)
220 self.started_channels.emit()
221
222 def stop_channels(self):
223 """ Reimplemented to emit signal.
224 """
225 super(QtKernelManager, self).stop_channels()
226 self.stopped_channels.emit()
227
228 @property
229 def shell_channel(self):
230 """ Reimplemented for proper heartbeat management.
231 """
232 if self._shell_channel is None:
233 self._shell_channel = super(QtKernelManager, self).shell_channel
234 self._shell_channel.first_reply.connect(self._first_reply)
235 return self._shell_channel
236
237 #---------------------------------------------------------------------------
238 # Protected interface
239 #---------------------------------------------------------------------------
240
241 def _first_reply(self):
242 """ Unpauses the heartbeat channel when the first reply is received on
243 the execute channel. Note that this will *not* start the heartbeat
244 channel if it is not already running!
245 """
246 if self._hb_channel is not None:
247 self._hb_channel.unpause()
@@ -1,154 +1,53 b''
1 """Implement a fully blocking kernel manager.
1 """ Implements a fully blocking kernel manager.
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
6 # Copyright (C) 2010-2012 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16
15
17 # Stdlib
16 # Local imports.
18 from Queue import Queue, Empty
17 from IPython.embedded.blockingkernelmanager import BlockingChannelMixin
19 from threading import Event
20
21 # Our own
22 from IPython.utils import io
23 from IPython.utils.traitlets import Type
18 from IPython.utils.traitlets import Type
24
19 from kernelmanager import KernelManager, SubSocketChannel, HBSocketChannel, \
25 from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
20 ShellSocketChannel, StdInSocketChannel
26 ShellSocketChannel, StdInSocketChannel)
27
21
28 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
29 # Functions and classes
23 # Blocking kernel manager
30 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
31
25
32 class BlockingSubSocketChannel(SubSocketChannel):
26 class BlockingSubSocketChannel(BlockingChannelMixin, SubSocketChannel):
33
27 pass
34 def __init__(self, context, session, address=None):
35 super(BlockingSubSocketChannel, self).__init__(context, session,
36 address)
37 self._in_queue = Queue()
38
39 def call_handlers(self, msg):
40 #io.rprint('[[Sub]]', msg) # dbg
41 self._in_queue.put(msg)
42
43 def msg_ready(self):
44 """Is there a message that has been received?"""
45 if self._in_queue.qsize() == 0:
46 return False
47 else:
48 return True
49
50 def get_msg(self, block=True, timeout=None):
51 """Get a message if there is one that is ready."""
52 if block and timeout is None:
53 # never use timeout=None, because get
54 # becomes uninterruptible
55 timeout = 1e6
56 return self._in_queue.get(block, timeout)
57
58 def get_msgs(self):
59 """Get all messages that are currently ready."""
60 msgs = []
61 while True:
62 try:
63 msgs.append(self.get_msg(block=False))
64 except Empty:
65 break
66 return msgs
67
68
69 class BlockingShellSocketChannel(ShellSocketChannel):
70
28
71 def __init__(self, context, session, address=None):
29 class BlockingShellSocketChannel(BlockingChannelMixin, ShellSocketChannel):
72 super(BlockingShellSocketChannel, self).__init__(context, session,
30 pass
73 address)
74 self._in_queue = Queue()
75
76 def call_handlers(self, msg):
77 #io.rprint('[[Shell]]', msg) # dbg
78 self._in_queue.put(msg)
79
80 def msg_ready(self):
81 """Is there a message that has been received?"""
82 if self._in_queue.qsize() == 0:
83 return False
84 else:
85 return True
86
87 def get_msg(self, block=True, timeout=None):
88 """Get a message if there is one that is ready."""
89 if block and timeout is None:
90 # never use timeout=None, because get
91 # becomes uninterruptible
92 timeout = 1e6
93 return self._in_queue.get(block, timeout)
94
95 def get_msgs(self):
96 """Get all messages that are currently ready."""
97 msgs = []
98 while True:
99 try:
100 msgs.append(self.get_msg(block=False))
101 except Empty:
102 break
103 return msgs
104
105
106 class BlockingStdInSocketChannel(StdInSocketChannel):
107
108 def __init__(self, context, session, address=None):
109 super(BlockingStdInSocketChannel, self).__init__(context, session, address)
110 self._in_queue = Queue()
111
112 def call_handlers(self, msg):
113 #io.rprint('[[Rep]]', msg) # dbg
114 self._in_queue.put(msg)
115
116 def get_msg(self, block=True, timeout=None):
117 "Gets a message if there is one that is ready."
118 return self._in_queue.get(block, timeout)
119
120 def get_msgs(self):
121 """Get all messages that are currently ready."""
122 msgs = []
123 while True:
124 try:
125 msgs.append(self.get_msg(block=False))
126 except Empty:
127 break
128 return msgs
129
130 def msg_ready(self):
131 "Is there a message that has been received?"
132 return not self._in_queue.empty()
133
31
32 class BlockingStdInSocketChannel(BlockingChannelMixin, StdInSocketChannel):
33 pass
134
34
135 class BlockingHBSocketChannel(HBSocketChannel):
35 class BlockingHBSocketChannel(HBSocketChannel):
136
36
137 # This kernel needs quicker monitoring, shorten to 1 sec.
37 # This kernel needs quicker monitoring, shorten to 1 sec.
138 # less than 0.5s is unreliable, and will get occasional
38 # less than 0.5s is unreliable, and will get occasional
139 # false reports of missed beats.
39 # false reports of missed beats.
140 time_to_dead = 1.
40 time_to_dead = 1.
141
41
142 def call_handlers(self, since_last_heartbeat):
42 def call_handlers(self, since_last_heartbeat):
143 """pause beating on missed heartbeat"""
43 """ Pause beating on missed heartbeat. """
144 pass
44 pass
145
45
146
147 class BlockingKernelManager(KernelManager):
46 class BlockingKernelManager(KernelManager):
148
47
149 # The classes to use for the various channels.
48 # The classes to use for the various channels.
150 shell_channel_class = Type(BlockingShellSocketChannel)
49 shell_channel_class = Type(BlockingShellSocketChannel)
151 sub_channel_class = Type(BlockingSubSocketChannel)
50 sub_channel_class = Type(BlockingSubSocketChannel)
152 stdin_channel_class = Type(BlockingStdInSocketChannel)
51 stdin_channel_class = Type(BlockingStdInSocketChannel)
153 hb_channel_class = Type(BlockingHBSocketChannel)
52 hb_channel_class = Type(BlockingHBSocketChannel)
154
53
@@ -1,1000 +1,996 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 import json
21 import json
22 from subprocess import Popen
22 from subprocess import Popen
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 from threading import Thread
26 from threading import Thread
27 import time
27 import time
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.traitlets import (
39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 )
41 )
42 from IPython.utils.py3compat import str_to_bytes
42 from IPython.utils.py3compat import str_to_bytes
43 from IPython.zmq.entry_point import write_connection_file
43 from IPython.zmq.entry_point import write_connection_file
44 from session import Session
44 from session import Session
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Constants and exceptions
47 # Constants and exceptions
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 class InvalidPortNumber(Exception):
50 class InvalidPortNumber(Exception):
51 pass
51 pass
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Utility functions
54 # Utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 # some utilities to validate message structure, these might get moved elsewhere
57 # some utilities to validate message structure, these might get moved elsewhere
58 # if they prove to have more generic utility
58 # if they prove to have more generic utility
59
59
60 def validate_string_list(lst):
60 def validate_string_list(lst):
61 """Validate that the input is a list of strings.
61 """Validate that the input is a list of strings.
62
62
63 Raises ValueError if not."""
63 Raises ValueError if not."""
64 if not isinstance(lst, list):
64 if not isinstance(lst, list):
65 raise ValueError('input %r must be a list' % lst)
65 raise ValueError('input %r must be a list' % lst)
66 for x in lst:
66 for x in lst:
67 if not isinstance(x, basestring):
67 if not isinstance(x, basestring):
68 raise ValueError('element %r in list must be a string' % x)
68 raise ValueError('element %r in list must be a string' % x)
69
69
70
70
71 def validate_string_dict(dct):
71 def validate_string_dict(dct):
72 """Validate that the input is a dict with string keys and values.
72 """Validate that the input is a dict with string keys and values.
73
73
74 Raises ValueError if not."""
74 Raises ValueError if not."""
75 for k,v in dct.iteritems():
75 for k,v in dct.iteritems():
76 if not isinstance(k, basestring):
76 if not isinstance(k, basestring):
77 raise ValueError('key %r in dict must be a string' % k)
77 raise ValueError('key %r in dict must be a string' % k)
78 if not isinstance(v, basestring):
78 if not isinstance(v, basestring):
79 raise ValueError('value %r in dict must be a string' % v)
79 raise ValueError('value %r in dict must be a string' % v)
80
80
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # ZMQ Socket Channel classes
83 # ZMQ Socket Channel classes
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86 class ZMQSocketChannel(Thread):
86 class ZMQSocketChannel(Thread):
87 """The base class for the channels that use ZMQ sockets.
87 """The base class for the channels that use ZMQ sockets.
88 """
88 """
89 context = None
89 context = None
90 session = None
90 session = None
91 socket = None
91 socket = None
92 ioloop = None
92 ioloop = None
93 stream = None
93 stream = None
94 _address = None
94 _address = None
95 _exiting = False
95 _exiting = False
96
96
97 def __init__(self, context, session, address):
97 def __init__(self, context, session, address):
98 """Create a channel
98 """Create a channel
99
99
100 Parameters
100 Parameters
101 ----------
101 ----------
102 context : :class:`zmq.Context`
102 context : :class:`zmq.Context`
103 The ZMQ context to use.
103 The ZMQ context to use.
104 session : :class:`session.Session`
104 session : :class:`session.Session`
105 The session to use.
105 The session to use.
106 address : tuple
106 address : tuple
107 Standard (ip, port) tuple that the kernel is listening on.
107 Standard (ip, port) tuple that the kernel is listening on.
108 """
108 """
109 super(ZMQSocketChannel, self).__init__()
109 super(ZMQSocketChannel, self).__init__()
110 self.daemon = True
110 self.daemon = True
111
111
112 self.context = context
112 self.context = context
113 self.session = session
113 self.session = session
114 if address[1] == 0:
114 if address[1] == 0:
115 message = 'The port number for a channel cannot be 0.'
115 message = 'The port number for a channel cannot be 0.'
116 raise InvalidPortNumber(message)
116 raise InvalidPortNumber(message)
117 self._address = address
117 self._address = address
118 atexit.register(self._notice_exit)
118 atexit.register(self._notice_exit)
119
119
120 def _notice_exit(self):
120 def _notice_exit(self):
121 self._exiting = True
121 self._exiting = True
122
122
123 def _run_loop(self):
123 def _run_loop(self):
124 """Run my loop, ignoring EINTR events in the poller"""
124 """Run my loop, ignoring EINTR events in the poller"""
125 while True:
125 while True:
126 try:
126 try:
127 self.ioloop.start()
127 self.ioloop.start()
128 except ZMQError as e:
128 except ZMQError as e:
129 if e.errno == errno.EINTR:
129 if e.errno == errno.EINTR:
130 continue
130 continue
131 else:
131 else:
132 raise
132 raise
133 except Exception:
133 except Exception:
134 if self._exiting:
134 if self._exiting:
135 break
135 break
136 else:
136 else:
137 raise
137 raise
138 else:
138 else:
139 break
139 break
140
140
141 def stop(self):
141 def stop(self):
142 """Stop the channel's activity.
142 """Stop the channel's activity.
143
143
144 This calls :method:`Thread.join` and returns when the thread
144 This calls :method:`Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
145 terminates. :class:`RuntimeError` will be raised if
146 :method:`self.start` is called again.
146 :method:`self.start` is called again.
147 """
147 """
148 self.join()
148 self.join()
149
149
150 @property
150 @property
151 def address(self):
151 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
152 """Get the channel's address as an (ip, port) tuple.
153
153
154 By the default, the address is (localhost, 0), where 0 means a random
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
155 port.
156 """
156 """
157 return self._address
157 return self._address
158
158
159 def _queue_send(self, msg):
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 msg : message to send
164 msg : message to send
165
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
167 thread control of the action.
168 """
168 """
169 def thread_send():
169 def thread_send():
170 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
172
172
173 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
174 """callback for stream.on_recv
174 """callback for stream.on_recv
175
175
176 unpacks message, and calls handlers with it.
176 unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 self.call_handlers(self.session.unserialize(smsg))
179 self.call_handlers(self.session.unserialize(smsg))
180
180
181
181
182
182
183 class ShellSocketChannel(ZMQSocketChannel):
183 class ShellSocketChannel(ZMQSocketChannel):
184 """The DEALER channel for issues request/replies to the kernel.
184 """The DEALER channel for issues request/replies to the kernel.
185 """
185 """
186
186
187 command_queue = None
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
189 allow_stdin = True
190
190
191 def __init__(self, context, session, address):
191 def __init__(self, context, session, address):
192 super(ShellSocketChannel, self).__init__(context, session, address)
192 super(ShellSocketChannel, self).__init__(context, session, address)
193 self.ioloop = ioloop.IOLoop()
193 self.ioloop = ioloop.IOLoop()
194
194
195 def run(self):
195 def run(self):
196 """The thread's main activity. Call start() instead."""
196 """The thread's main activity. Call start() instead."""
197 self.socket = self.context.socket(zmq.DEALER)
197 self.socket = self.context.socket(zmq.DEALER)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
199 self.socket.connect('tcp://%s:%i' % self.address)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 self.stream.on_recv(self._handle_recv)
201 self.stream.on_recv(self._handle_recv)
202 self._run_loop()
202 self._run_loop()
203 try:
203 try:
204 self.socket.close()
204 self.socket.close()
205 except:
205 except:
206 pass
206 pass
207
207
208 def stop(self):
208 def stop(self):
209 self.ioloop.stop()
209 self.ioloop.stop()
210 super(ShellSocketChannel, self).stop()
210 super(ShellSocketChannel, self).stop()
211
211
212 def call_handlers(self, msg):
212 def call_handlers(self, msg):
213 """This method is called in the ioloop thread when a message arrives.
213 """This method is called in the ioloop thread when a message arrives.
214
214
215 Subclasses should override this method to handle incoming messages.
215 Subclasses should override this method to handle incoming messages.
216 It is important to remember that this method is called in the thread
216 It is important to remember that this method is called in the thread
217 so that some logic must be done to ensure that the application leve
217 so that some logic must be done to ensure that the application leve
218 handlers are called in the application thread.
218 handlers are called in the application thread.
219 """
219 """
220 raise NotImplementedError('call_handlers must be defined in a subclass.')
220 raise NotImplementedError('call_handlers must be defined in a subclass.')
221
221
222 def execute(self, code, silent=False, store_history=True,
222 def execute(self, code, silent=False, store_history=True,
223 user_variables=None, user_expressions=None, allow_stdin=None):
223 user_variables=None, user_expressions=None, allow_stdin=None):
224 """Execute code in the kernel.
224 """Execute code in the kernel.
225
225
226 Parameters
226 Parameters
227 ----------
227 ----------
228 code : str
228 code : str
229 A string of Python code.
229 A string of Python code.
230
230
231 silent : bool, optional (default False)
231 silent : bool, optional (default False)
232 If set, the kernel will execute the code as quietly possible, and
232 If set, the kernel will execute the code as quietly possible, and
233 will force store_history to be False.
233 will force store_history to be False.
234
234
235 store_history : bool, optional (default True)
235 store_history : bool, optional (default True)
236 If set, the kernel will store command history. This is forced
236 If set, the kernel will store command history. This is forced
237 to be False if silent is True.
237 to be False if silent is True.
238
238
239 user_variables : list, optional
239 user_variables : list, optional
240 A list of variable names to pull from the user's namespace. They
240 A list of variable names to pull from the user's namespace. They
241 will come back as a dict with these names as keys and their
241 will come back as a dict with these names as keys and their
242 :func:`repr` as values.
242 :func:`repr` as values.
243
243
244 user_expressions : dict, optional
244 user_expressions : dict, optional
245 A dict mapping names to expressions to be evaluated in the user's
245 A dict mapping names to expressions to be evaluated in the user's
246 dict. The expression values are returned as strings formatted using
246 dict. The expression values are returned as strings formatted using
247 :func:`repr`.
247 :func:`repr`.
248
248
249 allow_stdin : bool, optional (default self.allow_stdin)
249 allow_stdin : bool, optional (default self.allow_stdin)
250 Flag for whether the kernel can send stdin requests to frontends.
250 Flag for whether the kernel can send stdin requests to frontends.
251
251
252 Some frontends (e.g. the Notebook) do not support stdin requests.
252 Some frontends (e.g. the Notebook) do not support stdin requests.
253 If raw_input is called from code executed from such a frontend, a
253 If raw_input is called from code executed from such a frontend, a
254 StdinNotImplementedError will be raised.
254 StdinNotImplementedError will be raised.
255
255
256 Returns
256 Returns
257 -------
257 -------
258 The msg_id of the message sent.
258 The msg_id of the message sent.
259 """
259 """
260 if user_variables is None:
260 if user_variables is None:
261 user_variables = []
261 user_variables = []
262 if user_expressions is None:
262 if user_expressions is None:
263 user_expressions = {}
263 user_expressions = {}
264 if allow_stdin is None:
264 if allow_stdin is None:
265 allow_stdin = self.allow_stdin
265 allow_stdin = self.allow_stdin
266
266
267
267
268 # Don't waste network traffic if inputs are invalid
268 # Don't waste network traffic if inputs are invalid
269 if not isinstance(code, basestring):
269 if not isinstance(code, basestring):
270 raise ValueError('code %r must be a string' % code)
270 raise ValueError('code %r must be a string' % code)
271 validate_string_list(user_variables)
271 validate_string_list(user_variables)
272 validate_string_dict(user_expressions)
272 validate_string_dict(user_expressions)
273
273
274 # Create class for content/msg creation. Related to, but possibly
274 # Create class for content/msg creation. Related to, but possibly
275 # not in Session.
275 # not in Session.
276 content = dict(code=code, silent=silent, store_history=store_history,
276 content = dict(code=code, silent=silent, store_history=store_history,
277 user_variables=user_variables,
277 user_variables=user_variables,
278 user_expressions=user_expressions,
278 user_expressions=user_expressions,
279 allow_stdin=allow_stdin,
279 allow_stdin=allow_stdin,
280 )
280 )
281 msg = self.session.msg('execute_request', content)
281 msg = self.session.msg('execute_request', content)
282 self._queue_send(msg)
282 self._queue_send(msg)
283 return msg['header']['msg_id']
283 return msg['header']['msg_id']
284
284
285 def complete(self, text, line, cursor_pos, block=None):
285 def complete(self, text, line, cursor_pos, block=None):
286 """Tab complete text in the kernel's namespace.
286 """Tab complete text in the kernel's namespace.
287
287
288 Parameters
288 Parameters
289 ----------
289 ----------
290 text : str
290 text : str
291 The text to complete.
291 The text to complete.
292 line : str
292 line : str
293 The full line of text that is the surrounding context for the
293 The full line of text that is the surrounding context for the
294 text to complete.
294 text to complete.
295 cursor_pos : int
295 cursor_pos : int
296 The position of the cursor in the line where the completion was
296 The position of the cursor in the line where the completion was
297 requested.
297 requested.
298 block : str, optional
298 block : str, optional
299 The full block of code in which the completion is being requested.
299 The full block of code in which the completion is being requested.
300
300
301 Returns
301 Returns
302 -------
302 -------
303 The msg_id of the message sent.
303 The msg_id of the message sent.
304 """
304 """
305 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
305 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
306 msg = self.session.msg('complete_request', content)
306 msg = self.session.msg('complete_request', content)
307 self._queue_send(msg)
307 self._queue_send(msg)
308 return msg['header']['msg_id']
308 return msg['header']['msg_id']
309
309
310 def object_info(self, oname, detail_level=0):
310 def object_info(self, oname, detail_level=0):
311 """Get metadata information about an object.
311 """Get metadata information about an object.
312
312
313 Parameters
313 Parameters
314 ----------
314 ----------
315 oname : str
315 oname : str
316 A string specifying the object name.
316 A string specifying the object name.
317 detail_level : int, optional
317 detail_level : int, optional
318 The level of detail for the introspection (0-2)
318 The level of detail for the introspection (0-2)
319
319
320 Returns
320 Returns
321 -------
321 -------
322 The msg_id of the message sent.
322 The msg_id of the message sent.
323 """
323 """
324 content = dict(oname=oname, detail_level=detail_level)
324 content = dict(oname=oname, detail_level=detail_level)
325 msg = self.session.msg('object_info_request', content)
325 msg = self.session.msg('object_info_request', content)
326 self._queue_send(msg)
326 self._queue_send(msg)
327 return msg['header']['msg_id']
327 return msg['header']['msg_id']
328
328
329 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
329 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
330 """Get entries from the history list.
330 """Get entries from the history list.
331
331
332 Parameters
332 Parameters
333 ----------
333 ----------
334 raw : bool
334 raw : bool
335 If True, return the raw input.
335 If True, return the raw input.
336 output : bool
336 output : bool
337 If True, then return the output as well.
337 If True, then return the output as well.
338 hist_access_type : str
338 hist_access_type : str
339 'range' (fill in session, start and stop params), 'tail' (fill in n)
339 'range' (fill in session, start and stop params), 'tail' (fill in n)
340 or 'search' (fill in pattern param).
340 or 'search' (fill in pattern param).
341
341
342 session : int
342 session : int
343 For a range request, the session from which to get lines. Session
343 For a range request, the session from which to get lines. Session
344 numbers are positive integers; negative ones count back from the
344 numbers are positive integers; negative ones count back from the
345 current session.
345 current session.
346 start : int
346 start : int
347 The first line number of a history range.
347 The first line number of a history range.
348 stop : int
348 stop : int
349 The final (excluded) line number of a history range.
349 The final (excluded) line number of a history range.
350
350
351 n : int
351 n : int
352 The number of lines of history to get for a tail request.
352 The number of lines of history to get for a tail request.
353
353
354 pattern : str
354 pattern : str
355 The glob-syntax pattern for a search request.
355 The glob-syntax pattern for a search request.
356
356
357 Returns
357 Returns
358 -------
358 -------
359 The msg_id of the message sent.
359 The msg_id of the message sent.
360 """
360 """
361 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
361 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
362 **kwargs)
362 **kwargs)
363 msg = self.session.msg('history_request', content)
363 msg = self.session.msg('history_request', content)
364 self._queue_send(msg)
364 self._queue_send(msg)
365 return msg['header']['msg_id']
365 return msg['header']['msg_id']
366
366
367 def shutdown(self, restart=False):
367 def shutdown(self, restart=False):
368 """Request an immediate kernel shutdown.
368 """Request an immediate kernel shutdown.
369
369
370 Upon receipt of the (empty) reply, client code can safely assume that
370 Upon receipt of the (empty) reply, client code can safely assume that
371 the kernel has shut down and it's safe to forcefully terminate it if
371 the kernel has shut down and it's safe to forcefully terminate it if
372 it's still alive.
372 it's still alive.
373
373
374 The kernel will send the reply via a function registered with Python's
374 The kernel will send the reply via a function registered with Python's
375 atexit module, ensuring it's truly done as the kernel is done with all
375 atexit module, ensuring it's truly done as the kernel is done with all
376 normal operation.
376 normal operation.
377 """
377 """
378 # Send quit message to kernel. Once we implement kernel-side setattr,
378 # Send quit message to kernel. Once we implement kernel-side setattr,
379 # this should probably be done that way, but for now this will do.
379 # this should probably be done that way, but for now this will do.
380 msg = self.session.msg('shutdown_request', {'restart':restart})
380 msg = self.session.msg('shutdown_request', {'restart':restart})
381 self._queue_send(msg)
381 self._queue_send(msg)
382 return msg['header']['msg_id']
382 return msg['header']['msg_id']
383
383
384
384
385
385
386 class SubSocketChannel(ZMQSocketChannel):
386 class SubSocketChannel(ZMQSocketChannel):
387 """The SUB channel which listens for messages that the kernel publishes.
387 """The SUB channel which listens for messages that the kernel publishes.
388 """
388 """
389
389
390 def __init__(self, context, session, address):
390 def __init__(self, context, session, address):
391 super(SubSocketChannel, self).__init__(context, session, address)
391 super(SubSocketChannel, self).__init__(context, session, address)
392 self.ioloop = ioloop.IOLoop()
392 self.ioloop = ioloop.IOLoop()
393
393
394 def run(self):
394 def run(self):
395 """The thread's main activity. Call start() instead."""
395 """The thread's main activity. Call start() instead."""
396 self.socket = self.context.socket(zmq.SUB)
396 self.socket = self.context.socket(zmq.SUB)
397 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
397 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
398 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
398 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
399 self.socket.connect('tcp://%s:%i' % self.address)
399 self.socket.connect('tcp://%s:%i' % self.address)
400 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
400 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
401 self.stream.on_recv(self._handle_recv)
401 self.stream.on_recv(self._handle_recv)
402 self._run_loop()
402 self._run_loop()
403 try:
403 try:
404 self.socket.close()
404 self.socket.close()
405 except:
405 except:
406 pass
406 pass
407
407
408 def stop(self):
408 def stop(self):
409 self.ioloop.stop()
409 self.ioloop.stop()
410 super(SubSocketChannel, self).stop()
410 super(SubSocketChannel, self).stop()
411
411
412 def call_handlers(self, msg):
412 def call_handlers(self, msg):
413 """This method is called in the ioloop thread when a message arrives.
413 """This method is called in the ioloop thread when a message arrives.
414
414
415 Subclasses should override this method to handle incoming messages.
415 Subclasses should override this method to handle incoming messages.
416 It is important to remember that this method is called in the thread
416 It is important to remember that this method is called in the thread
417 so that some logic must be done to ensure that the application leve
417 so that some logic must be done to ensure that the application leve
418 handlers are called in the application thread.
418 handlers are called in the application thread.
419 """
419 """
420 raise NotImplementedError('call_handlers must be defined in a subclass.')
420 raise NotImplementedError('call_handlers must be defined in a subclass.')
421
421
422 def flush(self, timeout=1.0):
422 def flush(self, timeout=1.0):
423 """Immediately processes all pending messages on the SUB channel.
423 """Immediately processes all pending messages on the SUB channel.
424
424
425 Callers should use this method to ensure that :method:`call_handlers`
425 Callers should use this method to ensure that :method:`call_handlers`
426 has been called for all messages that have been received on the
426 has been called for all messages that have been received on the
427 0MQ SUB socket of this channel.
427 0MQ SUB socket of this channel.
428
428
429 This method is thread safe.
429 This method is thread safe.
430
430
431 Parameters
431 Parameters
432 ----------
432 ----------
433 timeout : float, optional
433 timeout : float, optional
434 The maximum amount of time to spend flushing, in seconds. The
434 The maximum amount of time to spend flushing, in seconds. The
435 default is one second.
435 default is one second.
436 """
436 """
437 # We do the IOLoop callback process twice to ensure that the IOLoop
437 # We do the IOLoop callback process twice to ensure that the IOLoop
438 # gets to perform at least one full poll.
438 # gets to perform at least one full poll.
439 stop_time = time.time() + timeout
439 stop_time = time.time() + timeout
440 for i in xrange(2):
440 for i in xrange(2):
441 self._flushed = False
441 self._flushed = False
442 self.ioloop.add_callback(self._flush)
442 self.ioloop.add_callback(self._flush)
443 while not self._flushed and time.time() < stop_time:
443 while not self._flushed and time.time() < stop_time:
444 time.sleep(0.01)
444 time.sleep(0.01)
445
445
446 def _flush(self):
446 def _flush(self):
447 """Callback for :method:`self.flush`."""
447 """Callback for :method:`self.flush`."""
448 self.stream.flush()
448 self.stream.flush()
449 self._flushed = True
449 self._flushed = True
450
450
451
451
452 class StdInSocketChannel(ZMQSocketChannel):
452 class StdInSocketChannel(ZMQSocketChannel):
453 """A reply channel to handle raw_input requests that the kernel makes."""
453 """A reply channel to handle raw_input requests that the kernel makes."""
454
454
455 msg_queue = None
455 msg_queue = None
456
456
457 def __init__(self, context, session, address):
457 def __init__(self, context, session, address):
458 super(StdInSocketChannel, self).__init__(context, session, address)
458 super(StdInSocketChannel, self).__init__(context, session, address)
459 self.ioloop = ioloop.IOLoop()
459 self.ioloop = ioloop.IOLoop()
460
460
461 def run(self):
461 def run(self):
462 """The thread's main activity. Call start() instead."""
462 """The thread's main activity. Call start() instead."""
463 self.socket = self.context.socket(zmq.DEALER)
463 self.socket = self.context.socket(zmq.DEALER)
464 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
464 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
465 self.socket.connect('tcp://%s:%i' % self.address)
465 self.socket.connect('tcp://%s:%i' % self.address)
466 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
466 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
467 self.stream.on_recv(self._handle_recv)
467 self.stream.on_recv(self._handle_recv)
468 self._run_loop()
468 self._run_loop()
469 try:
469 try:
470 self.socket.close()
470 self.socket.close()
471 except:
471 except:
472 pass
472 pass
473
473
474
474
475 def stop(self):
475 def stop(self):
476 self.ioloop.stop()
476 self.ioloop.stop()
477 super(StdInSocketChannel, self).stop()
477 super(StdInSocketChannel, self).stop()
478
478
479 def call_handlers(self, msg):
479 def call_handlers(self, msg):
480 """This method is called in the ioloop thread when a message arrives.
480 """This method is called in the ioloop thread when a message arrives.
481
481
482 Subclasses should override this method to handle incoming messages.
482 Subclasses should override this method to handle incoming messages.
483 It is important to remember that this method is called in the thread
483 It is important to remember that this method is called in the thread
484 so that some logic must be done to ensure that the application leve
484 so that some logic must be done to ensure that the application leve
485 handlers are called in the application thread.
485 handlers are called in the application thread.
486 """
486 """
487 raise NotImplementedError('call_handlers must be defined in a subclass.')
487 raise NotImplementedError('call_handlers must be defined in a subclass.')
488
488
489 def input(self, string):
489 def input(self, string):
490 """Send a string of raw input to the kernel."""
490 """Send a string of raw input to the kernel."""
491 content = dict(value=string)
491 content = dict(value=string)
492 msg = self.session.msg('input_reply', content)
492 msg = self.session.msg('input_reply', content)
493 self._queue_send(msg)
493 self._queue_send(msg)
494
494
495
495
496 class HBSocketChannel(ZMQSocketChannel):
496 class HBSocketChannel(ZMQSocketChannel):
497 """The heartbeat channel which monitors the kernel heartbeat.
497 """The heartbeat channel which monitors the kernel heartbeat.
498
498
499 Note that the heartbeat channel is paused by default. As long as you start
499 Note that the heartbeat channel is paused by default. As long as you start
500 this channel, the kernel manager will ensure that it is paused and un-paused
500 this channel, the kernel manager will ensure that it is paused and un-paused
501 as appropriate.
501 as appropriate.
502 """
502 """
503
503
504 time_to_dead = 3.0
504 time_to_dead = 3.0
505 socket = None
505 socket = None
506 poller = None
506 poller = None
507 _running = None
507 _running = None
508 _pause = None
508 _pause = None
509 _beating = None
509 _beating = None
510
510
511 def __init__(self, context, session, address):
511 def __init__(self, context, session, address):
512 super(HBSocketChannel, self).__init__(context, session, address)
512 super(HBSocketChannel, self).__init__(context, session, address)
513 self._running = False
513 self._running = False
514 self._pause =True
514 self._pause =True
515 self.poller = zmq.Poller()
515 self.poller = zmq.Poller()
516
516
517 def _create_socket(self):
517 def _create_socket(self):
518 if self.socket is not None:
518 if self.socket is not None:
519 # close previous socket, before opening a new one
519 # close previous socket, before opening a new one
520 self.poller.unregister(self.socket)
520 self.poller.unregister(self.socket)
521 self.socket.close()
521 self.socket.close()
522 self.socket = self.context.socket(zmq.REQ)
522 self.socket = self.context.socket(zmq.REQ)
523 self.socket.setsockopt(zmq.LINGER, 0)
523 self.socket.setsockopt(zmq.LINGER, 0)
524 self.socket.connect('tcp://%s:%i' % self.address)
524 self.socket.connect('tcp://%s:%i' % self.address)
525
525
526 self.poller.register(self.socket, zmq.POLLIN)
526 self.poller.register(self.socket, zmq.POLLIN)
527
527
528 def _poll(self, start_time):
528 def _poll(self, start_time):
529 """poll for heartbeat replies until we reach self.time_to_dead
529 """poll for heartbeat replies until we reach self.time_to_dead
530
530
531 Ignores interrupts, and returns the result of poll(), which
531 Ignores interrupts, and returns the result of poll(), which
532 will be an empty list if no messages arrived before the timeout,
532 will be an empty list if no messages arrived before the timeout,
533 or the event tuple if there is a message to receive.
533 or the event tuple if there is a message to receive.
534 """
534 """
535
535
536 until_dead = self.time_to_dead - (time.time() - start_time)
536 until_dead = self.time_to_dead - (time.time() - start_time)
537 # ensure poll at least once
537 # ensure poll at least once
538 until_dead = max(until_dead, 1e-3)
538 until_dead = max(until_dead, 1e-3)
539 events = []
539 events = []
540 while True:
540 while True:
541 try:
541 try:
542 events = self.poller.poll(1000 * until_dead)
542 events = self.poller.poll(1000 * until_dead)
543 except ZMQError as e:
543 except ZMQError as e:
544 if e.errno == errno.EINTR:
544 if e.errno == errno.EINTR:
545 # ignore interrupts during heartbeat
545 # ignore interrupts during heartbeat
546 # this may never actually happen
546 # this may never actually happen
547 until_dead = self.time_to_dead - (time.time() - start_time)
547 until_dead = self.time_to_dead - (time.time() - start_time)
548 until_dead = max(until_dead, 1e-3)
548 until_dead = max(until_dead, 1e-3)
549 pass
549 pass
550 else:
550 else:
551 raise
551 raise
552 except Exception:
552 except Exception:
553 if self._exiting:
553 if self._exiting:
554 break
554 break
555 else:
555 else:
556 raise
556 raise
557 else:
557 else:
558 break
558 break
559 return events
559 return events
560
560
561 def run(self):
561 def run(self):
562 """The thread's main activity. Call start() instead."""
562 """The thread's main activity. Call start() instead."""
563 self._create_socket()
563 self._create_socket()
564 self._running = True
564 self._running = True
565 self._beating = True
565 self._beating = True
566
566
567 while self._running:
567 while self._running:
568 if self._pause:
568 if self._pause:
569 # just sleep, and skip the rest of the loop
569 # just sleep, and skip the rest of the loop
570 time.sleep(self.time_to_dead)
570 time.sleep(self.time_to_dead)
571 continue
571 continue
572
572
573 since_last_heartbeat = 0.0
573 since_last_heartbeat = 0.0
574 # io.rprint('Ping from HB channel') # dbg
574 # io.rprint('Ping from HB channel') # dbg
575 # no need to catch EFSM here, because the previous event was
575 # no need to catch EFSM here, because the previous event was
576 # either a recv or connect, which cannot be followed by EFSM
576 # either a recv or connect, which cannot be followed by EFSM
577 self.socket.send(b'ping')
577 self.socket.send(b'ping')
578 request_time = time.time()
578 request_time = time.time()
579 ready = self._poll(request_time)
579 ready = self._poll(request_time)
580 if ready:
580 if ready:
581 self._beating = True
581 self._beating = True
582 # the poll above guarantees we have something to recv
582 # the poll above guarantees we have something to recv
583 self.socket.recv()
583 self.socket.recv()
584 # sleep the remainder of the cycle
584 # sleep the remainder of the cycle
585 remainder = self.time_to_dead - (time.time() - request_time)
585 remainder = self.time_to_dead - (time.time() - request_time)
586 if remainder > 0:
586 if remainder > 0:
587 time.sleep(remainder)
587 time.sleep(remainder)
588 continue
588 continue
589 else:
589 else:
590 # nothing was received within the time limit, signal heart failure
590 # nothing was received within the time limit, signal heart failure
591 self._beating = False
591 self._beating = False
592 since_last_heartbeat = time.time() - request_time
592 since_last_heartbeat = time.time() - request_time
593 self.call_handlers(since_last_heartbeat)
593 self.call_handlers(since_last_heartbeat)
594 # and close/reopen the socket, because the REQ/REP cycle has been broken
594 # and close/reopen the socket, because the REQ/REP cycle has been broken
595 self._create_socket()
595 self._create_socket()
596 continue
596 continue
597 try:
597 try:
598 self.socket.close()
598 self.socket.close()
599 except:
599 except:
600 pass
600 pass
601
601
602 def pause(self):
602 def pause(self):
603 """Pause the heartbeat."""
603 """Pause the heartbeat."""
604 self._pause = True
604 self._pause = True
605
605
606 def unpause(self):
606 def unpause(self):
607 """Unpause the heartbeat."""
607 """Unpause the heartbeat."""
608 self._pause = False
608 self._pause = False
609
609
610 def is_beating(self):
610 def is_beating(self):
611 """Is the heartbeat running and responsive (and not paused)."""
611 """Is the heartbeat running and responsive (and not paused)."""
612 if self.is_alive() and not self._pause and self._beating:
612 if self.is_alive() and not self._pause and self._beating:
613 return True
613 return True
614 else:
614 else:
615 return False
615 return False
616
616
617 def stop(self):
617 def stop(self):
618 self._running = False
618 self._running = False
619 super(HBSocketChannel, self).stop()
619 super(HBSocketChannel, self).stop()
620
620
621 def call_handlers(self, since_last_heartbeat):
621 def call_handlers(self, since_last_heartbeat):
622 """This method is called in the ioloop thread when a message arrives.
622 """This method is called in the ioloop thread when a message arrives.
623
623
624 Subclasses should override this method to handle incoming messages.
624 Subclasses should override this method to handle incoming messages.
625 It is important to remember that this method is called in the thread
625 It is important to remember that this method is called in the thread
626 so that some logic must be done to ensure that the application level
626 so that some logic must be done to ensure that the application level
627 handlers are called in the application thread.
627 handlers are called in the application thread.
628 """
628 """
629 raise NotImplementedError('call_handlers must be defined in a subclass.')
629 raise NotImplementedError('call_handlers must be defined in a subclass.')
630
630
631
631
632 #-----------------------------------------------------------------------------
632 #-----------------------------------------------------------------------------
633 # Main kernel manager class
633 # Main kernel manager class
634 #-----------------------------------------------------------------------------
634 #-----------------------------------------------------------------------------
635
635
636 class KernelManager(HasTraits):
636 class KernelManager(HasTraits):
637 """ Manages a kernel for a frontend.
637 """ Manages a kernel for a frontend.
638
638
639 The SUB channel is for the frontend to receive messages published by the
639 The SUB channel is for the frontend to receive messages published by the
640 kernel.
640 kernel.
641
641
642 The REQ channel is for the frontend to make requests of the kernel.
642 The REQ channel is for the frontend to make requests of the kernel.
643
643
644 The REP channel is for the kernel to request stdin (raw_input) from the
644 The REP channel is for the kernel to request stdin (raw_input) from the
645 frontend.
645 frontend.
646 """
646 """
647 # config object for passing to child configurables
647 # config object for passing to child configurables
648 config = Instance(Config)
648 config = Instance(Config)
649
649
650 # The PyZMQ Context to use for communication with the kernel.
650 # The PyZMQ Context to use for communication with the kernel.
651 context = Instance(zmq.Context)
651 context = Instance(zmq.Context)
652 def _context_default(self):
652 def _context_default(self):
653 return zmq.Context.instance()
653 return zmq.Context.instance()
654
654
655 # The Session to use for communication with the kernel.
655 # The Session to use for communication with the kernel.
656 session = Instance(Session)
656 session = Instance(Session)
657 def _session_default(self):
658 return Session(config=self.config)
657
659
658 # The kernel process with which the KernelManager is communicating.
660 # The kernel process with which the KernelManager is communicating.
659 kernel = Instance(Popen)
661 kernel = Instance(Popen)
660
662
661 # The addresses for the communication channels.
663 # The addresses for the communication channels.
662 connection_file = Unicode('')
664 connection_file = Unicode('')
663 ip = Unicode(LOCALHOST)
665 ip = Unicode(LOCALHOST)
664 def _ip_changed(self, name, old, new):
666 def _ip_changed(self, name, old, new):
665 if new == '*':
667 if new == '*':
666 self.ip = '0.0.0.0'
668 self.ip = '0.0.0.0'
667 shell_port = Integer(0)
669 shell_port = Integer(0)
668 iopub_port = Integer(0)
670 iopub_port = Integer(0)
669 stdin_port = Integer(0)
671 stdin_port = Integer(0)
670 hb_port = Integer(0)
672 hb_port = Integer(0)
671
673
672 # The classes to use for the various channels.
674 # The classes to use for the various channels.
673 shell_channel_class = Type(ShellSocketChannel)
675 shell_channel_class = Type(ShellSocketChannel)
674 sub_channel_class = Type(SubSocketChannel)
676 sub_channel_class = Type(SubSocketChannel)
675 stdin_channel_class = Type(StdInSocketChannel)
677 stdin_channel_class = Type(StdInSocketChannel)
676 hb_channel_class = Type(HBSocketChannel)
678 hb_channel_class = Type(HBSocketChannel)
677
679
678 # Protected traits.
680 # Protected traits.
679 _launch_args = Any
681 _launch_args = Any
680 _shell_channel = Any
682 _shell_channel = Any
681 _sub_channel = Any
683 _sub_channel = Any
682 _stdin_channel = Any
684 _stdin_channel = Any
683 _hb_channel = Any
685 _hb_channel = Any
684 _connection_file_written=Bool(False)
686 _connection_file_written=Bool(False)
685
686 def __init__(self, **kwargs):
687 super(KernelManager, self).__init__(**kwargs)
688 if self.session is None:
689 self.session = Session(config=self.config)
690
687
691 def __del__(self):
688 def __del__(self):
692 self.cleanup_connection_file()
689 self.cleanup_connection_file()
693
690
694
695 #--------------------------------------------------------------------------
691 #--------------------------------------------------------------------------
696 # Channel management methods:
692 # Channel management methods:
697 #--------------------------------------------------------------------------
693 #--------------------------------------------------------------------------
698
694
699 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
695 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
700 """Starts the channels for this kernel.
696 """Starts the channels for this kernel.
701
697
702 This will create the channels if they do not exist and then start
698 This will create the channels if they do not exist and then start
703 them. If port numbers of 0 are being used (random ports) then you
699 them. If port numbers of 0 are being used (random ports) then you
704 must first call :method:`start_kernel`. If the channels have been
700 must first call :method:`start_kernel`. If the channels have been
705 stopped and you call this, :class:`RuntimeError` will be raised.
701 stopped and you call this, :class:`RuntimeError` will be raised.
706 """
702 """
707 if shell:
703 if shell:
708 self.shell_channel.start()
704 self.shell_channel.start()
709 if sub:
705 if sub:
710 self.sub_channel.start()
706 self.sub_channel.start()
711 if stdin:
707 if stdin:
712 self.stdin_channel.start()
708 self.stdin_channel.start()
713 self.shell_channel.allow_stdin = True
709 self.shell_channel.allow_stdin = True
714 else:
710 else:
715 self.shell_channel.allow_stdin = False
711 self.shell_channel.allow_stdin = False
716 if hb:
712 if hb:
717 self.hb_channel.start()
713 self.hb_channel.start()
718
714
719 def stop_channels(self):
715 def stop_channels(self):
720 """Stops all the running channels for this kernel.
716 """Stops all the running channels for this kernel.
721 """
717 """
722 if self.shell_channel.is_alive():
718 if self.shell_channel.is_alive():
723 self.shell_channel.stop()
719 self.shell_channel.stop()
724 if self.sub_channel.is_alive():
720 if self.sub_channel.is_alive():
725 self.sub_channel.stop()
721 self.sub_channel.stop()
726 if self.stdin_channel.is_alive():
722 if self.stdin_channel.is_alive():
727 self.stdin_channel.stop()
723 self.stdin_channel.stop()
728 if self.hb_channel.is_alive():
724 if self.hb_channel.is_alive():
729 self.hb_channel.stop()
725 self.hb_channel.stop()
730
726
731 @property
727 @property
732 def channels_running(self):
728 def channels_running(self):
733 """Are any of the channels created and running?"""
729 """Are any of the channels created and running?"""
734 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
730 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
735 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
731 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
736
732
737 #--------------------------------------------------------------------------
733 #--------------------------------------------------------------------------
738 # Kernel process management methods:
734 # Kernel process management methods:
739 #--------------------------------------------------------------------------
735 #--------------------------------------------------------------------------
740
736
741 def cleanup_connection_file(self):
737 def cleanup_connection_file(self):
742 """cleanup connection file *if we wrote it*
738 """cleanup connection file *if we wrote it*
743
739
744 Will not raise if the connection file was already removed somehow.
740 Will not raise if the connection file was already removed somehow.
745 """
741 """
746 if self._connection_file_written:
742 if self._connection_file_written:
747 # cleanup connection files on full shutdown of kernel we started
743 # cleanup connection files on full shutdown of kernel we started
748 self._connection_file_written = False
744 self._connection_file_written = False
749 try:
745 try:
750 os.remove(self.connection_file)
746 os.remove(self.connection_file)
751 except OSError:
747 except OSError:
752 pass
748 pass
753
749
754 def load_connection_file(self):
750 def load_connection_file(self):
755 """load connection info from JSON dict in self.connection_file"""
751 """load connection info from JSON dict in self.connection_file"""
756 with open(self.connection_file) as f:
752 with open(self.connection_file) as f:
757 cfg = json.loads(f.read())
753 cfg = json.loads(f.read())
758
754
759 self.ip = cfg['ip']
755 self.ip = cfg['ip']
760 self.shell_port = cfg['shell_port']
756 self.shell_port = cfg['shell_port']
761 self.stdin_port = cfg['stdin_port']
757 self.stdin_port = cfg['stdin_port']
762 self.iopub_port = cfg['iopub_port']
758 self.iopub_port = cfg['iopub_port']
763 self.hb_port = cfg['hb_port']
759 self.hb_port = cfg['hb_port']
764 self.session.key = str_to_bytes(cfg['key'])
760 self.session.key = str_to_bytes(cfg['key'])
765
761
766 def write_connection_file(self):
762 def write_connection_file(self):
767 """write connection info to JSON dict in self.connection_file"""
763 """write connection info to JSON dict in self.connection_file"""
768 if self._connection_file_written:
764 if self._connection_file_written:
769 return
765 return
770 self.connection_file,cfg = write_connection_file(self.connection_file,
766 self.connection_file,cfg = write_connection_file(self.connection_file,
771 ip=self.ip, key=self.session.key,
767 ip=self.ip, key=self.session.key,
772 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
768 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
773 shell_port=self.shell_port, hb_port=self.hb_port)
769 shell_port=self.shell_port, hb_port=self.hb_port)
774 # write_connection_file also sets default ports:
770 # write_connection_file also sets default ports:
775 self.shell_port = cfg['shell_port']
771 self.shell_port = cfg['shell_port']
776 self.stdin_port = cfg['stdin_port']
772 self.stdin_port = cfg['stdin_port']
777 self.iopub_port = cfg['iopub_port']
773 self.iopub_port = cfg['iopub_port']
778 self.hb_port = cfg['hb_port']
774 self.hb_port = cfg['hb_port']
779
775
780 self._connection_file_written = True
776 self._connection_file_written = True
781
777
782 def start_kernel(self, **kw):
778 def start_kernel(self, **kw):
783 """Starts a kernel process and configures the manager to use it.
779 """Starts a kernel process and configures the manager to use it.
784
780
785 If random ports (port=0) are being used, this method must be called
781 If random ports (port=0) are being used, this method must be called
786 before the channels are created.
782 before the channels are created.
787
783
788 Parameters:
784 Parameters:
789 -----------
785 -----------
790 launcher : callable, optional (default None)
786 launcher : callable, optional (default None)
791 A custom function for launching the kernel process (generally a
787 A custom function for launching the kernel process (generally a
792 wrapper around ``entry_point.base_launch_kernel``). In most cases,
788 wrapper around ``entry_point.base_launch_kernel``). In most cases,
793 it should not be necessary to use this parameter.
789 it should not be necessary to use this parameter.
794
790
795 **kw : optional
791 **kw : optional
796 See respective options for IPython and Python kernels.
792 See respective options for IPython and Python kernels.
797 """
793 """
798 if self.ip not in LOCAL_IPS:
794 if self.ip not in LOCAL_IPS:
799 raise RuntimeError("Can only launch a kernel on a local interface. "
795 raise RuntimeError("Can only launch a kernel on a local interface. "
800 "Make sure that the '*_address' attributes are "
796 "Make sure that the '*_address' attributes are "
801 "configured properly. "
797 "configured properly. "
802 "Currently valid addresses are: %s"%LOCAL_IPS
798 "Currently valid addresses are: %s"%LOCAL_IPS
803 )
799 )
804
800
805 # write connection file / get default ports
801 # write connection file / get default ports
806 self.write_connection_file()
802 self.write_connection_file()
807
803
808 self._launch_args = kw.copy()
804 self._launch_args = kw.copy()
809 launch_kernel = kw.pop('launcher', None)
805 launch_kernel = kw.pop('launcher', None)
810 if launch_kernel is None:
806 if launch_kernel is None:
811 from ipkernel import launch_kernel
807 from ipkernel import launch_kernel
812 self.kernel = launch_kernel(fname=self.connection_file, **kw)
808 self.kernel = launch_kernel(fname=self.connection_file, **kw)
813
809
814 def shutdown_kernel(self, restart=False):
810 def shutdown_kernel(self, restart=False):
815 """ Attempts to the stop the kernel process cleanly. If the kernel
811 """ Attempts to the stop the kernel process cleanly. If the kernel
816 cannot be stopped, it is killed, if possible.
812 cannot be stopped, it is killed, if possible.
817 """
813 """
818 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
814 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
819 if sys.platform == 'win32':
815 if sys.platform == 'win32':
820 self.kill_kernel()
816 self.kill_kernel()
821 return
817 return
822
818
823 # Pause the heart beat channel if it exists.
819 # Pause the heart beat channel if it exists.
824 if self._hb_channel is not None:
820 if self._hb_channel is not None:
825 self._hb_channel.pause()
821 self._hb_channel.pause()
826
822
827 # Don't send any additional kernel kill messages immediately, to give
823 # Don't send any additional kernel kill messages immediately, to give
828 # the kernel a chance to properly execute shutdown actions. Wait for at
824 # the kernel a chance to properly execute shutdown actions. Wait for at
829 # most 1s, checking every 0.1s.
825 # most 1s, checking every 0.1s.
830 self.shell_channel.shutdown(restart=restart)
826 self.shell_channel.shutdown(restart=restart)
831 for i in range(10):
827 for i in range(10):
832 if self.is_alive:
828 if self.is_alive:
833 time.sleep(0.1)
829 time.sleep(0.1)
834 else:
830 else:
835 break
831 break
836 else:
832 else:
837 # OK, we've waited long enough.
833 # OK, we've waited long enough.
838 if self.has_kernel:
834 if self.has_kernel:
839 self.kill_kernel()
835 self.kill_kernel()
840
836
841 if not restart and self._connection_file_written:
837 if not restart and self._connection_file_written:
842 # cleanup connection files on full shutdown of kernel we started
838 # cleanup connection files on full shutdown of kernel we started
843 self._connection_file_written = False
839 self._connection_file_written = False
844 try:
840 try:
845 os.remove(self.connection_file)
841 os.remove(self.connection_file)
846 except IOError:
842 except IOError:
847 pass
843 pass
848
844
849 def restart_kernel(self, now=False, **kw):
845 def restart_kernel(self, now=False, **kw):
850 """Restarts a kernel with the arguments that were used to launch it.
846 """Restarts a kernel with the arguments that were used to launch it.
851
847
852 If the old kernel was launched with random ports, the same ports will be
848 If the old kernel was launched with random ports, the same ports will be
853 used for the new kernel.
849 used for the new kernel.
854
850
855 Parameters
851 Parameters
856 ----------
852 ----------
857 now : bool, optional
853 now : bool, optional
858 If True, the kernel is forcefully restarted *immediately*, without
854 If True, the kernel is forcefully restarted *immediately*, without
859 having a chance to do any cleanup action. Otherwise the kernel is
855 having a chance to do any cleanup action. Otherwise the kernel is
860 given 1s to clean up before a forceful restart is issued.
856 given 1s to clean up before a forceful restart is issued.
861
857
862 In all cases the kernel is restarted, the only difference is whether
858 In all cases the kernel is restarted, the only difference is whether
863 it is given a chance to perform a clean shutdown or not.
859 it is given a chance to perform a clean shutdown or not.
864
860
865 **kw : optional
861 **kw : optional
866 Any options specified here will replace those used to launch the
862 Any options specified here will replace those used to launch the
867 kernel.
863 kernel.
868 """
864 """
869 if self._launch_args is None:
865 if self._launch_args is None:
870 raise RuntimeError("Cannot restart the kernel. "
866 raise RuntimeError("Cannot restart the kernel. "
871 "No previous call to 'start_kernel'.")
867 "No previous call to 'start_kernel'.")
872 else:
868 else:
873 # Stop currently running kernel.
869 # Stop currently running kernel.
874 if self.has_kernel:
870 if self.has_kernel:
875 if now:
871 if now:
876 self.kill_kernel()
872 self.kill_kernel()
877 else:
873 else:
878 self.shutdown_kernel(restart=True)
874 self.shutdown_kernel(restart=True)
879
875
880 # Start new kernel.
876 # Start new kernel.
881 self._launch_args.update(kw)
877 self._launch_args.update(kw)
882 self.start_kernel(**self._launch_args)
878 self.start_kernel(**self._launch_args)
883
879
884 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
880 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
885 # unless there is some delay here.
881 # unless there is some delay here.
886 if sys.platform == 'win32':
882 if sys.platform == 'win32':
887 time.sleep(0.2)
883 time.sleep(0.2)
888
884
889 @property
885 @property
890 def has_kernel(self):
886 def has_kernel(self):
891 """Returns whether a kernel process has been specified for the kernel
887 """Returns whether a kernel process has been specified for the kernel
892 manager.
888 manager.
893 """
889 """
894 return self.kernel is not None
890 return self.kernel is not None
895
891
896 def kill_kernel(self):
892 def kill_kernel(self):
897 """ Kill the running kernel. """
893 """ Kill the running kernel. """
898 if self.has_kernel:
894 if self.has_kernel:
899 # Pause the heart beat channel if it exists.
895 # Pause the heart beat channel if it exists.
900 if self._hb_channel is not None:
896 if self._hb_channel is not None:
901 self._hb_channel.pause()
897 self._hb_channel.pause()
902
898
903 # Attempt to kill the kernel.
899 # Attempt to kill the kernel.
904 try:
900 try:
905 self.kernel.kill()
901 self.kernel.kill()
906 except OSError as e:
902 except OSError as e:
907 # In Windows, we will get an Access Denied error if the process
903 # In Windows, we will get an Access Denied error if the process
908 # has already terminated. Ignore it.
904 # has already terminated. Ignore it.
909 if sys.platform == 'win32':
905 if sys.platform == 'win32':
910 if e.winerror != 5:
906 if e.winerror != 5:
911 raise
907 raise
912 # On Unix, we may get an ESRCH error if the process has already
908 # On Unix, we may get an ESRCH error if the process has already
913 # terminated. Ignore it.
909 # terminated. Ignore it.
914 else:
910 else:
915 from errno import ESRCH
911 from errno import ESRCH
916 if e.errno != ESRCH:
912 if e.errno != ESRCH:
917 raise
913 raise
918 self.kernel = None
914 self.kernel = None
919 else:
915 else:
920 raise RuntimeError("Cannot kill kernel. No kernel is running!")
916 raise RuntimeError("Cannot kill kernel. No kernel is running!")
921
917
922 def interrupt_kernel(self):
918 def interrupt_kernel(self):
923 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
919 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
924 well supported on all platforms.
920 well supported on all platforms.
925 """
921 """
926 if self.has_kernel:
922 if self.has_kernel:
927 if sys.platform == 'win32':
923 if sys.platform == 'win32':
928 from parentpoller import ParentPollerWindows as Poller
924 from parentpoller import ParentPollerWindows as Poller
929 Poller.send_interrupt(self.kernel.win32_interrupt_event)
925 Poller.send_interrupt(self.kernel.win32_interrupt_event)
930 else:
926 else:
931 self.kernel.send_signal(signal.SIGINT)
927 self.kernel.send_signal(signal.SIGINT)
932 else:
928 else:
933 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
929 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
934
930
935 def signal_kernel(self, signum):
931 def signal_kernel(self, signum):
936 """ Sends a signal to the kernel. Note that since only SIGTERM is
932 """ Sends a signal to the kernel. Note that since only SIGTERM is
937 supported on Windows, this function is only useful on Unix systems.
933 supported on Windows, this function is only useful on Unix systems.
938 """
934 """
939 if self.has_kernel:
935 if self.has_kernel:
940 self.kernel.send_signal(signum)
936 self.kernel.send_signal(signum)
941 else:
937 else:
942 raise RuntimeError("Cannot signal kernel. No kernel is running!")
938 raise RuntimeError("Cannot signal kernel. No kernel is running!")
943
939
944 @property
940 @property
945 def is_alive(self):
941 def is_alive(self):
946 """Is the kernel process still running?"""
942 """Is the kernel process still running?"""
947 if self.has_kernel:
943 if self.has_kernel:
948 if self.kernel.poll() is None:
944 if self.kernel.poll() is None:
949 return True
945 return True
950 else:
946 else:
951 return False
947 return False
952 elif self._hb_channel is not None:
948 elif self._hb_channel is not None:
953 # We didn't start the kernel with this KernelManager so we
949 # We didn't start the kernel with this KernelManager so we
954 # use the heartbeat.
950 # use the heartbeat.
955 return self._hb_channel.is_beating()
951 return self._hb_channel.is_beating()
956 else:
952 else:
957 # no heartbeat and not local, we can't tell if it's running,
953 # no heartbeat and not local, we can't tell if it's running,
958 # so naively return True
954 # so naively return True
959 return True
955 return True
960
956
961 #--------------------------------------------------------------------------
957 #--------------------------------------------------------------------------
962 # Channels used for communication with the kernel:
958 # Channels used for communication with the kernel:
963 #--------------------------------------------------------------------------
959 #--------------------------------------------------------------------------
964
960
965 @property
961 @property
966 def shell_channel(self):
962 def shell_channel(self):
967 """Get the REQ socket channel object to make requests of the kernel."""
963 """Get the REQ socket channel object to make requests of the kernel."""
968 if self._shell_channel is None:
964 if self._shell_channel is None:
969 self._shell_channel = self.shell_channel_class(self.context,
965 self._shell_channel = self.shell_channel_class(self.context,
970 self.session,
966 self.session,
971 (self.ip, self.shell_port))
967 (self.ip, self.shell_port))
972 return self._shell_channel
968 return self._shell_channel
973
969
974 @property
970 @property
975 def sub_channel(self):
971 def sub_channel(self):
976 """Get the SUB socket channel object."""
972 """Get the SUB socket channel object."""
977 if self._sub_channel is None:
973 if self._sub_channel is None:
978 self._sub_channel = self.sub_channel_class(self.context,
974 self._sub_channel = self.sub_channel_class(self.context,
979 self.session,
975 self.session,
980 (self.ip, self.iopub_port))
976 (self.ip, self.iopub_port))
981 return self._sub_channel
977 return self._sub_channel
982
978
983 @property
979 @property
984 def stdin_channel(self):
980 def stdin_channel(self):
985 """Get the REP socket channel object to handle stdin (raw_input)."""
981 """Get the REP socket channel object to handle stdin (raw_input)."""
986 if self._stdin_channel is None:
982 if self._stdin_channel is None:
987 self._stdin_channel = self.stdin_channel_class(self.context,
983 self._stdin_channel = self.stdin_channel_class(self.context,
988 self.session,
984 self.session,
989 (self.ip, self.stdin_port))
985 (self.ip, self.stdin_port))
990 return self._stdin_channel
986 return self._stdin_channel
991
987
992 @property
988 @property
993 def hb_channel(self):
989 def hb_channel(self):
994 """Get the heartbeat socket channel object to check that the
990 """Get the heartbeat socket channel object to check that the
995 kernel is alive."""
991 kernel is alive."""
996 if self._hb_channel is None:
992 if self._hb_channel is None:
997 self._hb_channel = self.hb_channel_class(self.context,
993 self._hb_channel = self.hb_channel_class(self.context,
998 self.session,
994 self.session,
999 (self.ip, self.hb_port))
995 (self.ip, self.hb_port))
1000 return self._hb_channel
996 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now