##// END OF EJS Templates
Cleaned up KernelManager.__init__ for better traits usage.
Brian Granger -
Show More
@@ -1,591 +1,571 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 Todo
3 Todo
4 ====
4 ====
5
5
6 * Create logger to handle debugging and console messages.
6 * Create logger to handle debugging and console messages.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 # Standard library imports.
20 # Standard library imports.
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 from threading import Thread
23 from threading import Thread
24 import time
24 import time
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
30
30
31 # Local imports.
31 # Local imports.
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 from kernel import launch_kernel
33 from kernel import launch_kernel
34 from session import Session
34 from session import Session
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Constants and exceptions
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 LOCALHOST = '127.0.0.1'
40 LOCALHOST = '127.0.0.1'
41
41
42 class InvalidPortNumber(Exception):
42 class InvalidPortNumber(Exception):
43 pass
43 pass
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # ZMQ Socket Channel classes
46 # ZMQ Socket Channel classes
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 class ZmqSocketChannel(Thread):
49 class ZmqSocketChannel(Thread):
50 """The base class for the channels that use ZMQ sockets.
50 """The base class for the channels that use ZMQ sockets.
51 """
51 """
52 context = None
52 context = None
53 session = None
53 session = None
54 socket = None
54 socket = None
55 ioloop = None
55 ioloop = None
56 iostate = None
56 iostate = None
57 _address = None
57 _address = None
58
58
59 def __init__(self, context, session, address):
59 def __init__(self, context, session, address):
60 """Create a channel
60 """Create a channel
61
61
62 Parameters
62 Parameters
63 ----------
63 ----------
64 context : :class:`zmq.Context`
64 context : :class:`zmq.Context`
65 The ZMQ context to use.
65 The ZMQ context to use.
66 session : :class:`session.Session`
66 session : :class:`session.Session`
67 The session to use.
67 The session to use.
68 address : tuple
68 address : tuple
69 Standard (ip, port) tuple that the kernel is listening on.
69 Standard (ip, port) tuple that the kernel is listening on.
70 """
70 """
71 super(ZmqSocketChannel, self).__init__()
71 super(ZmqSocketChannel, self).__init__()
72 self.daemon = True
72 self.daemon = True
73
73
74 self.context = context
74 self.context = context
75 self.session = session
75 self.session = session
76 if address[1] == 0:
76 if address[1] == 0:
77 message = 'The port number for a channel cannot be 0.'
77 message = 'The port number for a channel cannot be 0.'
78 raise InvalidPortNumber(message)
78 raise InvalidPortNumber(message)
79 self._address = address
79 self._address = address
80
80
81 def stop(self):
81 def stop(self):
82 """Stop the channel's activity.
82 """Stop the channel's activity.
83
83
84 This calls :method:`Thread.join` and returns when the thread
84 This calls :method:`Thread.join` and returns when the thread
85 terminates. :class:`RuntimeError` will be raised if
85 terminates. :class:`RuntimeError` will be raised if
86 :method:`self.start` is called again.
86 :method:`self.start` is called again.
87 """
87 """
88 self.join()
88 self.join()
89
89
90 @property
90 @property
91 def address(self):
91 def address(self):
92 """Get the channel's address as an (ip, port) tuple.
92 """Get the channel's address as an (ip, port) tuple.
93
93
94 By the default, the address is (localhost, 0), where 0 means a random
94 By the default, the address is (localhost, 0), where 0 means a random
95 port.
95 port.
96 """
96 """
97 return self._address
97 return self._address
98
98
99 def add_io_state(self, state):
99 def add_io_state(self, state):
100 """Add IO state to the eventloop.
100 """Add IO state to the eventloop.
101
101
102 Parameters
102 Parameters
103 ----------
103 ----------
104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
105 The IO state flag to set.
105 The IO state flag to set.
106
106
107 This is thread safe as it uses the thread safe IOLoop.add_callback.
107 This is thread safe as it uses the thread safe IOLoop.add_callback.
108 """
108 """
109 def add_io_state_callback():
109 def add_io_state_callback():
110 if not self.iostate & state:
110 if not self.iostate & state:
111 self.iostate = self.iostate | state
111 self.iostate = self.iostate | state
112 self.ioloop.update_handler(self.socket, self.iostate)
112 self.ioloop.update_handler(self.socket, self.iostate)
113 self.ioloop.add_callback(add_io_state_callback)
113 self.ioloop.add_callback(add_io_state_callback)
114
114
115 def drop_io_state(self, state):
115 def drop_io_state(self, state):
116 """Drop IO state from the eventloop.
116 """Drop IO state from the eventloop.
117
117
118 Parameters
118 Parameters
119 ----------
119 ----------
120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
121 The IO state flag to set.
121 The IO state flag to set.
122
122
123 This is thread safe as it uses the thread safe IOLoop.add_callback.
123 This is thread safe as it uses the thread safe IOLoop.add_callback.
124 """
124 """
125 def drop_io_state_callback():
125 def drop_io_state_callback():
126 if self.iostate & state:
126 if self.iostate & state:
127 self.iostate = self.iostate & (~state)
127 self.iostate = self.iostate & (~state)
128 self.ioloop.update_handler(self.socket, self.iostate)
128 self.ioloop.update_handler(self.socket, self.iostate)
129 self.ioloop.add_callback(drop_io_state_callback)
129 self.ioloop.add_callback(drop_io_state_callback)
130
130
131
131
132 class XReqSocketChannel(ZmqSocketChannel):
132 class XReqSocketChannel(ZmqSocketChannel):
133 """The XREQ channel for issues request/replies to the kernel.
133 """The XREQ channel for issues request/replies to the kernel.
134 """
134 """
135
135
136 command_queue = None
136 command_queue = None
137
137
138 def __init__(self, context, session, address):
138 def __init__(self, context, session, address):
139 self.command_queue = Queue()
139 self.command_queue = Queue()
140 super(XReqSocketChannel, self).__init__(context, session, address)
140 super(XReqSocketChannel, self).__init__(context, session, address)
141
141
142 def run(self):
142 def run(self):
143 """The thread's main activity. Call start() instead."""
143 """The thread's main activity. Call start() instead."""
144 self.socket = self.context.socket(zmq.XREQ)
144 self.socket = self.context.socket(zmq.XREQ)
145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
146 self.socket.connect('tcp://%s:%i' % self.address)
146 self.socket.connect('tcp://%s:%i' % self.address)
147 self.ioloop = ioloop.IOLoop()
147 self.ioloop = ioloop.IOLoop()
148 self.iostate = POLLERR|POLLIN
148 self.iostate = POLLERR|POLLIN
149 self.ioloop.add_handler(self.socket, self._handle_events,
149 self.ioloop.add_handler(self.socket, self._handle_events,
150 self.iostate)
150 self.iostate)
151 self.ioloop.start()
151 self.ioloop.start()
152
152
153 def stop(self):
153 def stop(self):
154 self.ioloop.stop()
154 self.ioloop.stop()
155 super(XReqSocketChannel, self).stop()
155 super(XReqSocketChannel, self).stop()
156
156
157 def call_handlers(self, msg):
157 def call_handlers(self, msg):
158 """This method is called in the ioloop thread when a message arrives.
158 """This method is called in the ioloop thread when a message arrives.
159
159
160 Subclasses should override this method to handle incoming messages.
160 Subclasses should override this method to handle incoming messages.
161 It is important to remember that this method is called in the thread
161 It is important to remember that this method is called in the thread
162 so that some logic must be done to ensure that the application leve
162 so that some logic must be done to ensure that the application leve
163 handlers are called in the application thread.
163 handlers are called in the application thread.
164 """
164 """
165 raise NotImplementedError('call_handlers must be defined in a subclass.')
165 raise NotImplementedError('call_handlers must be defined in a subclass.')
166
166
167 def execute(self, code):
167 def execute(self, code):
168 """Execute code in the kernel.
168 """Execute code in the kernel.
169
169
170 Parameters
170 Parameters
171 ----------
171 ----------
172 code : str
172 code : str
173 A string of Python code.
173 A string of Python code.
174
174
175 Returns
175 Returns
176 -------
176 -------
177 The msg_id of the message sent.
177 The msg_id of the message sent.
178 """
178 """
179 # Create class for content/msg creation. Related to, but possibly
179 # Create class for content/msg creation. Related to, but possibly
180 # not in Session.
180 # not in Session.
181 content = dict(code=code)
181 content = dict(code=code)
182 msg = self.session.msg('execute_request', content)
182 msg = self.session.msg('execute_request', content)
183 self._queue_request(msg)
183 self._queue_request(msg)
184 return msg['header']['msg_id']
184 return msg['header']['msg_id']
185
185
186 def complete(self, text, line, block=None):
186 def complete(self, text, line, block=None):
187 """Tab complete text, line, block in the kernel's namespace.
187 """Tab complete text, line, block in the kernel's namespace.
188
188
189 Parameters
189 Parameters
190 ----------
190 ----------
191 text : str
191 text : str
192 The text to complete.
192 The text to complete.
193 line : str
193 line : str
194 The full line of text that is the surrounding context for the
194 The full line of text that is the surrounding context for the
195 text to complete.
195 text to complete.
196 block : str
196 block : str
197 The full block of code in which the completion is being requested.
197 The full block of code in which the completion is being requested.
198
198
199 Returns
199 Returns
200 -------
200 -------
201 The msg_id of the message sent.
201 The msg_id of the message sent.
202 """
202 """
203 content = dict(text=text, line=line)
203 content = dict(text=text, line=line)
204 msg = self.session.msg('complete_request', content)
204 msg = self.session.msg('complete_request', content)
205 self._queue_request(msg)
205 self._queue_request(msg)
206 return msg['header']['msg_id']
206 return msg['header']['msg_id']
207
207
208 def object_info(self, oname):
208 def object_info(self, oname):
209 """Get metadata information about an object.
209 """Get metadata information about an object.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 oname : str
213 oname : str
214 A string specifying the object name.
214 A string specifying the object name.
215
215
216 Returns
216 Returns
217 -------
217 -------
218 The msg_id of the message sent.
218 The msg_id of the message sent.
219 """
219 """
220 content = dict(oname=oname)
220 content = dict(oname=oname)
221 msg = self.session.msg('object_info_request', content)
221 msg = self.session.msg('object_info_request', content)
222 self._queue_request(msg)
222 self._queue_request(msg)
223 return msg['header']['msg_id']
223 return msg['header']['msg_id']
224
224
225 def _handle_events(self, socket, events):
225 def _handle_events(self, socket, events):
226 if events & POLLERR:
226 if events & POLLERR:
227 self._handle_err()
227 self._handle_err()
228 if events & POLLOUT:
228 if events & POLLOUT:
229 self._handle_send()
229 self._handle_send()
230 if events & POLLIN:
230 if events & POLLIN:
231 self._handle_recv()
231 self._handle_recv()
232
232
233 def _handle_recv(self):
233 def _handle_recv(self):
234 msg = self.socket.recv_json()
234 msg = self.socket.recv_json()
235 self.call_handlers(msg)
235 self.call_handlers(msg)
236
236
237 def _handle_send(self):
237 def _handle_send(self):
238 try:
238 try:
239 msg = self.command_queue.get(False)
239 msg = self.command_queue.get(False)
240 except Empty:
240 except Empty:
241 pass
241 pass
242 else:
242 else:
243 self.socket.send_json(msg)
243 self.socket.send_json(msg)
244 if self.command_queue.empty():
244 if self.command_queue.empty():
245 self.drop_io_state(POLLOUT)
245 self.drop_io_state(POLLOUT)
246
246
247 def _handle_err(self):
247 def _handle_err(self):
248 # We don't want to let this go silently, so eventually we should log.
248 # We don't want to let this go silently, so eventually we should log.
249 raise zmq.ZMQError()
249 raise zmq.ZMQError()
250
250
251 def _queue_request(self, msg):
251 def _queue_request(self, msg):
252 self.command_queue.put(msg)
252 self.command_queue.put(msg)
253 self.add_io_state(POLLOUT)
253 self.add_io_state(POLLOUT)
254
254
255
255
256 class SubSocketChannel(ZmqSocketChannel):
256 class SubSocketChannel(ZmqSocketChannel):
257 """The SUB channel which listens for messages that the kernel publishes.
257 """The SUB channel which listens for messages that the kernel publishes.
258 """
258 """
259
259
260 def __init__(self, context, session, address):
260 def __init__(self, context, session, address):
261 super(SubSocketChannel, self).__init__(context, session, address)
261 super(SubSocketChannel, self).__init__(context, session, address)
262
262
263 def run(self):
263 def run(self):
264 """The thread's main activity. Call start() instead."""
264 """The thread's main activity. Call start() instead."""
265 self.socket = self.context.socket(zmq.SUB)
265 self.socket = self.context.socket(zmq.SUB)
266 self.socket.setsockopt(zmq.SUBSCRIBE,'')
266 self.socket.setsockopt(zmq.SUBSCRIBE,'')
267 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
267 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
268 self.socket.connect('tcp://%s:%i' % self.address)
268 self.socket.connect('tcp://%s:%i' % self.address)
269 self.ioloop = ioloop.IOLoop()
269 self.ioloop = ioloop.IOLoop()
270 self.iostate = POLLIN|POLLERR
270 self.iostate = POLLIN|POLLERR
271 self.ioloop.add_handler(self.socket, self._handle_events,
271 self.ioloop.add_handler(self.socket, self._handle_events,
272 self.iostate)
272 self.iostate)
273 self.ioloop.start()
273 self.ioloop.start()
274
274
275 def stop(self):
275 def stop(self):
276 self.ioloop.stop()
276 self.ioloop.stop()
277 super(SubSocketChannel, self).stop()
277 super(SubSocketChannel, self).stop()
278
278
279 def call_handlers(self, msg):
279 def call_handlers(self, msg):
280 """This method is called in the ioloop thread when a message arrives.
280 """This method is called in the ioloop thread when a message arrives.
281
281
282 Subclasses should override this method to handle incoming messages.
282 Subclasses should override this method to handle incoming messages.
283 It is important to remember that this method is called in the thread
283 It is important to remember that this method is called in the thread
284 so that some logic must be done to ensure that the application leve
284 so that some logic must be done to ensure that the application leve
285 handlers are called in the application thread.
285 handlers are called in the application thread.
286 """
286 """
287 raise NotImplementedError('call_handlers must be defined in a subclass.')
287 raise NotImplementedError('call_handlers must be defined in a subclass.')
288
288
289 def flush(self, timeout=1.0):
289 def flush(self, timeout=1.0):
290 """Immediately processes all pending messages on the SUB channel.
290 """Immediately processes all pending messages on the SUB channel.
291
291
292 Callers should use this method to ensure that :method:`call_handlers`
292 Callers should use this method to ensure that :method:`call_handlers`
293 has been called for all messages that have been received on the
293 has been called for all messages that have been received on the
294 0MQ SUB socket of this channel.
294 0MQ SUB socket of this channel.
295
295
296 This method is thread safe.
296 This method is thread safe.
297
297
298 Parameters
298 Parameters
299 ----------
299 ----------
300 timeout : float, optional
300 timeout : float, optional
301 The maximum amount of time to spend flushing, in seconds. The
301 The maximum amount of time to spend flushing, in seconds. The
302 default is one second.
302 default is one second.
303 """
303 """
304 # We do the IOLoop callback process twice to ensure that the IOLoop
304 # We do the IOLoop callback process twice to ensure that the IOLoop
305 # gets to perform at least one full poll.
305 # gets to perform at least one full poll.
306 stop_time = time.time() + timeout
306 stop_time = time.time() + timeout
307 for i in xrange(2):
307 for i in xrange(2):
308 self._flushed = False
308 self._flushed = False
309 self.ioloop.add_callback(self._flush)
309 self.ioloop.add_callback(self._flush)
310 while not self._flushed and time.time() < stop_time:
310 while not self._flushed and time.time() < stop_time:
311 time.sleep(0.01)
311 time.sleep(0.01)
312
312
313 def _handle_events(self, socket, events):
313 def _handle_events(self, socket, events):
314 # Turn on and off POLLOUT depending on if we have made a request
314 # Turn on and off POLLOUT depending on if we have made a request
315 if events & POLLERR:
315 if events & POLLERR:
316 self._handle_err()
316 self._handle_err()
317 if events & POLLIN:
317 if events & POLLIN:
318 self._handle_recv()
318 self._handle_recv()
319
319
320 def _handle_err(self):
320 def _handle_err(self):
321 # We don't want to let this go silently, so eventually we should log.
321 # We don't want to let this go silently, so eventually we should log.
322 raise zmq.ZMQError()
322 raise zmq.ZMQError()
323
323
324 def _handle_recv(self):
324 def _handle_recv(self):
325 # Get all of the messages we can
325 # Get all of the messages we can
326 while True:
326 while True:
327 try:
327 try:
328 msg = self.socket.recv_json(zmq.NOBLOCK)
328 msg = self.socket.recv_json(zmq.NOBLOCK)
329 except zmq.ZMQError:
329 except zmq.ZMQError:
330 # Check the errno?
330 # Check the errno?
331 # Will this trigger POLLERR?
331 # Will this trigger POLLERR?
332 break
332 break
333 else:
333 else:
334 self.call_handlers(msg)
334 self.call_handlers(msg)
335
335
336 def _flush(self):
336 def _flush(self):
337 """Callback for :method:`self.flush`."""
337 """Callback for :method:`self.flush`."""
338 self._flushed = True
338 self._flushed = True
339
339
340
340
341 class RepSocketChannel(ZmqSocketChannel):
341 class RepSocketChannel(ZmqSocketChannel):
342 """A reply channel to handle raw_input requests that the kernel makes."""
342 """A reply channel to handle raw_input requests that the kernel makes."""
343
343
344 msg_queue = None
344 msg_queue = None
345
345
346 def __init__(self, context, session, address):
346 def __init__(self, context, session, address):
347 self.msg_queue = Queue()
347 self.msg_queue = Queue()
348 super(RepSocketChannel, self).__init__(context, session, address)
348 super(RepSocketChannel, self).__init__(context, session, address)
349
349
350 def run(self):
350 def run(self):
351 """The thread's main activity. Call start() instead."""
351 """The thread's main activity. Call start() instead."""
352 self.socket = self.context.socket(zmq.XREQ)
352 self.socket = self.context.socket(zmq.XREQ)
353 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
353 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
354 self.socket.connect('tcp://%s:%i' % self.address)
354 self.socket.connect('tcp://%s:%i' % self.address)
355 self.ioloop = ioloop.IOLoop()
355 self.ioloop = ioloop.IOLoop()
356 self.iostate = POLLERR|POLLIN
356 self.iostate = POLLERR|POLLIN
357 self.ioloop.add_handler(self.socket, self._handle_events,
357 self.ioloop.add_handler(self.socket, self._handle_events,
358 self.iostate)
358 self.iostate)
359 self.ioloop.start()
359 self.ioloop.start()
360
360
361 def stop(self):
361 def stop(self):
362 self.ioloop.stop()
362 self.ioloop.stop()
363 super(RepSocketChannel, self).stop()
363 super(RepSocketChannel, self).stop()
364
364
365 def call_handlers(self, msg):
365 def call_handlers(self, msg):
366 """This method is called in the ioloop thread when a message arrives.
366 """This method is called in the ioloop thread when a message arrives.
367
367
368 Subclasses should override this method to handle incoming messages.
368 Subclasses should override this method to handle incoming messages.
369 It is important to remember that this method is called in the thread
369 It is important to remember that this method is called in the thread
370 so that some logic must be done to ensure that the application leve
370 so that some logic must be done to ensure that the application leve
371 handlers are called in the application thread.
371 handlers are called in the application thread.
372 """
372 """
373 raise NotImplementedError('call_handlers must be defined in a subclass.')
373 raise NotImplementedError('call_handlers must be defined in a subclass.')
374
374
375 def input(self, string):
375 def input(self, string):
376 """Send a string of raw input to the kernel."""
376 """Send a string of raw input to the kernel."""
377 content = dict(value=string)
377 content = dict(value=string)
378 msg = self.session.msg('input_reply', content)
378 msg = self.session.msg('input_reply', content)
379 self._queue_reply(msg)
379 self._queue_reply(msg)
380
380
381 def _handle_events(self, socket, events):
381 def _handle_events(self, socket, events):
382 if events & POLLERR:
382 if events & POLLERR:
383 self._handle_err()
383 self._handle_err()
384 if events & POLLOUT:
384 if events & POLLOUT:
385 self._handle_send()
385 self._handle_send()
386 if events & POLLIN:
386 if events & POLLIN:
387 self._handle_recv()
387 self._handle_recv()
388
388
389 def _handle_recv(self):
389 def _handle_recv(self):
390 msg = self.socket.recv_json()
390 msg = self.socket.recv_json()
391 self.call_handlers(msg)
391 self.call_handlers(msg)
392
392
393 def _handle_send(self):
393 def _handle_send(self):
394 try:
394 try:
395 msg = self.msg_queue.get(False)
395 msg = self.msg_queue.get(False)
396 except Empty:
396 except Empty:
397 pass
397 pass
398 else:
398 else:
399 self.socket.send_json(msg)
399 self.socket.send_json(msg)
400 if self.msg_queue.empty():
400 if self.msg_queue.empty():
401 self.drop_io_state(POLLOUT)
401 self.drop_io_state(POLLOUT)
402
402
403 def _handle_err(self):
403 def _handle_err(self):
404 # We don't want to let this go silently, so eventually we should log.
404 # We don't want to let this go silently, so eventually we should log.
405 raise zmq.ZMQError()
405 raise zmq.ZMQError()
406
406
407 def _queue_reply(self, msg):
407 def _queue_reply(self, msg):
408 self.msg_queue.put(msg)
408 self.msg_queue.put(msg)
409 self.add_io_state(POLLOUT)
409 self.add_io_state(POLLOUT)
410
410
411
411
412 #-----------------------------------------------------------------------------
412 #-----------------------------------------------------------------------------
413 # Main kernel manager class
413 # Main kernel manager class
414 #-----------------------------------------------------------------------------
414 #-----------------------------------------------------------------------------
415
415
416 class KernelManager(HasTraits):
416 class KernelManager(HasTraits):
417 """ Manages a kernel for a frontend.
417 """ Manages a kernel for a frontend.
418
418
419 The SUB channel is for the frontend to receive messages published by the
419 The SUB channel is for the frontend to receive messages published by the
420 kernel.
420 kernel.
421
421
422 The REQ channel is for the frontend to make requests of the kernel.
422 The REQ channel is for the frontend to make requests of the kernel.
423
423
424 The REP channel is for the kernel to request stdin (raw_input) from the
424 The REP channel is for the kernel to request stdin (raw_input) from the
425 frontend.
425 frontend.
426 """
426 """
427 # The PyZMQ Context to use for communication with the kernel.
427 # The PyZMQ Context to use for communication with the kernel.
428 context = Instance(zmq.Context)
428 context = Instance(zmq.Context,(),{})
429
429
430 # The Session to use for communication with the kernel.
430 # The Session to use for communication with the kernel.
431 session = Instance(Session)
431 session = Instance(Session,(),{})
432
432
433 # The kernel process with which the KernelManager is communicating.
433 # The kernel process with which the KernelManager is communicating.
434 kernel = Instance(Popen)
434 kernel = Instance(Popen)
435
435
436 # The classes to use for the various channels.
436 # The classes to use for the various channels.
437 xreq_channel_class = Type(XReqSocketChannel)
437 xreq_channel_class = Type(XReqSocketChannel)
438 sub_channel_class = Type(SubSocketChannel)
438 sub_channel_class = Type(SubSocketChannel)
439 rep_channel_class = Type(RepSocketChannel)
439 rep_channel_class = Type(RepSocketChannel)
440
440
441 # Protected traits.
441 # Protected traits.
442 _xreq_address = TCPAddress
442 xreq_address = TCPAddress((LOCALHOST, 0))
443 _sub_address = TCPAddress
443 sub_address = TCPAddress((LOCALHOST, 0))
444 _rep_address = TCPAddress
444 rep_address = TCPAddress((LOCALHOST, 0))
445 _xreq_channel = Any
445 _xreq_channel = Any
446 _sub_channel = Any
446 _sub_channel = Any
447 _rep_channel = Any
447 _rep_channel = Any
448
448
449 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
449 def __init__(self, **kwargs):
450 context=None, session=None):
450 super(KernelManager, self).__init__(**kwargs)
451 super(KernelManager, self).__init__()
452 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
453 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
454 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
455 self.context = zmq.Context() if context is None else context
456 self.session = Session() if session is None else session
457 super(KernelManager, self).__init__()
458
451
459 #--------------------------------- -----------------------------------------
452 #--------------------------------- -----------------------------------------
460 # Channel management methods:
453 # Channel management methods:
461 #--------------------------------------------------------------------------
454 #--------------------------------------------------------------------------
462
455
463 def start_channels(self):
456 def start_channels(self):
464 """Starts the channels for this kernel.
457 """Starts the channels for this kernel.
465
458
466 This will create the channels if they do not exist and then start
459 This will create the channels if they do not exist and then start
467 them. If port numbers of 0 are being used (random ports) then you
460 them. If port numbers of 0 are being used (random ports) then you
468 must first call :method:`start_kernel`. If the channels have been
461 must first call :method:`start_kernel`. If the channels have been
469 stopped and you call this, :class:`RuntimeError` will be raised.
462 stopped and you call this, :class:`RuntimeError` will be raised.
470 """
463 """
471 self.xreq_channel.start()
464 self.xreq_channel.start()
472 self.sub_channel.start()
465 self.sub_channel.start()
473 self.rep_channel.start()
466 self.rep_channel.start()
474
467
475 def stop_channels(self):
468 def stop_channels(self):
476 """Stops the channels for this kernel.
469 """Stops the channels for this kernel.
477
470
478 This stops the channels by joining their threads. If the channels
471 This stops the channels by joining their threads. If the channels
479 were not started, :class:`RuntimeError` will be raised.
472 were not started, :class:`RuntimeError` will be raised.
480 """
473 """
481 self.xreq_channel.stop()
474 self.xreq_channel.stop()
482 self.sub_channel.stop()
475 self.sub_channel.stop()
483 self.rep_channel.stop()
476 self.rep_channel.stop()
484
477
485 @property
478 @property
486 def channels_running(self):
479 def channels_running(self):
487 """Are all of the channels created and running?"""
480 """Are all of the channels created and running?"""
488 return self.xreq_channel.is_alive() \
481 return self.xreq_channel.is_alive() \
489 and self.sub_channel.is_alive() \
482 and self.sub_channel.is_alive() \
490 and self.rep_channel.is_alive()
483 and self.rep_channel.is_alive()
491
484
492 #--------------------------------------------------------------------------
485 #--------------------------------------------------------------------------
493 # Kernel process management methods:
486 # Kernel process management methods:
494 #--------------------------------------------------------------------------
487 #--------------------------------------------------------------------------
495
488
496 def start_kernel(self):
489 def start_kernel(self):
497 """Starts a kernel process and configures the manager to use it.
490 """Starts a kernel process and configures the manager to use it.
498
491
499 If random ports (port=0) are being used, this method must be called
492 If random ports (port=0) are being used, this method must be called
500 before the channels are created.
493 before the channels are created.
501 """
494 """
502 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
495 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
503 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
496 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
504 raise RuntimeError("Can only launch a kernel on localhost."
497 raise RuntimeError("Can only launch a kernel on localhost."
505 "Make sure that the '*_address' attributes are "
498 "Make sure that the '*_address' attributes are "
506 "configured properly.")
499 "configured properly.")
507
500
508 self.kernel, xrep, pub, req = launch_kernel(
501 self.kernel, xrep, pub, req = launch_kernel(
509 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
502 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
510 self._xreq_address = (LOCALHOST, xrep)
503 self.xreq_address = (LOCALHOST, xrep)
511 self._sub_address = (LOCALHOST, pub)
504 self.sub_address = (LOCALHOST, pub)
512 self._rep_address = (LOCALHOST, req)
505 self.rep_address = (LOCALHOST, req)
513
506
514 @property
507 @property
515 def has_kernel(self):
508 def has_kernel(self):
516 """Returns whether a kernel process has been specified for the kernel
509 """Returns whether a kernel process has been specified for the kernel
517 manager.
510 manager.
518 """
511 """
519 return self.kernel is not None
512 return self.kernel is not None
520
513
521 def kill_kernel(self):
514 def kill_kernel(self):
522 """ Kill the running kernel. """
515 """ Kill the running kernel. """
523 if self.kernel is not None:
516 if self.kernel is not None:
524 self.kernel.kill()
517 self.kernel.kill()
525 self.kernel = None
518 self.kernel = None
526 else:
519 else:
527 raise RuntimeError("Cannot kill kernel. No kernel is running!")
520 raise RuntimeError("Cannot kill kernel. No kernel is running!")
528
521
529 def signal_kernel(self, signum):
522 def signal_kernel(self, signum):
530 """ Sends a signal to the kernel. """
523 """ Sends a signal to the kernel. """
531 if self.kernel is not None:
524 if self.kernel is not None:
532 self.kernel.send_signal(signum)
525 self.kernel.send_signal(signum)
533 else:
526 else:
534 raise RuntimeError("Cannot signal kernel. No kernel is running!")
527 raise RuntimeError("Cannot signal kernel. No kernel is running!")
535
528
536 @property
529 @property
537 def is_alive(self):
530 def is_alive(self):
538 """Is the kernel process still running?"""
531 """Is the kernel process still running?"""
539 if self.kernel is not None:
532 if self.kernel is not None:
540 if self.kernel.poll() is None:
533 if self.kernel.poll() is None:
541 return True
534 return True
542 else:
535 else:
543 return False
536 return False
544 else:
537 else:
545 # We didn't start the kernel with this KernelManager so we don't
538 # We didn't start the kernel with this KernelManager so we don't
546 # know if it is running. We should use a heartbeat for this case.
539 # know if it is running. We should use a heartbeat for this case.
547 return True
540 return True
548
541
549 #--------------------------------------------------------------------------
542 #--------------------------------------------------------------------------
550 # Channels used for communication with the kernel:
543 # Channels used for communication with the kernel:
551 #--------------------------------------------------------------------------
544 #--------------------------------------------------------------------------
552
545
553 @property
546 @property
554 def xreq_channel(self):
547 def xreq_channel(self):
555 """Get the REQ socket channel object to make requests of the kernel."""
548 """Get the REQ socket channel object to make requests of the kernel."""
556 if self._xreq_channel is None:
549 if self._xreq_channel is None:
557 self._xreq_channel = self.xreq_channel_class(self.context,
550 self._xreq_channel = self.xreq_channel_class(self.context,
558 self.session,
551 self.session,
559 self.xreq_address)
552 self.xreq_address)
560 return self._xreq_channel
553 return self._xreq_channel
561
554
562 @property
555 @property
563 def sub_channel(self):
556 def sub_channel(self):
564 """Get the SUB socket channel object."""
557 """Get the SUB socket channel object."""
565 if self._sub_channel is None:
558 if self._sub_channel is None:
566 self._sub_channel = self.sub_channel_class(self.context,
559 self._sub_channel = self.sub_channel_class(self.context,
567 self.session,
560 self.session,
568 self.sub_address)
561 self.sub_address)
569 return self._sub_channel
562 return self._sub_channel
570
563
571 @property
564 @property
572 def rep_channel(self):
565 def rep_channel(self):
573 """Get the REP socket channel object to handle stdin (raw_input)."""
566 """Get the REP socket channel object to handle stdin (raw_input)."""
574 if self._rep_channel is None:
567 if self._rep_channel is None:
575 self._rep_channel = self.rep_channel_class(self.context,
568 self._rep_channel = self.rep_channel_class(self.context,
576 self.session,
569 self.session,
577 self.rep_address)
570 self.rep_address)
578 return self._rep_channel
571 return self._rep_channel
579
580 @property
581 def xreq_address(self):
582 return self._xreq_address
583
584 @property
585 def sub_address(self):
586 return self._sub_address
587
588 @property
589 def rep_address(self):
590 return self._rep_address
591
General Comments 0
You need to be logged in to leave comments. Login now