##// END OF EJS Templates
Merge branch 'kernelmanager' of git://github.com/ellisonbg/ipython into qtfrontend. Fixed breakage and conflicts from merge....
epatters -
r2701:c46f948d merge
parent child Browse files
Show More
@@ -161,10 +161,10 b' class FrontendWidget(HistoryConsoleWidget):'
161 """
161 """
162 # Disconnect the old kernel manager, if necessary.
162 # Disconnect the old kernel manager, if necessary.
163 if self._kernel_manager is not None:
163 if self._kernel_manager is not None:
164 self._kernel_manager.started_listening.disconnect(
164 self._kernel_manager.started_channels.disconnect(
165 self._started_listening)
165 self._started_channels)
166 self._kernel_manager.stopped_listening.disconnect(
166 self._kernel_manager.stopped_channels.disconnect(
167 self._stopped_listening)
167 self._stopped_channels)
168
168
169 # Disconnect the old kernel manager's channels.
169 # Disconnect the old kernel manager's channels.
170 sub = self._kernel_manager.sub_channel
170 sub = self._kernel_manager.sub_channel
@@ -174,9 +174,9 b' class FrontendWidget(HistoryConsoleWidget):'
174 xreq.complete_reply.disconnect(self._handle_complete_reply)
174 xreq.complete_reply.disconnect(self._handle_complete_reply)
175 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
175 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
176
176
177 # Handle the case where the old kernel manager is still listening.
177 # Handle the case where the old kernel manager is still channels.
178 if self._kernel_manager.is_listening:
178 if self._kernel_manager.channels_running:
179 self._stopped_listening()
179 self._stopped_channels()
180
180
181 # Set the new kernel manager.
181 # Set the new kernel manager.
182 self._kernel_manager = kernel_manager
182 self._kernel_manager = kernel_manager
@@ -184,8 +184,8 b' class FrontendWidget(HistoryConsoleWidget):'
184 return
184 return
185
185
186 # Connect the new kernel manager.
186 # Connect the new kernel manager.
187 kernel_manager.started_listening.connect(self._started_listening)
187 kernel_manager.started_channels.connect(self._started_channels)
188 kernel_manager.stopped_listening.connect(self._stopped_listening)
188 kernel_manager.stopped_channels.connect(self._stopped_channels)
189
189
190 # Connect the new kernel manager's channels.
190 # Connect the new kernel manager's channels.
191 sub = kernel_manager.sub_channel
191 sub = kernel_manager.sub_channel
@@ -195,10 +195,10 b' class FrontendWidget(HistoryConsoleWidget):'
195 xreq.complete_reply.connect(self._handle_complete_reply)
195 xreq.complete_reply.connect(self._handle_complete_reply)
196 xreq.object_info_reply.connect(self._handle_object_info_reply)
196 xreq.object_info_reply.connect(self._handle_object_info_reply)
197
197
198 # Handle the case where the kernel manager started listening before
198 # Handle the case where the kernel manager started channels before
199 # we connected.
199 # we connected.
200 if kernel_manager.is_listening:
200 if kernel_manager.channels_running:
201 self._started_listening()
201 self._started_channels()
202
202
203 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
203 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
204
204
@@ -323,8 +323,8 b' class FrontendWidget(HistoryConsoleWidget):'
323 if doc:
323 if doc:
324 self._call_tip_widget.show_docstring(doc)
324 self._call_tip_widget.show_docstring(doc)
325
325
326 def _started_listening(self):
326 def _started_channels(self):
327 self.clear()
327 self.clear()
328
328
329 def _stopped_listening(self):
329 def _stopped_channels(self):
330 pass
330 pass
@@ -82,7 +82,7 b" if __name__ == '__main__':"
82 # Create a KernelManager.
82 # Create a KernelManager.
83 kernel_manager = QtKernelManager()
83 kernel_manager = QtKernelManager()
84 kernel_manager.start_kernel()
84 kernel_manager.start_kernel()
85 kernel_manager.start_listening()
85 kernel_manager.start_channels()
86
86
87 # Launch the application.
87 # Launch the application.
88 app = QtGui.QApplication([])
88 app = QtGui.QApplication([])
@@ -93,12 +93,6 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):'
93 if signal:
93 if signal:
94 signal.emit(msg)
94 signal.emit(msg)
95
95
96 def _queue_request(self, msg, callback):
97 """ Reimplemented to skip callback handling.
98 """
99 self.command_queue.put(msg)
100 self.add_io_state(zmq.POLLOUT)
101
102
96
103 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
97 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
104
98
@@ -120,10 +114,10 b' class QtKernelManager(KernelManager, QtCore.QObject):'
120 __metaclass__ = MetaQObjectHasTraits
114 __metaclass__ = MetaQObjectHasTraits
121
115
122 # Emitted when the kernel manager has started listening.
116 # Emitted when the kernel manager has started listening.
123 started_listening = QtCore.pyqtSignal()
117 started_channels = QtCore.pyqtSignal()
124
118
125 # Emitted when the kernel manager has stopped listening.
119 # Emitted when the kernel manager has stopped listening.
126 stopped_listening = QtCore.pyqtSignal()
120 stopped_channels = QtCore.pyqtSignal()
127
121
128 # Use Qt-specific channel classes that emit signals.
122 # Use Qt-specific channel classes that emit signals.
129 sub_channel_class = QtSubSocketChannel
123 sub_channel_class = QtSubSocketChannel
@@ -131,17 +125,27 b' class QtKernelManager(KernelManager, QtCore.QObject):'
131 rep_channel_class = QtRepSocketChannel
125 rep_channel_class = QtRepSocketChannel
132
126
133 #---------------------------------------------------------------------------
127 #---------------------------------------------------------------------------
128 # 'object' interface
129 #---------------------------------------------------------------------------
130
131 def __init__(self, *args, **kw):
132 """ Reimplemented to ensure that QtCore.QObject is initialized first.
133 """
134 QtCore.QObject.__init__(self)
135 KernelManager.__init__(self, *args, **kw)
136
137 #---------------------------------------------------------------------------
134 # 'KernelManager' interface
138 # 'KernelManager' interface
135 #---------------------------------------------------------------------------
139 #---------------------------------------------------------------------------
136
140
137 def start_listening(self):
141 def start_channels(self):
138 """ Reimplemented to emit signal.
142 """ Reimplemented to emit signal.
139 """
143 """
140 super(QtKernelManager, self).start_listening()
144 super(QtKernelManager, self).start_channels()
141 self.started_listening.emit()
145 self.started_channels.emit()
142
146
143 def stop_listening(self):
147 def stop_channels(self):
144 """ Reimplemented to emit signal.
148 """ Reimplemented to emit signal.
145 """
149 """
146 super(QtKernelManager, self).stop_listening()
150 super(QtKernelManager, self).stop_channels()
147 self.stopped_listening.emit()
151 self.stopped_channels.emit()
This diff has been collapsed as it changes many lines, (553 lines changed) Show them Hide them
@@ -1,15 +1,27 b''
1 """Kernel frontend classes.
1 """Classes to manage the interaction with a running kernel.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 Todo
4 ====
4
5
6 * Create logger to handle debugging and console messages.
5 """
7 """
6
8
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
15
16 #-----------------------------------------------------------------------------
17 # Imports
18 #-----------------------------------------------------------------------------
19
7 # Standard library imports.
20 # Standard library imports.
8 from Queue import Queue, Empty
21 from Queue import Queue, Empty
9 from subprocess import Popen
22 from subprocess import Popen
10 from threading import Thread
23 from threading import Thread
11 import time
24 import time
12 import traceback
13
25
14 # System library imports.
26 # System library imports.
15 import zmq
27 import zmq
@@ -17,70 +29,80 b' from zmq import POLLIN, POLLOUT, POLLERR'
17 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
18
30
19 # Local imports.
31 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
21 Type
22 from kernel import launch_kernel
33 from kernel import launch_kernel
23 from session import Session
34 from session import Session
24
35
25 # Constants.
36 #-----------------------------------------------------------------------------
26 LOCALHOST = '127.0.0.1'
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
27
39
40 LOCALHOST = '127.0.0.1'
28
41
29 class MissingHandlerError(Exception):
42 class InvalidPortNumber(Exception):
30 pass
43 pass
31
44
45 #-----------------------------------------------------------------------------
46 # ZMQ Socket Channel classes
47 #-----------------------------------------------------------------------------
32
48
33 class ZmqSocketChannel(Thread):
49 class ZmqSocketChannel(Thread):
34 """ The base class for the channels that use ZMQ sockets.
50 """The base class for the channels that use ZMQ sockets.
35 """
51 """
36
37 context = None
52 context = None
38 session = None
53 session = None
39 socket = None
54 socket = None
40 ioloop = None
55 ioloop = None
41 iostate = None
56 iostate = None
57 _address = None
42
58
43 def __init__(self, context, session, address=None):
59 def __init__(self, context, session, address):
60 """Create a channel
61
62 Parameters
63 ----------
64 context : zmq.Context
65 The ZMQ context to use.
66 session : session.Session
67 The session to use.
68 address : tuple
69 Standard (ip, port) tuple that the kernel is listening on.
70 """
44 super(ZmqSocketChannel, self).__init__()
71 super(ZmqSocketChannel, self).__init__()
45 self.daemon = True
72 self.daemon = True
46
73
47 self.context = context
74 self.context = context
48 self.session = session
75 self.session = session
49 self.address = address
76 if address[1] == 0:
77 raise InvalidPortNumber('The port number for a channel cannot be 0.')
78 self._address = address
50
79
51 def stop(self):
80 def stop(self):
52 """Stop the thread's activity. Returns when the thread terminates.
81 """Stop the channel's activity.
53
82
54 The thread will raise :class:`RuntimeError` if :method:`self.start`
83 This calls :method:`Thread.join` and returns when the thread
55 is called again.
84 terminates. :class:`RuntimeError` will be raised if
85 :method:`self.start` is called again.
56 """
86 """
57 self.join()
87 self.join()
58
88
59
89 @property
60 def get_address(self):
90 def address(self):
61 """ Get the channel's address. By the default, a channel is on
91 """Get the channel's address as an (ip, port) tuple.
62 localhost with no port specified (a negative port number).
92
93 By the default, the address is (localhost, 0), where 0 means a random
94 port.
63 """
95 """
64 return self._address
96 return self._address
65
97
66 def set_adresss(self, address):
67 """ Set the channel's address. Should be a tuple of form:
68 (ip address [str], port [int]).
69 or None, in which case the address is reset to its default value.
70 """
71 # FIXME: Validate address.
72 if self.is_alive(): # This is Thread.is_alive
73 raise RuntimeError("Cannot set address on a running channel!")
74 else:
75 if address is None:
76 address = (LOCALHOST, 0)
77 self._address = address
78
79 address = property(get_address, set_adresss)
80
81 def add_io_state(self, state):
98 def add_io_state(self, state):
82 """Add IO state to the eventloop.
99 """Add IO state to the eventloop.
83
100
101 Parameters
102 ----------
103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 The IO state flag to set.
105
84 This is thread safe as it uses the thread safe IOLoop.add_callback.
106 This is thread safe as it uses the thread safe IOLoop.add_callback.
85 """
107 """
86 def add_io_state_callback():
108 def add_io_state_callback():
@@ -92,6 +114,11 b' class ZmqSocketChannel(Thread):'
92 def drop_io_state(self, state):
114 def drop_io_state(self, state):
93 """Drop IO state from the eventloop.
115 """Drop IO state from the eventloop.
94
116
117 Parameters
118 ----------
119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 The IO state flag to set.
121
95 This is thread safe as it uses the thread safe IOLoop.add_callback.
122 This is thread safe as it uses the thread safe IOLoop.add_callback.
96 """
123 """
97 def drop_io_state_callback():
124 def drop_io_state_callback():
@@ -101,48 +128,30 b' class ZmqSocketChannel(Thread):'
101 self.ioloop.add_callback(drop_io_state_callback)
128 self.ioloop.add_callback(drop_io_state_callback)
102
129
103
130
104 class SubSocketChannel(ZmqSocketChannel):
131 class XReqSocketChannel(ZmqSocketChannel):
132 """The XREQ channel for issues request/replies to the kernel.
133 """
105
134
106 def __init__(self, context, session, address=None):
135 command_queue = None
107 super(SubSocketChannel, self).__init__(context, session, address)
136
137 def __init__(self, context, session, address):
138 self.command_queue = Queue()
139 super(XReqSocketChannel, self).__init__(context, session, address)
108
140
109 def run(self):
141 def run(self):
110 self.socket = self.context.socket(zmq.SUB)
142 """The thread's main activity. Call start() instead."""
111 self.socket.setsockopt(zmq.SUBSCRIBE,'')
143 self.socket = self.context.socket(zmq.XREQ)
112 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
113 self.socket.connect('tcp://%s:%i' % self.address)
145 self.socket.connect('tcp://%s:%i' % self.address)
114 self.ioloop = ioloop.IOLoop()
146 self.ioloop = ioloop.IOLoop()
115 self.iostate = POLLIN|POLLERR
147 self.iostate = POLLERR|POLLIN
116 self.ioloop.add_handler(self.socket, self._handle_events,
148 self.ioloop.add_handler(self.socket, self._handle_events,
117 self.iostate)
149 self.iostate)
118 self.ioloop.start()
150 self.ioloop.start()
119
151
120 def stop(self):
152 def stop(self):
121 self.ioloop.stop()
153 self.ioloop.stop()
122 super(SubSocketChannel, self).stop()
154 super(XReqSocketChannel, self).stop()
123
124 def _handle_events(self, socket, events):
125 # Turn on and off POLLOUT depending on if we have made a request
126 if events & POLLERR:
127 self._handle_err()
128 if events & POLLIN:
129 self._handle_recv()
130
131 def _handle_err(self):
132 # We don't want to let this go silently, so eventually we should log.
133 raise zmq.ZMQError()
134
135 def _handle_recv(self):
136 # Get all of the messages we can
137 while True:
138 try:
139 msg = self.socket.recv_json(zmq.NOBLOCK)
140 except zmq.ZMQError:
141 # Check the errno?
142 # Will this tigger POLLERR?
143 break
144 else:
145 self.call_handlers(msg)
146
155
147 def call_handlers(self, msg):
156 def call_handlers(self, msg):
148 """This method is called in the ioloop thread when a message arrives.
157 """This method is called in the ioloop thread when a message arrives.
@@ -154,59 +163,65 b' class SubSocketChannel(ZmqSocketChannel):'
154 """
163 """
155 raise NotImplementedError('call_handlers must be defined in a subclass.')
164 raise NotImplementedError('call_handlers must be defined in a subclass.')
156
165
157 def flush(self, timeout=1.0):
166 def execute(self, code):
158 """Immediately processes all pending messages on the SUB channel.
167 """Execute code in the kernel.
159
160 This method is thread safe.
161
168
162 Parameters
169 Parameters
163 ----------
170 ----------
164 timeout : float, optional
171 code : str
165 The maximum amount of time to spend flushing, in seconds. The
172 A string of Python code.
166 default is one second.
173
167 """
174 Returns
168 # We do the IOLoop callback process twice to ensure that the IOLoop
175 -------
169 # gets to perform at least one full poll.
176 The msg_id of the message sent.
170 stop_time = time.time() + timeout
171 for i in xrange(2):
172 self._flushed = False
173 self.ioloop.add_callback(self._flush)
174 while not self._flushed and time.time() < stop_time:
175 time.sleep(0.01)
176
177 def _flush(self):
178 """Called in this thread by the IOLoop to indicate that all events have
179 been processed.
180 """
177 """
181 self._flushed = True
178 # Create class for content/msg creation. Related to, but possibly
179 # not in Session.
180 content = dict(code=code)
181 msg = self.session.msg('execute_request', content)
182 self._queue_request(msg)
183 return msg['header']['msg_id']
182
184
185 def complete(self, text, line, block=None):
186 """Tab complete text, line, block in the kernel's namespace.
183
187
184 class XReqSocketChannel(ZmqSocketChannel):
188 Parameters
189 ----------
190 text : str
191 The text to complete.
192 line : str
193 The full line of text that is the surrounding context for the
194 text to complete.
195 block : str
196 The full block of code in which the completion is being requested.
197
198 Returns
199 -------
200 The msg_id of the message sent.
185
201
186 handler_queue = None
202 """
187 command_queue = None
203 content = dict(text=text, line=line)
188 handlers = None
204 msg = self.session.msg('complete_request', content)
189 _overriden_call_handler = None
205 self._queue_request(msg)
206 return msg['header']['msg_id']
190
207
191 def __init__(self, context, session, address=None):
208 def object_info(self, oname):
192 self.handlers = {}
209 """Get metadata information about an object.
193 self.handler_queue = Queue()
194 self.command_queue = Queue()
195 super(XReqSocketChannel, self).__init__(context, session, address)
196
210
197 def run(self):
211 Parameters
198 self.socket = self.context.socket(zmq.XREQ)
212 ----------
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
213 oname : str
200 self.socket.connect('tcp://%s:%i' % self.address)
214 A string specifying the object name.
201 self.ioloop = ioloop.IOLoop()
215
202 self.iostate = POLLERR|POLLIN
216 Returns
203 self.ioloop.add_handler(self.socket, self._handle_events,
217 -------
204 self.iostate)
218 The msg_id of the message sent.
205 self.ioloop.start()
219 """
206
220 print oname
207 def stop(self):
221 content = dict(oname=oname)
208 self.ioloop.stop()
222 msg = self.session.msg('object_info_request', content)
209 super(XReqSocketChannel, self).stop()
223 self._queue_request(msg)
224 return msg['header']['msg_id']
210
225
211 def _handle_events(self, socket, events):
226 def _handle_events(self, socket, events):
212 if events & POLLERR:
227 if events & POLLERR:
@@ -234,82 +249,113 b' class XReqSocketChannel(ZmqSocketChannel):'
234 # We don't want to let this go silently, so eventually we should log.
249 # We don't want to let this go silently, so eventually we should log.
235 raise zmq.ZMQError()
250 raise zmq.ZMQError()
236
251
237 def _queue_request(self, msg, callback):
252 def _queue_request(self, msg):
238 handler = self._find_handler(msg['msg_type'], callback)
239 self.handler_queue.put(handler)
240 self.command_queue.put(msg)
253 self.command_queue.put(msg)
241 self.add_io_state(POLLOUT)
254 self.add_io_state(POLLOUT)
242
255
243 def execute(self, code, callback=None):
244 # Create class for content/msg creation. Related to, but possibly
245 # not in Session.
246 content = dict(code=code)
247 msg = self.session.msg('execute_request', content)
248 self._queue_request(msg, callback)
249 return msg['header']['msg_id']
250
256
251 def complete(self, text, line, block=None, callback=None):
257 class SubSocketChannel(ZmqSocketChannel):
252 content = dict(text=text, line=line)
258 """The SUB channel which listens for messages that the kernel publishes.
253 msg = self.session.msg('complete_request', content)
259 """
254 self._queue_request(msg, callback)
255 return msg['header']['msg_id']
256
260
257 def object_info(self, oname, callback=None):
261 def __init__(self, context, session, address):
258 content = dict(oname=oname)
262 super(SubSocketChannel, self).__init__(context, session, address)
259 msg = self.session.msg('object_info_request', content)
260 self._queue_request(msg, callback)
261 return msg['header']['msg_id']
262
263
263 def _find_handler(self, name, callback):
264 def run(self):
264 if callback is not None:
265 """The thread's main activity. Call start() instead."""
265 return callback
266 self.socket = self.context.socket(zmq.SUB)
266 handler = self.handlers.get(name)
267 self.socket.setsockopt(zmq.SUBSCRIBE,'')
267 if handler is None:
268 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
268 raise MissingHandlerError(
269 self.socket.connect('tcp://%s:%i' % self.address)
269 'No handler defined for method: %s' % name)
270 self.ioloop = ioloop.IOLoop()
270 return handler
271 self.iostate = POLLIN|POLLERR
271
272 self.ioloop.add_handler(self.socket, self._handle_events,
272 def override_call_handler(self, func):
273 self.iostate)
273 """Permanently override the call_handler.
274 self.ioloop.start()
274
275 The function func will be called as::
276
275
277 func(handler, msg)
276 def stop(self):
277 self.ioloop.stop()
278 super(SubSocketChannel, self).stop()
278
279
279 And must call::
280 def call_handlers(self, msg):
280
281 """This method is called in the ioloop thread when a message arrives.
281 handler(msg)
282
282
283 in the main thread.
283 Subclasses should override this method to handle incoming messages.
284 It is important to remember that this method is called in the thread
285 so that some logic must be done to ensure that the application leve
286 handlers are called in the application thread.
284 """
287 """
285 assert callable(func), "not a callable: %r" % func
288 raise NotImplementedError('call_handlers must be defined in a subclass.')
286 self._overriden_call_handler = func
287
289
288 def call_handlers(self, msg):
290 def flush(self, timeout=1.0):
289 try:
291 """Immediately processes all pending messages on the SUB channel.
290 handler = self.handler_queue.get(False)
292
291 except Empty:
293 This method is thread safe.
292 print "Message received with no handler!!!"
294
293 print msg
295 Parameters
294 else:
296 ----------
295 self.call_handler(handler, msg)
297 timeout : float, optional
296
298 The maximum amount of time to spend flushing, in seconds. The
297 def call_handler(self, handler, msg):
299 default is one second.
298 if self._overriden_call_handler is not None:
300 """
299 self._overriden_call_handler(handler, msg)
301 # We do the IOLoop callback process twice to ensure that the IOLoop
300 elif hasattr(self, '_call_handler'):
302 # gets to perform at least one full poll.
301 call_handler = getattr(self, '_call_handler')
303 stop_time = time.time() + timeout
302 call_handler(handler, msg)
304 for i in xrange(2):
303 else:
305 self._flushed = False
304 raise RuntimeError('no handler!')
306 self.ioloop.add_callback(self._flush)
307 while not self._flushed and time.time() < stop_time:
308 time.sleep(0.01)
309
310 def _handle_events(self, socket, events):
311 # Turn on and off POLLOUT depending on if we have made a request
312 if events & POLLERR:
313 self._handle_err()
314 if events & POLLIN:
315 self._handle_recv()
316
317 def _handle_err(self):
318 # We don't want to let this go silently, so eventually we should log.
319 raise zmq.ZMQError()
320
321 def _handle_recv(self):
322 # Get all of the messages we can
323 while True:
324 try:
325 msg = self.socket.recv_json(zmq.NOBLOCK)
326 except zmq.ZMQError:
327 # Check the errno?
328 # Will this tigger POLLERR?
329 break
330 else:
331 self.call_handlers(msg)
332
333 def _flush(self):
334 """Callback for :method:`self.flush`."""
335 self._flushed = True
305
336
306
337
307 class RepSocketChannel(ZmqSocketChannel):
338 class RepSocketChannel(ZmqSocketChannel):
339 """A reply channel to handle raw_input requests that the kernel makes."""
340
341 def run(self):
342 """The thread's main activity. Call start() instead."""
343 self.ioloop = ioloop.IOLoop()
344 self.ioloop.start()
345
346 def stop(self):
347 self.ioloop.stop()
348 super(SubSocketChannel, self).stop()
308
349
309 def on_raw_input(self):
350 def on_raw_input(self):
310 pass
351 pass
311
352
312
353
354 #-----------------------------------------------------------------------------
355 # Main kernel manager class
356 #-----------------------------------------------------------------------------
357
358
313 class KernelManager(HasTraits):
359 class KernelManager(HasTraits):
314 """ Manages a kernel for a frontend.
360 """ Manages a kernel for a frontend.
315
361
@@ -321,59 +367,66 b' class KernelManager(HasTraits):'
321 The REP channel is for the kernel to request stdin (raw_input) from the
367 The REP channel is for the kernel to request stdin (raw_input) from the
322 frontend.
368 frontend.
323 """
369 """
324
325 # Whether the kernel manager is currently listening on its channels.
326 is_listening = Bool(False)
327
328 # The PyZMQ Context to use for communication with the kernel.
370 # The PyZMQ Context to use for communication with the kernel.
329 context = Instance(zmq.Context, ())
371 context = Instance(zmq.Context)
330
372
331 # The Session to use for communication with the kernel.
373 # The Session to use for communication with the kernel.
332 session = Instance(Session, ())
374 session = Instance(Session)
333
375
334 # The classes to use for the various channels.
376 # The classes to use for the various channels.
335 sub_channel_class = Type(SubSocketChannel)
336 xreq_channel_class = Type(XReqSocketChannel)
377 xreq_channel_class = Type(XReqSocketChannel)
378 sub_channel_class = Type(SubSocketChannel)
337 rep_channel_class = Type(RepSocketChannel)
379 rep_channel_class = Type(RepSocketChannel)
338
380
339 # Protected traits.
381 # Protected traits.
340 _kernel = Instance(Popen)
382 _kernel = Instance(Popen)
341 _sub_channel = Any
383 _xreq_address = Any
384 _sub_address = Any
385 _rep_address = Any
342 _xreq_channel = Any
386 _xreq_channel = Any
387 _sub_channel = Any
343 _rep_channel = Any
388 _rep_channel = Any
344
389
390 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
391 context=None, session=None):
392 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
393 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
394 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
395 self.context = zmq.Context() if context is None else context
396 self.session = Session() if session is None else session
397
345 #--------------------------------------------------------------------------
398 #--------------------------------------------------------------------------
346 # Channel management methods:
399 # Channel management methods:
347 #--------------------------------------------------------------------------
400 #--------------------------------------------------------------------------
348
401
349 def start_listening(self):
402 def start_channels(self):
350 """Starts listening on the specified ports. If already listening, raises
403 """Starts the channels for this kernel.
351 a RuntimeError.
404
405 This will create the channels if they do not exist and then start
406 them. If port numbers of 0 are being used (random ports) then you
407 must first call :method:`start_kernel`. If the channels have been
408 stopped and you call this, :class:`RuntimeError` will be raised.
352 """
409 """
353 if self.is_listening:
410 self.xreq_channel.start()
354 raise RuntimeError("Cannot start listening. Already listening!")
411 self.sub_channel.start()
355 else:
412 self.rep_channel.start()
356 self.is_listening = True
357 self.sub_channel.start()
358 self.xreq_channel.start()
359 self.rep_channel.start()
360
413
361 @property
414 def stop_channels(self):
362 def is_alive(self):
415 """Stops the channels for this kernel.
363 """ Returns whether the kernel is alive. """
416
364 if self.is_listening:
417 This stops the channels by joining their threads. If the channels
365 # TODO: check if alive.
418 were not started, :class:`RuntimeError` will be raised.
366 return True
419 """
367 else:
420 self.xreq_channel.stop()
368 return False
421 self.sub_channel.stop()
422 self.rep_channel.stop()
369
423
370 def stop_listening(self):
424 @property
371 """Stops listening. If not listening, does nothing. """
425 def channels_running(self):
372 if self.is_listening:
426 """Are all of the channels created and running?"""
373 self.is_listening = False
427 return self.xreq_channel.is_alive() \
374 self.sub_channel.stop()
428 and self.sub_channel.is_alive() \
375 self.xreq_channel.stop()
429 and self.rep_channel.is_alive()
376 self.rep_channel.stop()
377
430
378 #--------------------------------------------------------------------------
431 #--------------------------------------------------------------------------
379 # Kernel process management methods:
432 # Kernel process management methods:
@@ -382,9 +435,8 b' class KernelManager(HasTraits):'
382 def start_kernel(self):
435 def start_kernel(self):
383 """Starts a kernel process and configures the manager to use it.
436 """Starts a kernel process and configures the manager to use it.
384
437
385 If ports have been specified via the address attributes, they are used.
438 If random ports (port=0) are being used, this method must be called
386 Otherwise, open ports are chosen by the OS and the channel port
439 before the channels are created.
387 attributes are configured as appropriate.
388 """
440 """
389 xreq, sub = self.xreq_address, self.sub_address
441 xreq, sub = self.xreq_address, self.sub_address
390 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
442 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
@@ -393,24 +445,13 b' class KernelManager(HasTraits):'
393 "configured properly.")
445 "configured properly.")
394
446
395 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
447 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
396 self.set_kernel(kernel)
397 self.xreq_address = (LOCALHOST, xrep)
398 self.sub_address = (LOCALHOST, pub)
399
400 def set_kernel(self, kernel):
401 """Sets the kernel manager's kernel to an existing kernel process.
402
403 It is *not* necessary to a set a kernel to communicate with it via the
404 channels, and those objects must be configured separately. It
405 *is* necessary to set a kernel if you want to use the manager (or
406 frontends that use the manager) to signal and/or kill the kernel.
407
408 Parameters:
409 -----------
410 kernel : Popen
411 An existing kernel process.
412 """
413 self._kernel = kernel
448 self._kernel = kernel
449 self._xreq_address = (LOCALHOST, xrep)
450 self._sub_address = (LOCALHOST, pub)
451 # The rep channel is not fully working yet, but its base class makes
452 # sure the port is not 0. We set to -1 for now until the rep channel
453 # is fully working.
454 self._rep_address = (LOCALHOST, -1)
414
455
415 @property
456 @property
416 def has_kernel(self):
457 def has_kernel(self):
@@ -423,7 +464,7 b' class KernelManager(HasTraits):'
423
464
424 def kill_kernel(self):
465 def kill_kernel(self):
425 """ Kill the running kernel. """
466 """ Kill the running kernel. """
426 if self._kernel:
467 if self._kernel is not None:
427 self._kernel.kill()
468 self._kernel.kill()
428 self._kernel = None
469 self._kernel = None
429 else:
470 else:
@@ -431,67 +472,65 b' class KernelManager(HasTraits):'
431
472
432 def signal_kernel(self, signum):
473 def signal_kernel(self, signum):
433 """ Sends a signal to the kernel. """
474 """ Sends a signal to the kernel. """
434 if self._kernel:
475 if self._kernel is not None:
435 self._kernel.send_signal(signum)
476 self._kernel.send_signal(signum)
436 else:
477 else:
437 raise RuntimeError("Cannot signal kernel. No kernel is running!")
478 raise RuntimeError("Cannot signal kernel. No kernel is running!")
438
479
480 @property
481 def is_alive(self):
482 """Is the kernel process still running?"""
483 if self._kernel is not None:
484 if self._kernel.poll() is None:
485 return True
486 else:
487 return False
488 else:
489 # We didn't start the kernel with this KernelManager so we don't
490 # know if it is running. We should use a heartbeat for this case.
491 return True
492
439 #--------------------------------------------------------------------------
493 #--------------------------------------------------------------------------
440 # Channels used for communication with the kernel:
494 # Channels used for communication with the kernel:
441 #--------------------------------------------------------------------------
495 #--------------------------------------------------------------------------
442
496
443 @property
497 @property
444 def sub_channel(self):
445 """Get the SUB socket channel object."""
446 if self._sub_channel is None:
447 self._sub_channel = self.sub_channel_class(self.context,
448 self.session)
449 return self._sub_channel
450
451 @property
452 def xreq_channel(self):
498 def xreq_channel(self):
453 """Get the REQ socket channel object to make requests of the kernel."""
499 """Get the REQ socket channel object to make requests of the kernel."""
454 if self._xreq_channel is None:
500 if self._xreq_channel is None:
455 self._xreq_channel = self.xreq_channel_class(self.context,
501 self._xreq_channel = self.xreq_channel_class(self.context,
456 self.session)
502 self.session,
503 self.xreq_address)
457 return self._xreq_channel
504 return self._xreq_channel
458
505
459 @property
506 @property
507 def sub_channel(self):
508 """Get the SUB socket channel object."""
509 if self._sub_channel is None:
510 self._sub_channel = self.sub_channel_class(self.context,
511 self.session,
512 self.sub_address)
513 return self._sub_channel
514
515 @property
460 def rep_channel(self):
516 def rep_channel(self):
461 """Get the REP socket channel object to handle stdin (raw_input)."""
517 """Get the REP socket channel object to handle stdin (raw_input)."""
462 if self._rep_channel is None:
518 if self._rep_channel is None:
463 self._rep_channel = self.rep_channel_class(self.context,
519 self._rep_channel = self.rep_channel_class(self.context,
464 self.session)
520 self.session,
521 self.rep_address)
465 return self._rep_channel
522 return self._rep_channel
466
523
467 #--------------------------------------------------------------------------
524 @property
468 # Delegates for the Channel address attributes:
525 def xreq_address(self):
469 #--------------------------------------------------------------------------
526 return self._xreq_address
470
471 def get_sub_address(self):
472 return self.sub_channel.address
473
474 def set_sub_address(self, address):
475 self.sub_channel.address = address
476
477 sub_address = property(get_sub_address, set_sub_address,
478 doc="The address used by SUB socket channel.")
479
480 def get_xreq_address(self):
481 return self.xreq_channel.address
482
483 def set_xreq_address(self, address):
484 self.xreq_channel.address = address
485
527
486 xreq_address = property(get_xreq_address, set_xreq_address,
528 @property
487 doc="The address used by XREQ socket channel.")
529 def sub_address(self):
488
530 return self._sub_address
489 def get_rep_address(self):
490 return self.rep_channel.address
491
531
492 def set_rep_address(self, address):
532 @property
493 self.rep_channel.address = address
533 def rep_address(self):
534 return self._rep_address
494
535
495 rep_address = property(get_rep_address, set_rep_address,
496 doc="The address used by REP socket channel.")
497
536
General Comments 0
You need to be logged in to leave comments. Login now