##// END OF EJS Templates
Kernel manager is cleaned up and simplified. Still bugs though.
Brian Granger -
Show More
@@ -175,7 +175,7 b' class FrontendWidget(HistoryConsoleWidget):'
175 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
175 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
176
176
177 # Handle the case where the old kernel manager is still listening.
177 # Handle the case where the old kernel manager is still listening.
178 if self._kernel_manager.is_listening:
178 if self._kernel_manager.channels_running:
179 self._stopped_listening()
179 self._stopped_listening()
180
180
181 # Set the new kernel manager.
181 # Set the new kernel manager.
@@ -197,7 +197,7 b' class FrontendWidget(HistoryConsoleWidget):'
197
197
198 # Handle the case where the kernel manager started listening before
198 # Handle the case where the kernel manager started listening before
199 # we connected.
199 # we connected.
200 if kernel_manager.is_listening:
200 if kernel_manager.channels_running:
201 self._started_listening()
201 self._started_listening()
202
202
203 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
203 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
@@ -79,7 +79,7 b" if __name__ == '__main__':"
79 # Create a KernelManager.
79 # Create a KernelManager.
80 kernel_manager = QtKernelManager()
80 kernel_manager = QtKernelManager()
81 kernel_manager.start_kernel()
81 kernel_manager.start_kernel()
82 kernel_manager.start_listening()
82 kernel_manager.start_channels()
83
83
84 # Don't let Qt or ZMQ swallow KeyboardInterupts.
84 # Don't let Qt or ZMQ swallow KeyboardInterupts.
85 # FIXME: Gah, ZMQ swallows even custom signal handlers. So for now we leave
85 # FIXME: Gah, ZMQ swallows even custom signal handlers. So for now we leave
@@ -92,7 +92,7 b" if __name__ == '__main__':"
92 # Create the application, making sure to clean up nicely when we exit.
92 # Create the application, making sure to clean up nicely when we exit.
93 app = QtGui.QApplication([])
93 app = QtGui.QApplication([])
94 def quit_hook():
94 def quit_hook():
95 kernel_manager.stop_listening()
95 kernel_manager.stop_channels()
96 kernel_manager.kill_kernel()
96 kernel_manager.kill_kernel()
97 app.aboutToQuit.connect(quit_hook)
97 app.aboutToQuit.connect(quit_hook)
98
98
@@ -128,14 +128,14 b' class QtKernelManager(KernelManager, QtCore.QObject):'
128 # 'KernelManager' interface
128 # 'KernelManager' interface
129 #---------------------------------------------------------------------------
129 #---------------------------------------------------------------------------
130
130
131 def start_listening(self):
131 def start_channels(self):
132 """ Reimplemented to emit signal.
132 """ Reimplemented to emit signal.
133 """
133 """
134 super(QtKernelManager, self).start_listening()
134 super(QtKernelManager, self).start_channels()
135 self.started_listening.emit()
135 self.started_listening.emit()
136
136
137 def stop_listening(self):
137 def stop_channels(self):
138 """ Reimplemented to emit signal.
138 """ Reimplemented to emit signal.
139 """
139 """
140 super(QtKernelManager, self).stop_listening()
140 super(QtKernelManager, self).stop_channels()
141 self.stopped_listening.emit()
141 self.stopped_listening.emit()
@@ -1,15 +1,27 b''
1 """Kernel frontend classes.
1 """Classes to manage the interaction with a running kernel.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 Todo
4 ====
4
5
6 * Create logger to handle debugging and console messages.
5 """
7 """
6
8
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
15
16 #-----------------------------------------------------------------------------
17 # Imports
18 #-----------------------------------------------------------------------------
19
7 # Standard library imports.
20 # Standard library imports.
8 from Queue import Queue, Empty
21 from Queue import Queue, Empty
9 from subprocess import Popen
22 from subprocess import Popen
10 from threading import Thread
23 from threading import Thread
11 import time
24 import time
12 import traceback
13
25
14 # System library imports.
26 # System library imports.
15 import zmq
27 import zmq
@@ -17,66 +29,80 b' from zmq import POLLIN, POLLOUT, POLLERR'
17 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
18
30
19 # Local imports.
31 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
21 Type
22 from kernel import launch_kernel
33 from kernel import launch_kernel
23 from session import Session
34 from session import Session
24
35
25 # Constants.
36 #-----------------------------------------------------------------------------
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
39
26 LOCALHOST = '127.0.0.1'
40 LOCALHOST = '127.0.0.1'
27
41
42 class InvalidPortNumber(Exception):
43 pass
28
44
45 #-----------------------------------------------------------------------------
46 # ZMQ Socket Channel classes
47 #-----------------------------------------------------------------------------
29
48
30 class ZmqSocketChannel(Thread):
49 class ZmqSocketChannel(Thread):
31 """ The base class for the channels that use ZMQ sockets.
50 """The base class for the channels that use ZMQ sockets.
32 """
51 """
33 context = None
52 context = None
34 session = None
53 session = None
35 socket = None
54 socket = None
36 ioloop = None
55 ioloop = None
37 iostate = None
56 iostate = None
57 _address = None
58
59 def __init__(self, context, session, address):
60 """Create a channel
38
61
39 def __init__(self, context, session, address=None):
62 Parameters
63 ----------
64 context : zmq.Context
65 The ZMQ context to use.
66 session : session.Session
67 The session to use.
68 address : tuple
69 Standard (ip, port) tuple that the kernel is listening on.
70 """
40 super(ZmqSocketChannel, self).__init__()
71 super(ZmqSocketChannel, self).__init__()
41 self.daemon = True
72 self.daemon = True
42
73
43 self.context = context
74 self.context = context
44 self.session = session
75 self.session = session
45 self.address = address
76 if address[1] == 0:
77 raise InvalidPortNumber('The port number for a channel cannot be 0.')
78 self._address = address
46
79
47 def stop(self):
80 def stop(self):
48 """Stop the thread's activity. Returns when the thread terminates.
81 """Stop the channel's activity.
49
82
50 The thread will raise :class:`RuntimeError` if :method:`self.start`
83 This calls :method:`Thread.join` and returns when the thread
51 is called again.
84 terminates. :class:`RuntimeError` will be raised if
85 :method:`self.start` is called again.
52 """
86 """
53 self.join()
87 self.join()
54
88
55
89 @property
56 def get_address(self):
90 def address(self):
57 """ Get the channel's address. By the default, a channel is on
91 """Get the channel's address as an (ip, port) tuple.
58 localhost with no port specified (a negative port number).
92
93 By the default, the address is (localhost, 0), where 0 means a random
94 port.
59 """
95 """
60 return self._address
96 return self._address
61
97
62 def set_adresss(self, address):
63 """ Set the channel's address. Should be a tuple of form:
64 (ip address [str], port [int]).
65 or None, in which case the address is reset to its default value.
66 """
67 # FIXME: Validate address.
68 if self.is_alive(): # This is Thread.is_alive
69 raise RuntimeError("Cannot set address on a running channel!")
70 else:
71 if address is None:
72 address = (LOCALHOST, 0)
73 self._address = address
74
75 address = property(get_address, set_adresss)
76
77 def add_io_state(self, state):
98 def add_io_state(self, state):
78 """Add IO state to the eventloop.
99 """Add IO state to the eventloop.
79
100
101 Parameters
102 ----------
103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 The IO state flag to set.
105
80 This is thread safe as it uses the thread safe IOLoop.add_callback.
106 This is thread safe as it uses the thread safe IOLoop.add_callback.
81 """
107 """
82 def add_io_state_callback():
108 def add_io_state_callback():
@@ -88,6 +114,11 b' class ZmqSocketChannel(Thread):'
88 def drop_io_state(self, state):
114 def drop_io_state(self, state):
89 """Drop IO state from the eventloop.
115 """Drop IO state from the eventloop.
90
116
117 Parameters
118 ----------
119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 The IO state flag to set.
121
91 This is thread safe as it uses the thread safe IOLoop.add_callback.
122 This is thread safe as it uses the thread safe IOLoop.add_callback.
92 """
123 """
93 def drop_io_state_callback():
124 def drop_io_state_callback():
@@ -97,48 +128,30 b' class ZmqSocketChannel(Thread):'
97 self.ioloop.add_callback(drop_io_state_callback)
128 self.ioloop.add_callback(drop_io_state_callback)
98
129
99
130
100 class SubSocketChannel(ZmqSocketChannel):
131 class XReqSocketChannel(ZmqSocketChannel):
132 """The XREQ channel for issues request/replies to the kernel.
133 """
101
134
102 def __init__(self, context, session, address=None):
135 command_queue = None
103 super(SubSocketChannel, self).__init__(context, session, address)
136
137 def __init__(self, context, session, address):
138 self.command_queue = Queue()
139 super(XReqSocketChannel, self).__init__(context, session, address)
104
140
105 def run(self):
141 def run(self):
106 self.socket = self.context.socket(zmq.SUB)
142 """The thread's main activity. Call start() instead."""
107 self.socket.setsockopt(zmq.SUBSCRIBE,'')
143 self.socket = self.context.socket(zmq.XREQ)
108 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
109 self.socket.connect('tcp://%s:%i' % self.address)
145 self.socket.connect('tcp://%s:%i' % self.address)
110 self.ioloop = ioloop.IOLoop()
146 self.ioloop = ioloop.IOLoop()
111 self.iostate = POLLIN|POLLERR
147 self.iostate = POLLERR|POLLIN
112 self.ioloop.add_handler(self.socket, self._handle_events,
148 self.ioloop.add_handler(self.socket, self._handle_events,
113 self.iostate)
149 self.iostate)
114 self.ioloop.start()
150 self.ioloop.start()
115
151
116 def stop(self):
152 def stop(self):
117 self.ioloop.stop()
153 self.ioloop.stop()
118 super(SubSocketChannel, self).stop()
154 super(XReqSocketChannel, self).stop()
119
120 def _handle_events(self, socket, events):
121 # Turn on and off POLLOUT depending on if we have made a request
122 if events & POLLERR:
123 self._handle_err()
124 if events & POLLIN:
125 self._handle_recv()
126
127 def _handle_err(self):
128 # We don't want to let this go silently, so eventually we should log.
129 raise zmq.ZMQError()
130
131 def _handle_recv(self):
132 # Get all of the messages we can
133 while True:
134 try:
135 msg = self.socket.recv_json(zmq.NOBLOCK)
136 except zmq.ZMQError:
137 # Check the errno?
138 # Will this tigger POLLERR?
139 break
140 else:
141 self.call_handlers(msg)
142
155
143 def call_handlers(self, msg):
156 def call_handlers(self, msg):
144 """This method is called in the ioloop thread when a message arrives.
157 """This method is called in the ioloop thread when a message arrives.
@@ -150,54 +163,65 b' class SubSocketChannel(ZmqSocketChannel):'
150 """
163 """
151 raise NotImplementedError('call_handlers must be defined in a subclass.')
164 raise NotImplementedError('call_handlers must be defined in a subclass.')
152
165
153 def flush(self, timeout=1.0):
166 def execute(self, code):
154 """Immediately processes all pending messages on the SUB channel.
167 """Execute code in the kernel.
155
156 This method is thread safe.
157
168
158 Parameters
169 Parameters
159 ----------
170 ----------
160 timeout : float, optional
171 code : str
161 The maximum amount of time to spend flushing, in seconds. The
172 A string of Python code.
162 default is one second.
163 """
164 # We do the IOLoop callback process twice to ensure that the IOLoop
165 # gets to perform at least one full poll.
166 stop_time = time.time() + timeout
167 for i in xrange(2):
168 self._flushed = False
169 self.ioloop.add_callback(self._flush)
170 while not self._flushed and time.time() < stop_time:
171 time.sleep(0.01)
172
173 def _flush(self):
174 """Called in this thread by the IOLoop to indicate that all events have
175 been processed.
176 """
177 self._flushed = True
178
173
174 Returns
175 -------
176 The msg_id of the message sent.
177 """
178 # Create class for content/msg creation. Related to, but possibly
179 # not in Session.
180 content = dict(code=code)
181 msg = self.session.msg('execute_request', content)
182 self._queue_request(msg)
183 return msg['header']['msg_id']
179
184
180 class XReqSocketChannel(ZmqSocketChannel):
185 def complete(self, text, line, block=None):
186 """Tab complete text, line, block in the kernel's namespace.
181
187
182 command_queue = None
188 Parameters
189 ----------
190 text : str
191 The text to complete.
192 line : str
193 The full line of text that is the surrounding context for the
194 text to complete.
195 block : str
196 The full block of code in which the completion is being requested.
197
198 Returns
199 -------
200 The msg_id of the message sent.
183
201
184 def __init__(self, context, session, address=None):
202 """
185 self.command_queue = Queue()
203 content = dict(text=text, line=line)
186 super(XReqSocketChannel, self).__init__(context, session, address)
204 msg = self.session.msg('complete_request', content)
205 self._queue_request(msg)
206 return msg['header']['msg_id']
187
207
188 def run(self):
208 def object_info(self, oname):
189 self.socket = self.context.socket(zmq.XREQ)
209 """Get metadata information about an object.
190 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
191 self.socket.connect('tcp://%s:%i' % self.address)
192 self.ioloop = ioloop.IOLoop()
193 self.iostate = POLLERR|POLLIN
194 self.ioloop.add_handler(self.socket, self._handle_events,
195 self.iostate)
196 self.ioloop.start()
197
210
198 def stop(self):
211 Parameters
199 self.ioloop.stop()
212 ----------
200 super(XReqSocketChannel, self).stop()
213 oname : str
214 A string specifying the object name.
215
216 Returns
217 -------
218 The msg_id of the message sent.
219 """
220 print oname
221 content = dict(oname=oname)
222 msg = self.session.msg('object_info_request', content)
223 self._queue_request(msg)
224 return msg['header']['msg_id']
201
225
202 def _handle_events(self, socket, events):
226 def _handle_events(self, socket, events):
203 if events & POLLERR:
227 if events & POLLERR:
@@ -225,10 +249,34 b' class XReqSocketChannel(ZmqSocketChannel):'
225 # We don't want to let this go silently, so eventually we should log.
249 # We don't want to let this go silently, so eventually we should log.
226 raise zmq.ZMQError()
250 raise zmq.ZMQError()
227
251
228 def _queue_request(self, msg, callback):
252 def _queue_request(self, msg):
229 self.command_queue.put(msg)
253 self.command_queue.put(msg)
230 self.add_io_state(POLLOUT)
254 self.add_io_state(POLLOUT)
231
255
256
257 class SubSocketChannel(ZmqSocketChannel):
258 """The SUB channel which listens for messages that the kernel publishes.
259 """
260
261 def __init__(self, context, session, address):
262 super(SubSocketChannel, self).__init__(context, session, address)
263
264 def run(self):
265 """The thread's main activity. Call start() instead."""
266 self.socket = self.context.socket(zmq.SUB)
267 self.socket.setsockopt(zmq.SUBSCRIBE,'')
268 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
269 self.socket.connect('tcp://%s:%i' % self.address)
270 self.ioloop = ioloop.IOLoop()
271 self.iostate = POLLIN|POLLERR
272 self.ioloop.add_handler(self.socket, self._handle_events,
273 self.iostate)
274 self.ioloop.start()
275
276 def stop(self):
277 self.ioloop.stop()
278 super(SubSocketChannel, self).stop()
279
232 def call_handlers(self, msg):
280 def call_handlers(self, msg):
233 """This method is called in the ioloop thread when a message arrives.
281 """This method is called in the ioloop thread when a message arrives.
234
282
@@ -239,33 +287,66 b' class XReqSocketChannel(ZmqSocketChannel):'
239 """
287 """
240 raise NotImplementedError('call_handlers must be defined in a subclass.')
288 raise NotImplementedError('call_handlers must be defined in a subclass.')
241
289
242 def execute(self, code, callback=None):
290 def flush(self, timeout=1.0):
243 # Create class for content/msg creation. Related to, but possibly
291 """Immediately processes all pending messages on the SUB channel.
244 # not in Session.
245 content = dict(code=code)
246 msg = self.session.msg('execute_request', content)
247 self._queue_request(msg, callback)
248 return msg['header']['msg_id']
249
292
250 def complete(self, text, line, block=None, callback=None):
293 This method is thread safe.
251 content = dict(text=text, line=line)
252 msg = self.session.msg('complete_request', content)
253 self._queue_request(msg, callback)
254 return msg['header']['msg_id']
255
294
256 def object_info(self, oname, callback=None):
295 Parameters
257 content = dict(oname=oname)
296 ----------
258 msg = self.session.msg('object_info_request', content)
297 timeout : float, optional
259 self._queue_request(msg, callback)
298 The maximum amount of time to spend flushing, in seconds. The
260 return msg['header']['msg_id']
299 default is one second.
300 """
301 # We do the IOLoop callback process twice to ensure that the IOLoop
302 # gets to perform at least one full poll.
303 stop_time = time.time() + timeout
304 for i in xrange(2):
305 self._flushed = False
306 self.ioloop.add_callback(self._flush)
307 while not self._flushed and time.time() < stop_time:
308 time.sleep(0.01)
309
310 def _handle_events(self, socket, events):
311 # Turn on and off POLLOUT depending on if we have made a request
312 if events & POLLERR:
313 self._handle_err()
314 if events & POLLIN:
315 self._handle_recv()
316
317 def _handle_err(self):
318 # We don't want to let this go silently, so eventually we should log.
319 raise zmq.ZMQError()
320
321 def _handle_recv(self):
322 # Get all of the messages we can
323 while True:
324 try:
325 msg = self.socket.recv_json(zmq.NOBLOCK)
326 except zmq.ZMQError:
327 # Check the errno?
328 # Will this tigger POLLERR?
329 break
330 else:
331 self.call_handlers(msg)
332
333 def _flush(self):
334 """Callback for :method:`self.flush`."""
335 self._flushed = True
261
336
262
337
263 class RepSocketChannel(ZmqSocketChannel):
338 class RepSocketChannel(ZmqSocketChannel):
339 """A reply channel to handle raw_input requests that the kernel makes."""
264
340
265 def on_raw_input(self):
341 def on_raw_input(self):
266 pass
342 pass
267
343
268
344
345 #-----------------------------------------------------------------------------
346 # Main kernel manager class
347 #-----------------------------------------------------------------------------
348
349
269 class KernelManager(HasTraits):
350 class KernelManager(HasTraits):
270 """ Manages a kernel for a frontend.
351 """ Manages a kernel for a frontend.
271
352
@@ -277,59 +358,66 b' class KernelManager(HasTraits):'
277 The REP channel is for the kernel to request stdin (raw_input) from the
358 The REP channel is for the kernel to request stdin (raw_input) from the
278 frontend.
359 frontend.
279 """
360 """
280
281 # Whether the kernel manager is currently listening on its channels.
282 is_listening = Bool(False)
283
284 # The PyZMQ Context to use for communication with the kernel.
361 # The PyZMQ Context to use for communication with the kernel.
285 context = Instance(zmq.Context, ())
362 context = Instance(zmq.Context)
286
363
287 # The Session to use for communication with the kernel.
364 # The Session to use for communication with the kernel.
288 session = Instance(Session, ())
365 session = Instance(Session)
289
366
290 # The classes to use for the various channels.
367 # The classes to use for the various channels.
291 sub_channel_class = Type(SubSocketChannel)
292 xreq_channel_class = Type(XReqSocketChannel)
368 xreq_channel_class = Type(XReqSocketChannel)
369 sub_channel_class = Type(SubSocketChannel)
293 rep_channel_class = Type(RepSocketChannel)
370 rep_channel_class = Type(RepSocketChannel)
294
371
295 # Protected traits.
372 # Protected traits.
296 _kernel = Instance(Popen)
373 _kernel = Instance(Popen)
297 _sub_channel = Any
374 _xreq_address = Any
375 _sub_address = Any
376 _rep_address = Any
298 _xreq_channel = Any
377 _xreq_channel = Any
378 _sub_channel = Any
299 _rep_channel = Any
379 _rep_channel = Any
300
380
381 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
382 context=None, session=None):
383 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
384 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
385 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
386 self.context = zmq.Context() if context is None else context
387 self.session = Session() if session is None else session
388
301 #--------------------------------------------------------------------------
389 #--------------------------------------------------------------------------
302 # Channel management methods:
390 # Channel management methods:
303 #--------------------------------------------------------------------------
391 #--------------------------------------------------------------------------
304
392
305 def start_listening(self):
393 def start_channels(self):
306 """Starts listening on the specified ports. If already listening, raises
394 """Starts the channels for this kernel.
307 a RuntimeError.
395
396 This will create the channels if they do not exist and then start
397 them. If port numbers of 0 are being used (random ports) then you
398 must first call :method:`start_kernel`. If the channels have been
399 stopped and you call this, :class:`RuntimeError` will be raised.
308 """
400 """
309 if self.is_listening:
401 self.xreq_channel.start()
310 raise RuntimeError("Cannot start listening. Already listening!")
402 self.sub_channel.start()
311 else:
403 self.rep_channel.start()
312 self.is_listening = True
313 self.sub_channel.start()
314 self.xreq_channel.start()
315 self.rep_channel.start()
316
404
317 @property
405 def stop_channels(self):
318 def is_alive(self):
406 """Stops the channels for this kernel.
319 """ Returns whether the kernel is alive. """
407
320 if self.is_listening:
408 This stops the channels by joining their threads. If the channels
321 # TODO: check if alive.
409 were not started, :class:`RuntimeError` will be raised.
322 return True
410 """
323 else:
411 self.xreq_channel.stop()
324 return False
412 self.sub_channel.stop()
413 self.rep_channel.stop()
325
414
326 def stop_listening(self):
415 @property
327 """Stops listening. If not listening, does nothing. """
416 def channels_running(self):
328 if self.is_listening:
417 """Are all of the channels created and running?"""
329 self.is_listening = False
418 return self.xreq_channel.is_alive() \
330 self.sub_channel.stop()
419 and self.sub_channel.is_alive() \
331 self.xreq_channel.stop()
420 and self.rep_channel.is_alive()
332 self.rep_channel.stop()
333
421
334 #--------------------------------------------------------------------------
422 #--------------------------------------------------------------------------
335 # Kernel process management methods:
423 # Kernel process management methods:
@@ -338,9 +426,8 b' class KernelManager(HasTraits):'
338 def start_kernel(self):
426 def start_kernel(self):
339 """Starts a kernel process and configures the manager to use it.
427 """Starts a kernel process and configures the manager to use it.
340
428
341 If ports have been specified via the address attributes, they are used.
429 If random ports (port=0) are being used, this method must be called
342 Otherwise, open ports are chosen by the OS and the channel port
430 before the channels are created.
343 attributes are configured as appropriate.
344 """
431 """
345 xreq, sub = self.xreq_address, self.sub_address
432 xreq, sub = self.xreq_address, self.sub_address
346 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
433 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
@@ -349,24 +436,14 b' class KernelManager(HasTraits):'
349 "configured properly.")
436 "configured properly.")
350
437
351 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
438 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
352 self.set_kernel(kernel)
353 self.xreq_address = (LOCALHOST, xrep)
354 self.sub_address = (LOCALHOST, pub)
355
356 def set_kernel(self, kernel):
357 """Sets the kernel manager's kernel to an existing kernel process.
358
359 It is *not* necessary to a set a kernel to communicate with it via the
360 channels, and those objects must be configured separately. It
361 *is* necessary to set a kernel if you want to use the manager (or
362 frontends that use the manager) to signal and/or kill the kernel.
363
364 Parameters:
365 -----------
366 kernel : Popen
367 An existing kernel process.
368 """
369 self._kernel = kernel
439 self._kernel = kernel
440 print xrep, pub
441 self._xreq_address = (LOCALHOST, xrep)
442 self._sub_address = (LOCALHOST, pub)
443 # The rep channel is not fully working yet, but its base class makes
444 # sure the port is not 0. We set to -1 for now until the rep channel
445 # is fully working.
446 self._rep_address = (LOCALHOST, -1)
370
447
371 @property
448 @property
372 def has_kernel(self):
449 def has_kernel(self):
@@ -379,7 +456,7 b' class KernelManager(HasTraits):'
379
456
380 def kill_kernel(self):
457 def kill_kernel(self):
381 """ Kill the running kernel. """
458 """ Kill the running kernel. """
382 if self._kernel:
459 if self._kernel is not None:
383 self._kernel.kill()
460 self._kernel.kill()
384 self._kernel = None
461 self._kernel = None
385 else:
462 else:
@@ -387,67 +464,65 b' class KernelManager(HasTraits):'
387
464
388 def signal_kernel(self, signum):
465 def signal_kernel(self, signum):
389 """ Sends a signal to the kernel. """
466 """ Sends a signal to the kernel. """
390 if self._kernel:
467 if self._kernel is not None:
391 self._kernel.send_signal(signum)
468 self._kernel.send_signal(signum)
392 else:
469 else:
393 raise RuntimeError("Cannot signal kernel. No kernel is running!")
470 raise RuntimeError("Cannot signal kernel. No kernel is running!")
394
471
472 @property
473 def is_alive(self):
474 """Is the kernel process still running?"""
475 if self._kernel is not None:
476 if self._kernel.poll() is None:
477 return True
478 else:
479 return False
480 else:
481 # We didn't start the kernel with this KernelManager so we don't
482 # know if it is running. We should use a heartbeat for this case.
483 return True
484
395 #--------------------------------------------------------------------------
485 #--------------------------------------------------------------------------
396 # Channels used for communication with the kernel:
486 # Channels used for communication with the kernel:
397 #--------------------------------------------------------------------------
487 #--------------------------------------------------------------------------
398
488
399 @property
489 @property
400 def sub_channel(self):
401 """Get the SUB socket channel object."""
402 if self._sub_channel is None:
403 self._sub_channel = self.sub_channel_class(self.context,
404 self.session)
405 return self._sub_channel
406
407 @property
408 def xreq_channel(self):
490 def xreq_channel(self):
409 """Get the REQ socket channel object to make requests of the kernel."""
491 """Get the REQ socket channel object to make requests of the kernel."""
410 if self._xreq_channel is None:
492 if self._xreq_channel is None:
411 self._xreq_channel = self.xreq_channel_class(self.context,
493 self._xreq_channel = self.xreq_channel_class(self.context,
412 self.session)
494 self.session,
495 self.xreq_address)
413 return self._xreq_channel
496 return self._xreq_channel
414
497
415 @property
498 @property
499 def sub_channel(self):
500 """Get the SUB socket channel object."""
501 if self._sub_channel is None:
502 self._sub_channel = self.sub_channel_class(self.context,
503 self.session,
504 self.sub_address)
505 return self._sub_channel
506
507 @property
416 def rep_channel(self):
508 def rep_channel(self):
417 """Get the REP socket channel object to handle stdin (raw_input)."""
509 """Get the REP socket channel object to handle stdin (raw_input)."""
418 if self._rep_channel is None:
510 if self._rep_channel is None:
419 self._rep_channel = self.rep_channel_class(self.context,
511 self._rep_channel = self.rep_channel_class(self.context,
420 self.session)
512 self.session,
513 self.rep_address)
421 return self._rep_channel
514 return self._rep_channel
422
515
423 #--------------------------------------------------------------------------
516 @property
424 # Delegates for the Channel address attributes:
517 def xreq_address(self):
425 #--------------------------------------------------------------------------
518 return self._xreq_address
426
427 def get_sub_address(self):
428 return self.sub_channel.address
429
430 def set_sub_address(self, address):
431 self.sub_channel.address = address
432
433 sub_address = property(get_sub_address, set_sub_address,
434 doc="The address used by SUB socket channel.")
435
436 def get_xreq_address(self):
437 return self.xreq_channel.address
438
439 def set_xreq_address(self, address):
440 self.xreq_channel.address = address
441
519
442 xreq_address = property(get_xreq_address, set_xreq_address,
520 @property
443 doc="The address used by XREQ socket channel.")
521 def sub_address(self):
444
522 return self._sub_address
445 def get_rep_address(self):
446 return self.rep_channel.address
447
523
448 def set_rep_address(self, address):
524 @property
449 self.rep_channel.address = address
525 def rep_address(self):
526 return self._rep_address
450
527
451 rep_address = property(get_rep_address, set_rep_address,
452 doc="The address used by REP socket channel.")
453
528
General Comments 0
You need to be logged in to leave comments. Login now