##// END OF EJS Templates
Refactor kernel managers in preparation for the EmbeddedKernel.
epatters -
Show More
1 NO CONTENT: new file 100644
@@ -0,0 +1,75 b''
1 """ Implements a fully blocking kernel manager.
2
3 Useful for test suites and blocking terminal interfaces.
4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2012 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16
17 # Standard library imports.
18 import Queue
19 from threading import Event
20
21 # Local imports.
22 from IPython.utils.traitlets import Type
23 from kernelmanager import EmbeddedKernelManager, ShellEmbeddedChannel, \
24 SubEmbeddedChannel, StdInEmbeddedChannel
25
26 #-----------------------------------------------------------------------------
27 # Utility classes
28 #-----------------------------------------------------------------------------
29
30 class BlockingChannelMixin(object):
31
32 def __init__(self, *args, **kwds):
33 super(BlockingChannelMixin, self).__init__(*args, **kwds)
34 self._in_queue = Queue.Queue()
35
36 def call_handlers(self, msg):
37 self._in_queue.put(msg)
38
39 def get_msg(self, block=True, timeout=None):
40 """ Gets a message if there is one that is ready. """
41 return self._in_queue.get(block, timeout)
42
43 def get_msgs(self):
44 """ Get all messages that are currently ready. """
45 msgs = []
46 while True:
47 try:
48 msgs.append(self.get_msg(block=False))
49 except Queue.Empty:
50 break
51 return msgs
52
53 def msg_ready(self):
54 """ Is there a message that has been received? """
55 return not self._in_queue.empty()
56
57 #-----------------------------------------------------------------------------
58 # Blocking kernel manager
59 #-----------------------------------------------------------------------------
60
61 class BlockingShellEmbeddedChannel(BlockingChannelMixin, ShellEmbeddedChannel):
62 pass
63
64 class BlockingSubEmbeddedChannel(BlockingChannelMixin, SubEmbeddedChannel):
65 pass
66
67 class BlockingStdInEmbeddedChannel(BlockingChannelMixin, StdInEmbeddedChannel):
68 pass
69
70 class BlockingEmbeddedKernelManager(EmbeddedKernelManager):
71
72 # The classes to use for the various channels.
73 shell_channel_class = Type(BlockingShellEmbeddedChannel)
74 sub_channel_class = Type(BlockingSubEmbeddedChannel)
75 stdin_channel_class = Type(BlockingStdInEmbeddedChannel)
@@ -0,0 +1,22 b''
1 """ An embedded (in-process) kernel. """
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Local imports.
15 from IPython.zmq.ipkernel import Kernel
16
17 #-----------------------------------------------------------------------------
18 # Main kernel class
19 #-----------------------------------------------------------------------------
20
21 class EmbeddedKernel(Kernel):
22 pass
@@ -0,0 +1,398 b''
1 """ A kernel manager for embedded (in-process) kernels. """
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Local imports.
15 from IPython.config.loader import Config
16 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
17
18 #-----------------------------------------------------------------------------
19 # Channel classes
20 #-----------------------------------------------------------------------------
21
22 class EmbeddedChannel(object):
23 """ Base class for embedded channels.
24 """
25
26 def __init__(self, manager):
27 super(EmbeddedChannel, self).__init__()
28 self.manager = manager
29 self._is_alive = False
30
31 #--------------------------------------------------------------------------
32 # Channel interface
33 #--------------------------------------------------------------------------
34
35 def is_alive(self):
36 return self._is_alive
37
38 def start(self):
39 self._is_alive = True
40
41 def stop(self):
42 self._is_alive = False
43
44 def call_handlers(self, msg):
45 """ This method is called in the main thread when a message arrives.
46
47 Subclasses should override this method to handle incoming messages.
48 """
49 raise NotImplementedError('call_handlers must be defined in a subclass.')
50
51 #--------------------------------------------------------------------------
52 # EmbeddedChannel interface
53 #--------------------------------------------------------------------------
54
55 def call_handlers_later(self, *args, **kwds):
56 """ Call the message handlers later.
57
58 The default implementation just calls the handlers immediately, but this
59 method exists so that GUI toolkits can defer calling the handlers until
60 after the event loop has run, as expected by GUI frontends.
61 """
62 self.call_handlers(*args, **kwds)
63
64 def process_events(self):
65 """ Process any pending GUI events.
66
67 This method will be never be called from a frontend without an event
68 loop (e.g., a terminal frontend).
69 """
70 raise NotImplementedError
71
72
73 class ShellEmbeddedChannel(EmbeddedChannel):
74 """The DEALER channel for issues request/replies to the kernel.
75 """
76
77 # flag for whether execute requests should be allowed to call raw_input
78 allow_stdin = True
79
80 def execute(self, code, silent=False, store_history=True,
81 user_variables=[], user_expressions={}, allow_stdin=None):
82 """Execute code in the kernel.
83
84 Parameters
85 ----------
86 code : str
87 A string of Python code.
88
89 silent : bool, optional (default False)
90 If set, the kernel will execute the code as quietly possible, and
91 will force store_history to be False.
92
93 store_history : bool, optional (default True)
94 If set, the kernel will store command history. This is forced
95 to be False if silent is True.
96
97 user_variables : list, optional
98 A list of variable names to pull from the user's namespace. They
99 will come back as a dict with these names as keys and their
100 :func:`repr` as values.
101
102 user_expressions : dict, optional
103 A dict mapping names to expressions to be evaluated in the user's
104 dict. The expression values are returned as strings formatted using
105 :func:`repr`.
106
107 allow_stdin : bool, optional (default self.allow_stdin)
108 Flag for whether the kernel can send stdin requests to frontends.
109
110 Some frontends (e.g. the Notebook) do not support stdin requests.
111 If raw_input is called from code executed from such a frontend, a
112 StdinNotImplementedError will be raised.
113
114 Returns
115 -------
116 The msg_id of the message sent.
117 """
118 raise NotImplementedError
119
120 def complete(self, text, line, cursor_pos, block=None):
121 """Tab complete text in the kernel's namespace.
122
123 Parameters
124 ----------
125 text : str
126 The text to complete.
127 line : str
128 The full line of text that is the surrounding context for the
129 text to complete.
130 cursor_pos : int
131 The position of the cursor in the line where the completion was
132 requested.
133 block : str, optional
134 The full block of code in which the completion is being requested.
135
136 Returns
137 -------
138 The msg_id of the message sent.
139 """
140 raise NotImplementedError
141
142 def object_info(self, oname, detail_level=0):
143 """Get metadata information about an object.
144
145 Parameters
146 ----------
147 oname : str
148 A string specifying the object name.
149 detail_level : int, optional
150 The level of detail for the introspection (0-2)
151
152 Returns
153 -------
154 The msg_id of the message sent.
155 """
156 raise NotImplementedError
157
158 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
159 """Get entries from the history list.
160
161 Parameters
162 ----------
163 raw : bool
164 If True, return the raw input.
165 output : bool
166 If True, then return the output as well.
167 hist_access_type : str
168 'range' (fill in session, start and stop params), 'tail' (fill in n)
169 or 'search' (fill in pattern param).
170
171 session : int
172 For a range request, the session from which to get lines. Session
173 numbers are positive integers; negative ones count back from the
174 current session.
175 start : int
176 The first line number of a history range.
177 stop : int
178 The final (excluded) line number of a history range.
179
180 n : int
181 The number of lines of history to get for a tail request.
182
183 pattern : str
184 The glob-syntax pattern for a search request.
185
186 Returns
187 -------
188 The msg_id of the message sent.
189 """
190 raise NotImplementedError
191
192 def shutdown(self, restart=False):
193 """ Request an immediate kernel shutdown.
194
195 A dummy method for the embedded kernel.
196 """
197 # FIXME: What to do here?
198 raise NotImplementedError('Shutdown not supported for embedded kernel')
199
200
201 class SubEmbeddedChannel(EmbeddedChannel):
202 """The SUB channel which listens for messages that the kernel publishes.
203 """
204
205 def flush(self, timeout=1.0):
206 """ Immediately processes all pending messages on the SUB channel.
207
208 A dummy method for the embedded kernel.
209 """
210 pass
211
212
213 class StdInEmbeddedChannel(EmbeddedChannel):
214 """ A reply channel to handle raw_input requests that the kernel makes. """
215
216 def input(self, string):
217 """ Send a string of raw input to the kernel.
218 """
219 raise NotImplementedError
220
221
222 class HBEmbeddedChannel(EmbeddedChannel):
223 """ A dummy heartbeat channel. """
224
225 time_to_dead = 3.0
226
227 def __init__(self, *args, **kwds):
228 super(HBEmbeddedChannel, self).__init__(*args, **kwds)
229 self._pause = True
230
231 def pause(self):
232 """ Pause the heartbeat. """
233 self._pause = True
234
235 def unpause(self):
236 """ Unpause the heartbeat. """
237 self._pause = False
238
239 def is_beating(self):
240 """ Is the heartbeat running and responsive (and not paused). """
241 return not self._pause
242
243
244 #-----------------------------------------------------------------------------
245 # Main kernel manager class
246 #-----------------------------------------------------------------------------
247
248 class EmbeddedKernelManager(HasTraits):
249 """ A manager for an embedded kernel.
250
251 This class implements most of the interface of
252 ``IPython.zmq.kernelmanager.KernelManager`` and allows (asynchronous)
253 frontends to be used seamlessly with an in-process kernel.
254 """
255 # Config object for passing to child configurables
256 config = Instance(Config)
257
258 # The Session to use for building messages.
259 session = Instance('IPython.zmq.session.Session')
260 def _session_default(self):
261 from IPython.zmq.session import Session
262 return Session(config=self.config)
263
264 # The kernel process with which the KernelManager is communicating.
265 kernel = Instance('IPython.embedded.ipkernel.EmbeddedKernel')
266
267 # The classes to use for the various channels.
268 shell_channel_class = Type(ShellEmbeddedChannel)
269 sub_channel_class = Type(SubEmbeddedChannel)
270 stdin_channel_class = Type(StdInEmbeddedChannel)
271 hb_channel_class = Type(HBEmbeddedChannel)
272
273 # Protected traits.
274 _shell_channel = Any
275 _sub_channel = Any
276 _stdin_channel = Any
277 _hb_channel = Any
278
279 #--------------------------------------------------------------------------
280 # Channel management methods:
281 #--------------------------------------------------------------------------
282
283 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
284 """ Starts the channels for this kernel.
285 """
286 if shell:
287 self.shell_channel.start()
288 if sub:
289 self.sub_channel.start()
290 if stdin:
291 self.stdin_channel.start()
292 self.shell_channel.allow_stdin = True
293 else:
294 self.shell_channel.allow_stdin = False
295 if hb:
296 self.hb_channel.start()
297
298 def stop_channels(self):
299 """ Stops all the running channels for this kernel.
300 """
301 if self.shell_channel.is_alive():
302 self.shell_channel.stop()
303 if self.sub_channel.is_alive():
304 self.sub_channel.stop()
305 if self.stdin_channel.is_alive():
306 self.stdin_channel.stop()
307 if self.hb_channel.is_alive():
308 self.hb_channel.stop()
309
310 @property
311 def channels_running(self):
312 """ Are any of the channels created and running? """
313 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
314 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
315
316 #--------------------------------------------------------------------------
317 # Kernel management methods:
318 #--------------------------------------------------------------------------
319
320 def start_kernel(self, **kwds):
321 """ Starts a kernel process and configures the manager to use it.
322 """
323 from IPython.embedded.ipkernel import EmbeddedKernel
324 self.kernel = EmbeddedKernel()
325 self.kernel.frontends.append(self)
326
327 def shutdown_kernel(self):
328 """ Attempts to the stop the kernel process cleanly. If the kernel
329 cannot be stopped and the kernel is local, it is killed.
330 """
331 self.kill_kernel()
332
333 def restart_kernel(self, now=False, **kwds):
334 """ Restarts a kernel with the arguments that were used to launch it.
335
336 The 'now' parameter is ignored.
337 """
338 self.shutdown_kernel()
339 self.start_kernel(**kwds)
340
341 @property
342 def has_kernel(self):
343 """ Returns whether a kernel process has been specified for the kernel
344 manager.
345 """
346 return self.kernel is not None
347
348 def kill_kernel(self):
349 """ Kill the running kernel.
350 """
351 self.kernel.frontends.remove(self)
352 self.kernel = None
353
354 def interrupt_kernel(self):
355 """ Interrupts the kernel. """
356 raise NotImplementedError("Cannot interrupt embedded kernel.")
357
358 def signal_kernel(self, signum):
359 """ Sends a signal to the kernel. """
360 raise NotImplementedError("Cannot signal embedded kernel.")
361
362 @property
363 def is_alive(self):
364 """ Is the kernel process still running? """
365 return True
366
367 #--------------------------------------------------------------------------
368 # Channels used for communication with the kernel:
369 #--------------------------------------------------------------------------
370
371 @property
372 def shell_channel(self):
373 """Get the REQ socket channel object to make requests of the kernel."""
374 if self._shell_channel is None:
375 self._shell_channel = self.shell_channel_class(self)
376 return self._shell_channel
377
378 @property
379 def sub_channel(self):
380 """Get the SUB socket channel object."""
381 if self._sub_channel is None:
382 self._sub_channel = self.sub_channel_class(self)
383 return self._sub_channel
384
385 @property
386 def stdin_channel(self):
387 """Get the REP socket channel object to handle stdin (raw_input)."""
388 if self._stdin_channel is None:
389 self._stdin_channel = self.stdin_channel_class(self)
390 return self._stdin_channel
391
392 @property
393 def hb_channel(self):
394 """Get the heartbeat socket channel object to check that the
395 kernel is alive."""
396 if self._hb_channel is None:
397 self._hb_channel = self.hb_channel_class(self)
398 return self._hb_channel
@@ -0,0 +1,258 b''
1 """ Defines a KernelManager that provides signals and slots.
2 """
3
4 # System library imports.
5 from IPython.external.qt import QtCore
6
7 # IPython imports.
8 from IPython.utils.traitlets import Type
9 from util import SuperQObject
10
11
12 class ChannelQObject(SuperQObject):
13
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
16
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
19
20 #---------------------------------------------------------------------------
21 # Channel interface
22 #---------------------------------------------------------------------------
23
24 def start(self):
25 """ Reimplemented to emit signal.
26 """
27 super(ChannelQObject, self).start()
28 self.started.emit()
29
30 def stop(self):
31 """ Reimplemented to emit signal.
32 """
33 super(ChannelQObject, self).stop()
34 self.stopped.emit()
35
36 #---------------------------------------------------------------------------
37 # EmbeddedChannel interface
38 #---------------------------------------------------------------------------
39
40 def call_handlers_later(self, *args, **kwds):
41 """ Call the message handlers later.
42 """
43 do_later = lambda: self.call_handlers(*args, **kwds)
44 QtCore.QTimer.singleShot(0, do_later)
45
46 def process_events(self):
47 """ Process any pending GUI events.
48 """
49 QtCore.QCoreApplication.instance().processEvents()
50
51
52 class QtShellChannelMixin(ChannelQObject):
53
54 # Emitted when any message is received.
55 message_received = QtCore.Signal(object)
56
57 # Emitted when a reply has been received for the corresponding request
58 # type.
59 execute_reply = QtCore.Signal(object)
60 complete_reply = QtCore.Signal(object)
61 object_info_reply = QtCore.Signal(object)
62 history_reply = QtCore.Signal(object)
63
64 # Emitted when the first reply comes back.
65 first_reply = QtCore.Signal()
66
67 # Used by the first_reply signal logic to determine if a reply is the
68 # first.
69 _handlers_called = False
70
71 #---------------------------------------------------------------------------
72 # 'ShellSocketChannel' interface
73 #---------------------------------------------------------------------------
74
75 def call_handlers(self, msg):
76 """ Reimplemented to emit signals instead of making callbacks.
77 """
78 # Emit the generic signal.
79 self.message_received.emit(msg)
80
81 # Emit signals for specialized message types.
82 msg_type = msg['header']['msg_type']
83 signal = getattr(self, msg_type, None)
84 if signal:
85 signal.emit(msg)
86
87 if not self._handlers_called:
88 self.first_reply.emit()
89 self._handlers_called = True
90
91 #---------------------------------------------------------------------------
92 # 'QtShellChannelMixin' interface
93 #---------------------------------------------------------------------------
94
95 def reset_first_reply(self):
96 """ Reset the first_reply signal to fire again on the next reply.
97 """
98 self._handlers_called = False
99
100
101 class QtSubChannelMixin(ChannelQObject):
102
103 # Emitted when any message is received.
104 message_received = QtCore.Signal(object)
105
106 # Emitted when a message of type 'stream' is received.
107 stream_received = QtCore.Signal(object)
108
109 # Emitted when a message of type 'pyin' is received.
110 pyin_received = QtCore.Signal(object)
111
112 # Emitted when a message of type 'pyout' is received.
113 pyout_received = QtCore.Signal(object)
114
115 # Emitted when a message of type 'pyerr' is received.
116 pyerr_received = QtCore.Signal(object)
117
118 # Emitted when a message of type 'display_data' is received
119 display_data_received = QtCore.Signal(object)
120
121 # Emitted when a crash report message is received from the kernel's
122 # last-resort sys.excepthook.
123 crash_received = QtCore.Signal(object)
124
125 # Emitted when a shutdown is noticed.
126 shutdown_reply_received = QtCore.Signal(object)
127
128 #---------------------------------------------------------------------------
129 # 'SubSocketChannel' interface
130 #---------------------------------------------------------------------------
131
132 def call_handlers(self, msg):
133 """ Reimplemented to emit signals instead of making callbacks.
134 """
135 # Emit the generic signal.
136 self.message_received.emit(msg)
137 # Emit signals for specialized message types.
138 msg_type = msg['header']['msg_type']
139 signal = getattr(self, msg_type + '_received', None)
140 if signal:
141 signal.emit(msg)
142 elif msg_type in ('stdout', 'stderr'):
143 self.stream_received.emit(msg)
144
145 def flush(self):
146 """ Reimplemented to ensure that signals are dispatched immediately.
147 """
148 super(QtSubChannelMixin, self).flush()
149 QtCore.QCoreApplication.instance().processEvents()
150
151
152 class QtStdInChannelMixin(ChannelQObject):
153
154 # Emitted when any message is received.
155 message_received = QtCore.Signal(object)
156
157 # Emitted when an input request is received.
158 input_requested = QtCore.Signal(object)
159
160 #---------------------------------------------------------------------------
161 # 'StdInSocketChannel' interface
162 #---------------------------------------------------------------------------
163
164 def call_handlers(self, msg):
165 """ Reimplemented to emit signals instead of making callbacks.
166 """
167 # Emit the generic signal.
168 self.message_received.emit(msg)
169
170 # Emit signals for specialized message types.
171 msg_type = msg['header']['msg_type']
172 if msg_type == 'input_request':
173 self.input_requested.emit(msg)
174
175
176 class QtHBChannelMixin(ChannelQObject):
177
178 # Emitted when the kernel has died.
179 kernel_died = QtCore.Signal(object)
180
181 #---------------------------------------------------------------------------
182 # 'HBSocketChannel' interface
183 #---------------------------------------------------------------------------
184
185 def call_handlers(self, since_last_heartbeat):
186 """ Reimplemented to emit signals instead of making callbacks.
187 """
188 # Emit the generic signal.
189 self.kernel_died.emit(since_last_heartbeat)
190
191
192 class QtKernelManagerMixin(object):
193 """ A KernelManager that provides signals and slots.
194 """
195
196 # Emitted when the kernel manager has started listening.
197 started_kernel = QtCore.Signal()
198
199 # Emitted when the kernel manager has started listening.
200 started_channels = QtCore.Signal()
201
202 # Emitted when the kernel manager has stopped listening.
203 stopped_channels = QtCore.Signal()
204
205 # Use Qt-specific channel classes that emit signals.
206 sub_channel_class = Type(QtSubChannelMixin)
207 shell_channel_class = Type(QtShellChannelMixin)
208 stdin_channel_class = Type(QtStdInChannelMixin)
209 hb_channel_class = Type(QtHBChannelMixin)
210
211 #---------------------------------------------------------------------------
212 # 'KernelManager' interface
213 #---------------------------------------------------------------------------
214
215 #------ Kernel process management ------------------------------------------
216
217 def start_kernel(self, *args, **kw):
218 """ Reimplemented for proper heartbeat management.
219 """
220 if self._shell_channel is not None:
221 self._shell_channel.reset_first_reply()
222 super(QtKernelManagerMixin, self).start_kernel(*args, **kw)
223 self.started_kernel.emit()
224
225 #------ Channel management -------------------------------------------------
226
227 def start_channels(self, *args, **kw):
228 """ Reimplemented to emit signal.
229 """
230 super(QtKernelManagerMixin, self).start_channels(*args, **kw)
231 self.started_channels.emit()
232
233 def stop_channels(self):
234 """ Reimplemented to emit signal.
235 """
236 super(QtKernelManagerMixin, self).stop_channels()
237 self.stopped_channels.emit()
238
239 @property
240 def shell_channel(self):
241 """ Reimplemented for proper heartbeat management.
242 """
243 if self._shell_channel is None:
244 self._shell_channel = super(QtKernelManagerMixin,self).shell_channel
245 self._shell_channel.first_reply.connect(self._first_reply)
246 return self._shell_channel
247
248 #---------------------------------------------------------------------------
249 # Protected interface
250 #---------------------------------------------------------------------------
251
252 def _first_reply(self):
253 """ Unpauses the heartbeat channel when the first reply is received on
254 the execute channel. Note that this will *not* start the heartbeat
255 channel if it is not already running!
256 """
257 if self._hb_channel is not None:
258 self._hb_channel.unpause()
@@ -0,0 +1,37 b''
1 """ Defines an embedded KernelManager that provides signals and slots.
2 """
3
4 # Local imports.
5 from IPython.embedded.kernelmanager import \
6 ShellEmbeddedChannel, SubEmbeddedChannel, StdInEmbeddedChannel, \
7 HBEmbeddedChannel, EmbeddedKernelManager
8 from IPython.utils.traitlets import Type
9 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
10 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
11 from util import MetaQObjectHasTraits, SuperQObject
12
13
14 class QtShellEmbeddedChannel(QtShellChannelMixin, ShellEmbeddedChannel):
15 pass
16
17 class QtSubEmbeddedChannel(QtSubChannelMixin, SubEmbeddedChannel):
18 pass
19
20 class QtStdInEmbeddedChannel(QtStdInChannelMixin, StdInEmbeddedChannel):
21 pass
22
23 class QtHBEmbeddedChannel(QtHBChannelMixin, HBEmbeddedChannel):
24 pass
25
26
27 class QtEmbeddedKernelManager(QtKernelManagerMixin,
28 EmbeddedKernelManager, SuperQObject):
29 """ An embedded KernelManager that provides signals and slots.
30 """
31
32 __metaclass__ = MetaQObjectHasTraits
33
34 sub_channel_class = Type(QtSubEmbeddedChannel)
35 shell_channel_class = Type(QtShellEmbeddedChannel)
36 stdin_channel_class = Type(QtStdInEmbeddedChannel)
37 hb_channel_class = Type(QtHBEmbeddedChannel)
@@ -1,247 +1,35 b''
1 1 """ Defines a KernelManager that provides signals and slots.
2 2 """
3 3
4 # System library imports.
5 from IPython.external.qt import QtCore
6
7 # IPython imports.
4 # Local imports.
8 5 from IPython.utils.traitlets import Type
9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
10 ShellSocketChannel, StdInSocketChannel, HBSocketChannel
6 from IPython.zmq.kernelmanager import ShellSocketChannel, SubSocketChannel, \
7 StdInSocketChannel, HBSocketChannel, KernelManager
8 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
9 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
11 10 from util import MetaQObjectHasTraits, SuperQObject
12 11
13 12
14 class SocketChannelQObject(SuperQObject):
15
16 # Emitted when the channel is started.
17 started = QtCore.Signal()
18
19 # Emitted when the channel is stopped.
20 stopped = QtCore.Signal()
21
22 #---------------------------------------------------------------------------
23 # 'ZMQSocketChannel' interface
24 #---------------------------------------------------------------------------
25
26 def start(self):
27 """ Reimplemented to emit signal.
28 """
29 super(SocketChannelQObject, self).start()
30 self.started.emit()
31
32 def stop(self):
33 """ Reimplemented to emit signal.
34 """
35 super(SocketChannelQObject, self).stop()
36 self.stopped.emit()
37
38
39 class QtShellSocketChannel(SocketChannelQObject, ShellSocketChannel):
40
41 # Emitted when any message is received.
42 message_received = QtCore.Signal(object)
43
44 # Emitted when a reply has been received for the corresponding request
45 # type.
46 execute_reply = QtCore.Signal(object)
47 complete_reply = QtCore.Signal(object)
48 object_info_reply = QtCore.Signal(object)
49 history_reply = QtCore.Signal(object)
50
51 # Emitted when the first reply comes back.
52 first_reply = QtCore.Signal()
53
54 # Used by the first_reply signal logic to determine if a reply is the
55 # first.
56 _handlers_called = False
57
58 #---------------------------------------------------------------------------
59 # 'ShellSocketChannel' interface
60 #---------------------------------------------------------------------------
61
62 def call_handlers(self, msg):
63 """ Reimplemented to emit signals instead of making callbacks.
64 """
65 # Emit the generic signal.
66 self.message_received.emit(msg)
67
68 # Emit signals for specialized message types.
69 msg_type = msg['header']['msg_type']
70 signal = getattr(self, msg_type, None)
71 if signal:
72 signal.emit(msg)
73
74 if not self._handlers_called:
75 self.first_reply.emit()
76 self._handlers_called = True
77
78 #---------------------------------------------------------------------------
79 # 'QtShellSocketChannel' interface
80 #---------------------------------------------------------------------------
81
82 def reset_first_reply(self):
83 """ Reset the first_reply signal to fire again on the next reply.
84 """
85 self._handlers_called = False
86
87
88 class QtSubSocketChannel(SocketChannelQObject, SubSocketChannel):
89
90 # Emitted when any message is received.
91 message_received = QtCore.Signal(object)
92
93 # Emitted when a message of type 'stream' is received.
94 stream_received = QtCore.Signal(object)
95
96 # Emitted when a message of type 'pyin' is received.
97 pyin_received = QtCore.Signal(object)
98
99 # Emitted when a message of type 'pyout' is received.
100 pyout_received = QtCore.Signal(object)
101
102 # Emitted when a message of type 'pyerr' is received.
103 pyerr_received = QtCore.Signal(object)
104
105 # Emitted when a message of type 'display_data' is received
106 display_data_received = QtCore.Signal(object)
13 class QtShellSocketChannel(QtShellChannelMixin, ShellSocketChannel):
14 pass
107 15
108 # Emitted when a crash report message is received from the kernel's
109 # last-resort sys.excepthook.
110 crash_received = QtCore.Signal(object)
16 class QtSubSocketChannel(QtSubChannelMixin, SubSocketChannel):
17 pass
111 18
112 # Emitted when a shutdown is noticed.
113 shutdown_reply_received = QtCore.Signal(object)
19 class QtStdInSocketChannel(QtStdInChannelMixin, StdInSocketChannel):
20 pass
114 21
115 #---------------------------------------------------------------------------
116 # 'SubSocketChannel' interface
117 #---------------------------------------------------------------------------
22 class QtHBSocketChannel(QtHBChannelMixin, HBSocketChannel):
23 pass
118 24
119 def call_handlers(self, msg):
120 """ Reimplemented to emit signals instead of making callbacks.
121 """
122 # Emit the generic signal.
123 self.message_received.emit(msg)
124 # Emit signals for specialized message types.
125 msg_type = msg['header']['msg_type']
126 signal = getattr(self, msg_type + '_received', None)
127 if signal:
128 signal.emit(msg)
129 elif msg_type in ('stdout', 'stderr'):
130 self.stream_received.emit(msg)
131 25
132 def flush(self):
133 """ Reimplemented to ensure that signals are dispatched immediately.
134 """
135 super(QtSubSocketChannel, self).flush()
136 QtCore.QCoreApplication.instance().processEvents()
137
138
139 class QtStdInSocketChannel(SocketChannelQObject, StdInSocketChannel):
140
141 # Emitted when any message is received.
142 message_received = QtCore.Signal(object)
143
144 # Emitted when an input request is received.
145 input_requested = QtCore.Signal(object)
146
147 #---------------------------------------------------------------------------
148 # 'StdInSocketChannel' interface
149 #---------------------------------------------------------------------------
150
151 def call_handlers(self, msg):
152 """ Reimplemented to emit signals instead of making callbacks.
153 """
154 # Emit the generic signal.
155 self.message_received.emit(msg)
156
157 # Emit signals for specialized message types.
158 msg_type = msg['header']['msg_type']
159 if msg_type == 'input_request':
160 self.input_requested.emit(msg)
161
162
163 class QtHBSocketChannel(SocketChannelQObject, HBSocketChannel):
164
165 # Emitted when the kernel has died.
166 kernel_died = QtCore.Signal(object)
167
168 #---------------------------------------------------------------------------
169 # 'HBSocketChannel' interface
170 #---------------------------------------------------------------------------
171
172 def call_handlers(self, since_last_heartbeat):
173 """ Reimplemented to emit signals instead of making callbacks.
174 """
175 # Emit the generic signal.
176 self.kernel_died.emit(since_last_heartbeat)
177
178
179 class QtKernelManager(KernelManager, SuperQObject):
26 class QtKernelManager(QtKernelManagerMixin, KernelManager, SuperQObject):
180 27 """ A KernelManager that provides signals and slots.
181 28 """
182 29
183 30 __metaclass__ = MetaQObjectHasTraits
184 31
185 # Emitted when the kernel manager has started listening.
186 started_kernel = QtCore.Signal()
187
188 # Emitted when the kernel manager has started listening.
189 started_channels = QtCore.Signal()
190
191 # Emitted when the kernel manager has stopped listening.
192 stopped_channels = QtCore.Signal()
193
194 # Use Qt-specific channel classes that emit signals.
195 32 sub_channel_class = Type(QtSubSocketChannel)
196 33 shell_channel_class = Type(QtShellSocketChannel)
197 34 stdin_channel_class = Type(QtStdInSocketChannel)
198 35 hb_channel_class = Type(QtHBSocketChannel)
199
200 #---------------------------------------------------------------------------
201 # 'KernelManager' interface
202 #---------------------------------------------------------------------------
203
204 #------ Kernel process management ------------------------------------------
205
206 def start_kernel(self, *args, **kw):
207 """ Reimplemented for proper heartbeat management.
208 """
209 if self._shell_channel is not None:
210 self._shell_channel.reset_first_reply()
211 super(QtKernelManager, self).start_kernel(*args, **kw)
212 self.started_kernel.emit()
213
214 #------ Channel management -------------------------------------------------
215
216 def start_channels(self, *args, **kw):
217 """ Reimplemented to emit signal.
218 """
219 super(QtKernelManager, self).start_channels(*args, **kw)
220 self.started_channels.emit()
221
222 def stop_channels(self):
223 """ Reimplemented to emit signal.
224 """
225 super(QtKernelManager, self).stop_channels()
226 self.stopped_channels.emit()
227
228 @property
229 def shell_channel(self):
230 """ Reimplemented for proper heartbeat management.
231 """
232 if self._shell_channel is None:
233 self._shell_channel = super(QtKernelManager, self).shell_channel
234 self._shell_channel.first_reply.connect(self._first_reply)
235 return self._shell_channel
236
237 #---------------------------------------------------------------------------
238 # Protected interface
239 #---------------------------------------------------------------------------
240
241 def _first_reply(self):
242 """ Unpauses the heartbeat channel when the first reply is received on
243 the execute channel. Note that this will *not* start the heartbeat
244 channel if it is not already running!
245 """
246 if self._hb_channel is not None:
247 self._hb_channel.unpause()
@@ -1,9 +1,9 b''
1 """Implement a fully blocking kernel manager.
1 """ Implements a fully blocking kernel manager.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
6 # Copyright (C) 2010-2012 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
@@ -12,125 +12,25 b' Useful for test suites and blocking terminal interfaces.'
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16 15
17 # Stdlib
18 from Queue import Queue, Empty
19 from threading import Event
20
21 # Our own
22 from IPython.utils import io
16 # Local imports.
17 from IPython.embedded.blockingkernelmanager import BlockingChannelMixin
23 18 from IPython.utils.traitlets import Type
24
25 from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
26 ShellSocketChannel, StdInSocketChannel)
19 from kernelmanager import KernelManager, SubSocketChannel, HBSocketChannel, \
20 ShellSocketChannel, StdInSocketChannel
27 21
28 22 #-----------------------------------------------------------------------------
29 # Functions and classes
23 # Blocking kernel manager
30 24 #-----------------------------------------------------------------------------
31 25
32 class BlockingSubSocketChannel(SubSocketChannel):
33
34 def __init__(self, context, session, address=None):
35 super(BlockingSubSocketChannel, self).__init__(context, session,
36 address)
37 self._in_queue = Queue()
38
39 def call_handlers(self, msg):
40 #io.rprint('[[Sub]]', msg) # dbg
41 self._in_queue.put(msg)
42
43 def msg_ready(self):
44 """Is there a message that has been received?"""
45 if self._in_queue.qsize() == 0:
46 return False
47 else:
48 return True
49
50 def get_msg(self, block=True, timeout=None):
51 """Get a message if there is one that is ready."""
52 if block and timeout is None:
53 # never use timeout=None, because get
54 # becomes uninterruptible
55 timeout = 1e6
56 return self._in_queue.get(block, timeout)
57
58 def get_msgs(self):
59 """Get all messages that are currently ready."""
60 msgs = []
61 while True:
62 try:
63 msgs.append(self.get_msg(block=False))
64 except Empty:
65 break
66 return msgs
67
68
69 class BlockingShellSocketChannel(ShellSocketChannel):
26 class BlockingSubSocketChannel(BlockingChannelMixin, SubSocketChannel):
27 pass
70 28
71 def __init__(self, context, session, address=None):
72 super(BlockingShellSocketChannel, self).__init__(context, session,
73 address)
74 self._in_queue = Queue()
75
76 def call_handlers(self, msg):
77 #io.rprint('[[Shell]]', msg) # dbg
78 self._in_queue.put(msg)
79
80 def msg_ready(self):
81 """Is there a message that has been received?"""
82 if self._in_queue.qsize() == 0:
83 return False
84 else:
85 return True
86
87 def get_msg(self, block=True, timeout=None):
88 """Get a message if there is one that is ready."""
89 if block and timeout is None:
90 # never use timeout=None, because get
91 # becomes uninterruptible
92 timeout = 1e6
93 return self._in_queue.get(block, timeout)
94
95 def get_msgs(self):
96 """Get all messages that are currently ready."""
97 msgs = []
98 while True:
99 try:
100 msgs.append(self.get_msg(block=False))
101 except Empty:
102 break
103 return msgs
104
105
106 class BlockingStdInSocketChannel(StdInSocketChannel):
107
108 def __init__(self, context, session, address=None):
109 super(BlockingStdInSocketChannel, self).__init__(context, session, address)
110 self._in_queue = Queue()
111
112 def call_handlers(self, msg):
113 #io.rprint('[[Rep]]', msg) # dbg
114 self._in_queue.put(msg)
115
116 def get_msg(self, block=True, timeout=None):
117 "Gets a message if there is one that is ready."
118 return self._in_queue.get(block, timeout)
119
120 def get_msgs(self):
121 """Get all messages that are currently ready."""
122 msgs = []
123 while True:
124 try:
125 msgs.append(self.get_msg(block=False))
126 except Empty:
127 break
128 return msgs
129
130 def msg_ready(self):
131 "Is there a message that has been received?"
132 return not self._in_queue.empty()
29 class BlockingShellSocketChannel(BlockingChannelMixin, ShellSocketChannel):
30 pass
133 31
32 class BlockingStdInSocketChannel(BlockingChannelMixin, StdInSocketChannel):
33 pass
134 34
135 35 class BlockingHBSocketChannel(HBSocketChannel):
136 36
@@ -140,10 +40,9 b' class BlockingHBSocketChannel(HBSocketChannel):'
140 40 time_to_dead = 1.
141 41
142 42 def call_handlers(self, since_last_heartbeat):
143 """pause beating on missed heartbeat"""
43 """ Pause beating on missed heartbeat. """
144 44 pass
145 45
146
147 46 class BlockingKernelManager(KernelManager):
148 47
149 48 # The classes to use for the various channels.
@@ -654,6 +654,8 b' class KernelManager(HasTraits):'
654 654
655 655 # The Session to use for communication with the kernel.
656 656 session = Instance(Session)
657 def _session_default(self):
658 return Session(config=self.config)
657 659
658 660 # The kernel process with which the KernelManager is communicating.
659 661 kernel = Instance(Popen)
@@ -682,16 +684,10 b' class KernelManager(HasTraits):'
682 684 _stdin_channel = Any
683 685 _hb_channel = Any
684 686 _connection_file_written=Bool(False)
685
686 def __init__(self, **kwargs):
687 super(KernelManager, self).__init__(**kwargs)
688 if self.session is None:
689 self.session = Session(config=self.config)
690 687
691 688 def __del__(self):
692 689 self.cleanup_connection_file()
693 690
694
695 691 #--------------------------------------------------------------------------
696 692 # Channel management methods:
697 693 #--------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now