##// END OF EJS Templates
Merge branch 'kernelmanager' of git://github.com/ellisonbg/ipython into qtfrontend. Fixed breakage and conflicts from merge....
epatters -
r2701:c46f948d merge
parent child Browse files
Show More
@@ -161,10 +161,10 b' class FrontendWidget(HistoryConsoleWidget):'
161 161 """
162 162 # Disconnect the old kernel manager, if necessary.
163 163 if self._kernel_manager is not None:
164 self._kernel_manager.started_listening.disconnect(
165 self._started_listening)
166 self._kernel_manager.stopped_listening.disconnect(
167 self._stopped_listening)
164 self._kernel_manager.started_channels.disconnect(
165 self._started_channels)
166 self._kernel_manager.stopped_channels.disconnect(
167 self._stopped_channels)
168 168
169 169 # Disconnect the old kernel manager's channels.
170 170 sub = self._kernel_manager.sub_channel
@@ -174,9 +174,9 b' class FrontendWidget(HistoryConsoleWidget):'
174 174 xreq.complete_reply.disconnect(self._handle_complete_reply)
175 175 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
176 176
177 # Handle the case where the old kernel manager is still listening.
178 if self._kernel_manager.is_listening:
179 self._stopped_listening()
177 # Handle the case where the old kernel manager is still channels.
178 if self._kernel_manager.channels_running:
179 self._stopped_channels()
180 180
181 181 # Set the new kernel manager.
182 182 self._kernel_manager = kernel_manager
@@ -184,8 +184,8 b' class FrontendWidget(HistoryConsoleWidget):'
184 184 return
185 185
186 186 # Connect the new kernel manager.
187 kernel_manager.started_listening.connect(self._started_listening)
188 kernel_manager.stopped_listening.connect(self._stopped_listening)
187 kernel_manager.started_channels.connect(self._started_channels)
188 kernel_manager.stopped_channels.connect(self._stopped_channels)
189 189
190 190 # Connect the new kernel manager's channels.
191 191 sub = kernel_manager.sub_channel
@@ -195,10 +195,10 b' class FrontendWidget(HistoryConsoleWidget):'
195 195 xreq.complete_reply.connect(self._handle_complete_reply)
196 196 xreq.object_info_reply.connect(self._handle_object_info_reply)
197 197
198 # Handle the case where the kernel manager started listening before
198 # Handle the case where the kernel manager started channels before
199 199 # we connected.
200 if kernel_manager.is_listening:
201 self._started_listening()
200 if kernel_manager.channels_running:
201 self._started_channels()
202 202
203 203 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
204 204
@@ -323,8 +323,8 b' class FrontendWidget(HistoryConsoleWidget):'
323 323 if doc:
324 324 self._call_tip_widget.show_docstring(doc)
325 325
326 def _started_listening(self):
326 def _started_channels(self):
327 327 self.clear()
328 328
329 def _stopped_listening(self):
329 def _stopped_channels(self):
330 330 pass
@@ -82,7 +82,7 b" if __name__ == '__main__':"
82 82 # Create a KernelManager.
83 83 kernel_manager = QtKernelManager()
84 84 kernel_manager.start_kernel()
85 kernel_manager.start_listening()
85 kernel_manager.start_channels()
86 86
87 87 # Launch the application.
88 88 app = QtGui.QApplication([])
@@ -93,12 +93,6 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):'
93 93 if signal:
94 94 signal.emit(msg)
95 95
96 def _queue_request(self, msg, callback):
97 """ Reimplemented to skip callback handling.
98 """
99 self.command_queue.put(msg)
100 self.add_io_state(zmq.POLLOUT)
101
102 96
103 97 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
104 98
@@ -120,10 +114,10 b' class QtKernelManager(KernelManager, QtCore.QObject):'
120 114 __metaclass__ = MetaQObjectHasTraits
121 115
122 116 # Emitted when the kernel manager has started listening.
123 started_listening = QtCore.pyqtSignal()
117 started_channels = QtCore.pyqtSignal()
124 118
125 119 # Emitted when the kernel manager has stopped listening.
126 stopped_listening = QtCore.pyqtSignal()
120 stopped_channels = QtCore.pyqtSignal()
127 121
128 122 # Use Qt-specific channel classes that emit signals.
129 123 sub_channel_class = QtSubSocketChannel
@@ -131,17 +125,27 b' class QtKernelManager(KernelManager, QtCore.QObject):'
131 125 rep_channel_class = QtRepSocketChannel
132 126
133 127 #---------------------------------------------------------------------------
128 # 'object' interface
129 #---------------------------------------------------------------------------
130
131 def __init__(self, *args, **kw):
132 """ Reimplemented to ensure that QtCore.QObject is initialized first.
133 """
134 QtCore.QObject.__init__(self)
135 KernelManager.__init__(self, *args, **kw)
136
137 #---------------------------------------------------------------------------
134 138 # 'KernelManager' interface
135 139 #---------------------------------------------------------------------------
136 140
137 def start_listening(self):
141 def start_channels(self):
138 142 """ Reimplemented to emit signal.
139 143 """
140 super(QtKernelManager, self).start_listening()
141 self.started_listening.emit()
144 super(QtKernelManager, self).start_channels()
145 self.started_channels.emit()
142 146
143 def stop_listening(self):
147 def stop_channels(self):
144 148 """ Reimplemented to emit signal.
145 149 """
146 super(QtKernelManager, self).stop_listening()
147 self.stopped_listening.emit()
150 super(QtKernelManager, self).stop_channels()
151 self.stopped_channels.emit()
This diff has been collapsed as it changes many lines, (527 lines changed) Show them Hide them
@@ -1,15 +1,27 b''
1 """Kernel frontend classes.
1 """Classes to manage the interaction with a running kernel.
2 2
3 TODO: Create logger to handle debugging and console messages.
3 Todo
4 ====
4 5
6 * Create logger to handle debugging and console messages.
5 7 """
6 8
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
15
16 #-----------------------------------------------------------------------------
17 # Imports
18 #-----------------------------------------------------------------------------
19
7 20 # Standard library imports.
8 21 from Queue import Queue, Empty
9 22 from subprocess import Popen
10 23 from threading import Thread
11 24 import time
12 import traceback
13 25
14 26 # System library imports.
15 27 import zmq
@@ -17,70 +29,80 b' from zmq import POLLIN, POLLOUT, POLLERR'
17 29 from zmq.eventloop import ioloop
18 30
19 31 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 Type
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
22 33 from kernel import launch_kernel
23 34 from session import Session
24 35
25 # Constants.
26 LOCALHOST = '127.0.0.1'
36 #-----------------------------------------------------------------------------
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
27 39
40 LOCALHOST = '127.0.0.1'
28 41
29 class MissingHandlerError(Exception):
42 class InvalidPortNumber(Exception):
30 43 pass
31 44
45 #-----------------------------------------------------------------------------
46 # ZMQ Socket Channel classes
47 #-----------------------------------------------------------------------------
32 48
33 49 class ZmqSocketChannel(Thread):
34 50 """ The base class for the channels that use ZMQ sockets.
35 51 """
36
37 52 context = None
38 53 session = None
39 54 socket = None
40 55 ioloop = None
41 56 iostate = None
57 _address = None
58
59 def __init__(self, context, session, address):
60 """Create a channel
42 61
43 def __init__(self, context, session, address=None):
62 Parameters
63 ----------
64 context : zmq.Context
65 The ZMQ context to use.
66 session : session.Session
67 The session to use.
68 address : tuple
69 Standard (ip, port) tuple that the kernel is listening on.
70 """
44 71 super(ZmqSocketChannel, self).__init__()
45 72 self.daemon = True
46 73
47 74 self.context = context
48 75 self.session = session
49 self.address = address
76 if address[1] == 0:
77 raise InvalidPortNumber('The port number for a channel cannot be 0.')
78 self._address = address
50 79
51 80 def stop(self):
52 """Stop the thread's activity. Returns when the thread terminates.
81 """Stop the channel's activity.
53 82
54 The thread will raise :class:`RuntimeError` if :method:`self.start`
55 is called again.
83 This calls :method:`Thread.join` and returns when the thread
84 terminates. :class:`RuntimeError` will be raised if
85 :method:`self.start` is called again.
56 86 """
57 87 self.join()
58 88
89 @property
90 def address(self):
91 """Get the channel's address as an (ip, port) tuple.
59 92
60 def get_address(self):
61 """ Get the channel's address. By the default, a channel is on
62 localhost with no port specified (a negative port number).
93 By the default, the address is (localhost, 0), where 0 means a random
94 port.
63 95 """
64 96 return self._address
65 97
66 def set_adresss(self, address):
67 """ Set the channel's address. Should be a tuple of form:
68 (ip address [str], port [int]).
69 or None, in which case the address is reset to its default value.
70 """
71 # FIXME: Validate address.
72 if self.is_alive(): # This is Thread.is_alive
73 raise RuntimeError("Cannot set address on a running channel!")
74 else:
75 if address is None:
76 address = (LOCALHOST, 0)
77 self._address = address
78
79 address = property(get_address, set_adresss)
80
81 98 def add_io_state(self, state):
82 99 """Add IO state to the eventloop.
83 100
101 Parameters
102 ----------
103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 The IO state flag to set.
105
84 106 This is thread safe as it uses the thread safe IOLoop.add_callback.
85 107 """
86 108 def add_io_state_callback():
@@ -92,6 +114,11 b' class ZmqSocketChannel(Thread):'
92 114 def drop_io_state(self, state):
93 115 """Drop IO state from the eventloop.
94 116
117 Parameters
118 ----------
119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 The IO state flag to set.
121
95 122 This is thread safe as it uses the thread safe IOLoop.add_callback.
96 123 """
97 124 def drop_io_state_callback():
@@ -101,48 +128,30 b' class ZmqSocketChannel(Thread):'
101 128 self.ioloop.add_callback(drop_io_state_callback)
102 129
103 130
104 class SubSocketChannel(ZmqSocketChannel):
131 class XReqSocketChannel(ZmqSocketChannel):
132 """The XREQ channel for issues request/replies to the kernel.
133 """
105 134
106 def __init__(self, context, session, address=None):
107 super(SubSocketChannel, self).__init__(context, session, address)
135 command_queue = None
136
137 def __init__(self, context, session, address):
138 self.command_queue = Queue()
139 super(XReqSocketChannel, self).__init__(context, session, address)
108 140
109 141 def run(self):
110 self.socket = self.context.socket(zmq.SUB)
111 self.socket.setsockopt(zmq.SUBSCRIBE,'')
142 """The thread's main activity. Call start() instead."""
143 self.socket = self.context.socket(zmq.XREQ)
112 144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
113 145 self.socket.connect('tcp://%s:%i' % self.address)
114 146 self.ioloop = ioloop.IOLoop()
115 self.iostate = POLLIN|POLLERR
147 self.iostate = POLLERR|POLLIN
116 148 self.ioloop.add_handler(self.socket, self._handle_events,
117 149 self.iostate)
118 150 self.ioloop.start()
119 151
120 152 def stop(self):
121 153 self.ioloop.stop()
122 super(SubSocketChannel, self).stop()
123
124 def _handle_events(self, socket, events):
125 # Turn on and off POLLOUT depending on if we have made a request
126 if events & POLLERR:
127 self._handle_err()
128 if events & POLLIN:
129 self._handle_recv()
130
131 def _handle_err(self):
132 # We don't want to let this go silently, so eventually we should log.
133 raise zmq.ZMQError()
134
135 def _handle_recv(self):
136 # Get all of the messages we can
137 while True:
138 try:
139 msg = self.socket.recv_json(zmq.NOBLOCK)
140 except zmq.ZMQError:
141 # Check the errno?
142 # Will this tigger POLLERR?
143 break
144 else:
145 self.call_handlers(msg)
154 super(XReqSocketChannel, self).stop()
146 155
147 156 def call_handlers(self, msg):
148 157 """This method is called in the ioloop thread when a message arrives.
@@ -154,59 +163,65 b' class SubSocketChannel(ZmqSocketChannel):'
154 163 """
155 164 raise NotImplementedError('call_handlers must be defined in a subclass.')
156 165
157 def flush(self, timeout=1.0):
158 """Immediately processes all pending messages on the SUB channel.
159
160 This method is thread safe.
166 def execute(self, code):
167 """Execute code in the kernel.
161 168
162 169 Parameters
163 170 ----------
164 timeout : float, optional
165 The maximum amount of time to spend flushing, in seconds. The
166 default is one second.
167 """
168 # We do the IOLoop callback process twice to ensure that the IOLoop
169 # gets to perform at least one full poll.
170 stop_time = time.time() + timeout
171 for i in xrange(2):
172 self._flushed = False
173 self.ioloop.add_callback(self._flush)
174 while not self._flushed and time.time() < stop_time:
175 time.sleep(0.01)
171 code : str
172 A string of Python code.
176 173
177 def _flush(self):
178 """Called in this thread by the IOLoop to indicate that all events have
179 been processed.
174 Returns
175 -------
176 The msg_id of the message sent.
180 177 """
181 self._flushed = True
178 # Create class for content/msg creation. Related to, but possibly
179 # not in Session.
180 content = dict(code=code)
181 msg = self.session.msg('execute_request', content)
182 self._queue_request(msg)
183 return msg['header']['msg_id']
182 184
185 def complete(self, text, line, block=None):
186 """Tab complete text, line, block in the kernel's namespace.
183 187
184 class XReqSocketChannel(ZmqSocketChannel):
188 Parameters
189 ----------
190 text : str
191 The text to complete.
192 line : str
193 The full line of text that is the surrounding context for the
194 text to complete.
195 block : str
196 The full block of code in which the completion is being requested.
197
198 Returns
199 -------
200 The msg_id of the message sent.
185 201
186 handler_queue = None
187 command_queue = None
188 handlers = None
189 _overriden_call_handler = None
202 """
203 content = dict(text=text, line=line)
204 msg = self.session.msg('complete_request', content)
205 self._queue_request(msg)
206 return msg['header']['msg_id']
190 207
191 def __init__(self, context, session, address=None):
192 self.handlers = {}
193 self.handler_queue = Queue()
194 self.command_queue = Queue()
195 super(XReqSocketChannel, self).__init__(context, session, address)
208 def object_info(self, oname):
209 """Get metadata information about an object.
196 210
197 def run(self):
198 self.socket = self.context.socket(zmq.XREQ)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
200 self.socket.connect('tcp://%s:%i' % self.address)
201 self.ioloop = ioloop.IOLoop()
202 self.iostate = POLLERR|POLLIN
203 self.ioloop.add_handler(self.socket, self._handle_events,
204 self.iostate)
205 self.ioloop.start()
211 Parameters
212 ----------
213 oname : str
214 A string specifying the object name.
206 215
207 def stop(self):
208 self.ioloop.stop()
209 super(XReqSocketChannel, self).stop()
216 Returns
217 -------
218 The msg_id of the message sent.
219 """
220 print oname
221 content = dict(oname=oname)
222 msg = self.session.msg('object_info_request', content)
223 self._queue_request(msg)
224 return msg['header']['msg_id']
210 225
211 226 def _handle_events(self, socket, events):
212 227 if events & POLLERR:
@@ -234,82 +249,113 b' class XReqSocketChannel(ZmqSocketChannel):'
234 249 # We don't want to let this go silently, so eventually we should log.
235 250 raise zmq.ZMQError()
236 251
237 def _queue_request(self, msg, callback):
238 handler = self._find_handler(msg['msg_type'], callback)
239 self.handler_queue.put(handler)
252 def _queue_request(self, msg):
240 253 self.command_queue.put(msg)
241 254 self.add_io_state(POLLOUT)
242 255
243 def execute(self, code, callback=None):
244 # Create class for content/msg creation. Related to, but possibly
245 # not in Session.
246 content = dict(code=code)
247 msg = self.session.msg('execute_request', content)
248 self._queue_request(msg, callback)
249 return msg['header']['msg_id']
250 256
251 def complete(self, text, line, block=None, callback=None):
252 content = dict(text=text, line=line)
253 msg = self.session.msg('complete_request', content)
254 self._queue_request(msg, callback)
255 return msg['header']['msg_id']
257 class SubSocketChannel(ZmqSocketChannel):
258 """The SUB channel which listens for messages that the kernel publishes.
259 """
256 260
257 def object_info(self, oname, callback=None):
258 content = dict(oname=oname)
259 msg = self.session.msg('object_info_request', content)
260 self._queue_request(msg, callback)
261 return msg['header']['msg_id']
261 def __init__(self, context, session, address):
262 super(SubSocketChannel, self).__init__(context, session, address)
262 263
263 def _find_handler(self, name, callback):
264 if callback is not None:
265 return callback
266 handler = self.handlers.get(name)
267 if handler is None:
268 raise MissingHandlerError(
269 'No handler defined for method: %s' % name)
270 return handler
264 def run(self):
265 """The thread's main activity. Call start() instead."""
266 self.socket = self.context.socket(zmq.SUB)
267 self.socket.setsockopt(zmq.SUBSCRIBE,'')
268 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
269 self.socket.connect('tcp://%s:%i' % self.address)
270 self.ioloop = ioloop.IOLoop()
271 self.iostate = POLLIN|POLLERR
272 self.ioloop.add_handler(self.socket, self._handle_events,
273 self.iostate)
274 self.ioloop.start()
271 275
272 def override_call_handler(self, func):
273 """Permanently override the call_handler.
276 def stop(self):
277 self.ioloop.stop()
278 super(SubSocketChannel, self).stop()
274 279
275 The function func will be called as::
280 def call_handlers(self, msg):
281 """This method is called in the ioloop thread when a message arrives.
276 282
277 func(handler, msg)
283 Subclasses should override this method to handle incoming messages.
284 It is important to remember that this method is called in the thread
285 so that some logic must be done to ensure that the application leve
286 handlers are called in the application thread.
287 """
288 raise NotImplementedError('call_handlers must be defined in a subclass.')
278 289
279 And must call::
290 def flush(self, timeout=1.0):
291 """Immediately processes all pending messages on the SUB channel.
280 292
281 handler(msg)
293 This method is thread safe.
282 294
283 in the main thread.
295 Parameters
296 ----------
297 timeout : float, optional
298 The maximum amount of time to spend flushing, in seconds. The
299 default is one second.
284 300 """
285 assert callable(func), "not a callable: %r" % func
286 self._overriden_call_handler = func
301 # We do the IOLoop callback process twice to ensure that the IOLoop
302 # gets to perform at least one full poll.
303 stop_time = time.time() + timeout
304 for i in xrange(2):
305 self._flushed = False
306 self.ioloop.add_callback(self._flush)
307 while not self._flushed and time.time() < stop_time:
308 time.sleep(0.01)
287 309
288 def call_handlers(self, msg):
310 def _handle_events(self, socket, events):
311 # Turn on and off POLLOUT depending on if we have made a request
312 if events & POLLERR:
313 self._handle_err()
314 if events & POLLIN:
315 self._handle_recv()
316
317 def _handle_err(self):
318 # We don't want to let this go silently, so eventually we should log.
319 raise zmq.ZMQError()
320
321 def _handle_recv(self):
322 # Get all of the messages we can
323 while True:
289 324 try:
290 handler = self.handler_queue.get(False)
291 except Empty:
292 print "Message received with no handler!!!"
293 print msg
294 else:
295 self.call_handler(handler, msg)
296
297 def call_handler(self, handler, msg):
298 if self._overriden_call_handler is not None:
299 self._overriden_call_handler(handler, msg)
300 elif hasattr(self, '_call_handler'):
301 call_handler = getattr(self, '_call_handler')
302 call_handler(handler, msg)
325 msg = self.socket.recv_json(zmq.NOBLOCK)
326 except zmq.ZMQError:
327 # Check the errno?
328 # Will this tigger POLLERR?
329 break
303 330 else:
304 raise RuntimeError('no handler!')
331 self.call_handlers(msg)
332
333 def _flush(self):
334 """Callback for :method:`self.flush`."""
335 self._flushed = True
305 336
306 337
307 338 class RepSocketChannel(ZmqSocketChannel):
339 """A reply channel to handle raw_input requests that the kernel makes."""
340
341 def run(self):
342 """The thread's main activity. Call start() instead."""
343 self.ioloop = ioloop.IOLoop()
344 self.ioloop.start()
345
346 def stop(self):
347 self.ioloop.stop()
348 super(SubSocketChannel, self).stop()
308 349
309 350 def on_raw_input(self):
310 351 pass
311 352
312 353
354 #-----------------------------------------------------------------------------
355 # Main kernel manager class
356 #-----------------------------------------------------------------------------
357
358
313 359 class KernelManager(HasTraits):
314 360 """ Manages a kernel for a frontend.
315 361
@@ -321,60 +367,67 b' class KernelManager(HasTraits):'
321 367 The REP channel is for the kernel to request stdin (raw_input) from the
322 368 frontend.
323 369 """
324
325 # Whether the kernel manager is currently listening on its channels.
326 is_listening = Bool(False)
327
328 370 # The PyZMQ Context to use for communication with the kernel.
329 context = Instance(zmq.Context, ())
371 context = Instance(zmq.Context)
330 372
331 373 # The Session to use for communication with the kernel.
332 session = Instance(Session, ())
374 session = Instance(Session)
333 375
334 376 # The classes to use for the various channels.
335 sub_channel_class = Type(SubSocketChannel)
336 377 xreq_channel_class = Type(XReqSocketChannel)
378 sub_channel_class = Type(SubSocketChannel)
337 379 rep_channel_class = Type(RepSocketChannel)
338 380
339 381 # Protected traits.
340 382 _kernel = Instance(Popen)
341 _sub_channel = Any
383 _xreq_address = Any
384 _sub_address = Any
385 _rep_address = Any
342 386 _xreq_channel = Any
387 _sub_channel = Any
343 388 _rep_channel = Any
344 389
390 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
391 context=None, session=None):
392 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
393 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
394 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
395 self.context = zmq.Context() if context is None else context
396 self.session = Session() if session is None else session
397
345 398 #--------------------------------------------------------------------------
346 399 # Channel management methods:
347 400 #--------------------------------------------------------------------------
348 401
349 def start_listening(self):
350 """Starts listening on the specified ports. If already listening, raises
351 a RuntimeError.
402 def start_channels(self):
403 """Starts the channels for this kernel.
404
405 This will create the channels if they do not exist and then start
406 them. If port numbers of 0 are being used (random ports) then you
407 must first call :method:`start_kernel`. If the channels have been
408 stopped and you call this, :class:`RuntimeError` will be raised.
352 409 """
353 if self.is_listening:
354 raise RuntimeError("Cannot start listening. Already listening!")
355 else:
356 self.is_listening = True
357 self.sub_channel.start()
358 410 self.xreq_channel.start()
411 self.sub_channel.start()
359 412 self.rep_channel.start()
360 413
361 @property
362 def is_alive(self):
363 """ Returns whether the kernel is alive. """
364 if self.is_listening:
365 # TODO: check if alive.
366 return True
367 else:
368 return False
414 def stop_channels(self):
415 """Stops the channels for this kernel.
369 416
370 def stop_listening(self):
371 """Stops listening. If not listening, does nothing. """
372 if self.is_listening:
373 self.is_listening = False
374 self.sub_channel.stop()
417 This stops the channels by joining their threads. If the channels
418 were not started, :class:`RuntimeError` will be raised.
419 """
375 420 self.xreq_channel.stop()
421 self.sub_channel.stop()
376 422 self.rep_channel.stop()
377 423
424 @property
425 def channels_running(self):
426 """Are all of the channels created and running?"""
427 return self.xreq_channel.is_alive() \
428 and self.sub_channel.is_alive() \
429 and self.rep_channel.is_alive()
430
378 431 #--------------------------------------------------------------------------
379 432 # Kernel process management methods:
380 433 #--------------------------------------------------------------------------
@@ -382,9 +435,8 b' class KernelManager(HasTraits):'
382 435 def start_kernel(self):
383 436 """Starts a kernel process and configures the manager to use it.
384 437
385 If ports have been specified via the address attributes, they are used.
386 Otherwise, open ports are chosen by the OS and the channel port
387 attributes are configured as appropriate.
438 If random ports (port=0) are being used, this method must be called
439 before the channels are created.
388 440 """
389 441 xreq, sub = self.xreq_address, self.sub_address
390 442 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
@@ -393,24 +445,13 b' class KernelManager(HasTraits):'
393 445 "configured properly.")
394 446
395 447 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
396 self.set_kernel(kernel)
397 self.xreq_address = (LOCALHOST, xrep)
398 self.sub_address = (LOCALHOST, pub)
399
400 def set_kernel(self, kernel):
401 """Sets the kernel manager's kernel to an existing kernel process.
402
403 It is *not* necessary to a set a kernel to communicate with it via the
404 channels, and those objects must be configured separately. It
405 *is* necessary to set a kernel if you want to use the manager (or
406 frontends that use the manager) to signal and/or kill the kernel.
407
408 Parameters:
409 -----------
410 kernel : Popen
411 An existing kernel process.
412 """
413 448 self._kernel = kernel
449 self._xreq_address = (LOCALHOST, xrep)
450 self._sub_address = (LOCALHOST, pub)
451 # The rep channel is not fully working yet, but its base class makes
452 # sure the port is not 0. We set to -1 for now until the rep channel
453 # is fully working.
454 self._rep_address = (LOCALHOST, -1)
414 455
415 456 @property
416 457 def has_kernel(self):
@@ -423,7 +464,7 b' class KernelManager(HasTraits):'
423 464
424 465 def kill_kernel(self):
425 466 """ Kill the running kernel. """
426 if self._kernel:
467 if self._kernel is not None:
427 468 self._kernel.kill()
428 469 self._kernel = None
429 470 else:
@@ -431,67 +472,65 b' class KernelManager(HasTraits):'
431 472
432 473 def signal_kernel(self, signum):
433 474 """ Sends a signal to the kernel. """
434 if self._kernel:
475 if self._kernel is not None:
435 476 self._kernel.send_signal(signum)
436 477 else:
437 478 raise RuntimeError("Cannot signal kernel. No kernel is running!")
438 479
480 @property
481 def is_alive(self):
482 """Is the kernel process still running?"""
483 if self._kernel is not None:
484 if self._kernel.poll() is None:
485 return True
486 else:
487 return False
488 else:
489 # We didn't start the kernel with this KernelManager so we don't
490 # know if it is running. We should use a heartbeat for this case.
491 return True
492
439 493 #--------------------------------------------------------------------------
440 494 # Channels used for communication with the kernel:
441 495 #--------------------------------------------------------------------------
442 496
443 497 @property
444 def sub_channel(self):
445 """Get the SUB socket channel object."""
446 if self._sub_channel is None:
447 self._sub_channel = self.sub_channel_class(self.context,
448 self.session)
449 return self._sub_channel
450
451 @property
452 498 def xreq_channel(self):
453 499 """Get the REQ socket channel object to make requests of the kernel."""
454 500 if self._xreq_channel is None:
455 501 self._xreq_channel = self.xreq_channel_class(self.context,
456 self.session)
502 self.session,
503 self.xreq_address)
457 504 return self._xreq_channel
458 505
459 506 @property
507 def sub_channel(self):
508 """Get the SUB socket channel object."""
509 if self._sub_channel is None:
510 self._sub_channel = self.sub_channel_class(self.context,
511 self.session,
512 self.sub_address)
513 return self._sub_channel
514
515 @property
460 516 def rep_channel(self):
461 517 """Get the REP socket channel object to handle stdin (raw_input)."""
462 518 if self._rep_channel is None:
463 519 self._rep_channel = self.rep_channel_class(self.context,
464 self.session)
520 self.session,
521 self.rep_address)
465 522 return self._rep_channel
466 523
467 #--------------------------------------------------------------------------
468 # Delegates for the Channel address attributes:
469 #--------------------------------------------------------------------------
470
471 def get_sub_address(self):
472 return self.sub_channel.address
473
474 def set_sub_address(self, address):
475 self.sub_channel.address = address
476
477 sub_address = property(get_sub_address, set_sub_address,
478 doc="The address used by SUB socket channel.")
479
480 def get_xreq_address(self):
481 return self.xreq_channel.address
482
483 def set_xreq_address(self, address):
484 self.xreq_channel.address = address
485
486 xreq_address = property(get_xreq_address, set_xreq_address,
487 doc="The address used by XREQ socket channel.")
524 @property
525 def xreq_address(self):
526 return self._xreq_address
488 527
489 def get_rep_address(self):
490 return self.rep_channel.address
528 @property
529 def sub_address(self):
530 return self._sub_address
491 531
492 def set_rep_address(self, address):
493 self.rep_channel.address = address
532 @property
533 def rep_address(self):
534 return self._rep_address
494 535
495 rep_address = property(get_rep_address, set_rep_address,
496 doc="The address used by REP socket channel.")
497 536
General Comments 0
You need to be logged in to leave comments. Login now