##// END OF EJS Templates
Fixed 'start_kernel' after breakage from last checkin.
epatters -
Show More
@@ -1,537 +1,535 b''
1 """Classes to manage the interaction with a running kernel.
1 """Classes to manage the interaction with a running kernel.
2
2
3 Todo
3 Todo
4 ====
4 ====
5
5
6 * Create logger to handle debugging and console messages.
6 * Create logger to handle debugging and console messages.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 # Standard library imports.
20 # Standard library imports.
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 from threading import Thread
23 from threading import Thread
24 import time
24 import time
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
30
30
31 # Local imports.
31 # Local imports.
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
33 from kernel import launch_kernel
33 from kernel import launch_kernel
34 from session import Session
34 from session import Session
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Constants and exceptions
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 LOCALHOST = '127.0.0.1'
40 LOCALHOST = '127.0.0.1'
41
41
42 class InvalidPortNumber(Exception):
42 class InvalidPortNumber(Exception):
43 pass
43 pass
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # ZMQ Socket Channel classes
46 # ZMQ Socket Channel classes
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 class ZmqSocketChannel(Thread):
49 class ZmqSocketChannel(Thread):
50 """The base class for the channels that use ZMQ sockets.
50 """The base class for the channels that use ZMQ sockets.
51 """
51 """
52 context = None
52 context = None
53 session = None
53 session = None
54 socket = None
54 socket = None
55 ioloop = None
55 ioloop = None
56 iostate = None
56 iostate = None
57 _address = None
57 _address = None
58
58
59 def __init__(self, context, session, address):
59 def __init__(self, context, session, address):
60 """Create a channel
60 """Create a channel
61
61
62 Parameters
62 Parameters
63 ----------
63 ----------
64 context : zmq.Context
64 context : zmq.Context
65 The ZMQ context to use.
65 The ZMQ context to use.
66 session : session.Session
66 session : session.Session
67 The session to use.
67 The session to use.
68 address : tuple
68 address : tuple
69 Standard (ip, port) tuple that the kernel is listening on.
69 Standard (ip, port) tuple that the kernel is listening on.
70 """
70 """
71 super(ZmqSocketChannel, self).__init__()
71 super(ZmqSocketChannel, self).__init__()
72 self.daemon = True
72 self.daemon = True
73
73
74 self.context = context
74 self.context = context
75 self.session = session
75 self.session = session
76 if address[1] == 0:
76 if address[1] == 0:
77 message = 'The port number for a channel cannot be 0.'
77 message = 'The port number for a channel cannot be 0.'
78 raise InvalidPortNumber(message)
78 raise InvalidPortNumber(message)
79 self._address = address
79 self._address = address
80
80
81 def stop(self):
81 def stop(self):
82 """Stop the channel's activity.
82 """Stop the channel's activity.
83
83
84 This calls :method:`Thread.join` and returns when the thread
84 This calls :method:`Thread.join` and returns when the thread
85 terminates. :class:`RuntimeError` will be raised if
85 terminates. :class:`RuntimeError` will be raised if
86 :method:`self.start` is called again.
86 :method:`self.start` is called again.
87 """
87 """
88 self.join()
88 self.join()
89
89
90 @property
90 @property
91 def address(self):
91 def address(self):
92 """Get the channel's address as an (ip, port) tuple.
92 """Get the channel's address as an (ip, port) tuple.
93
93
94 By the default, the address is (localhost, 0), where 0 means a random
94 By the default, the address is (localhost, 0), where 0 means a random
95 port.
95 port.
96 """
96 """
97 return self._address
97 return self._address
98
98
99 def add_io_state(self, state):
99 def add_io_state(self, state):
100 """Add IO state to the eventloop.
100 """Add IO state to the eventloop.
101
101
102 Parameters
102 Parameters
103 ----------
103 ----------
104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
105 The IO state flag to set.
105 The IO state flag to set.
106
106
107 This is thread safe as it uses the thread safe IOLoop.add_callback.
107 This is thread safe as it uses the thread safe IOLoop.add_callback.
108 """
108 """
109 def add_io_state_callback():
109 def add_io_state_callback():
110 if not self.iostate & state:
110 if not self.iostate & state:
111 self.iostate = self.iostate | state
111 self.iostate = self.iostate | state
112 self.ioloop.update_handler(self.socket, self.iostate)
112 self.ioloop.update_handler(self.socket, self.iostate)
113 self.ioloop.add_callback(add_io_state_callback)
113 self.ioloop.add_callback(add_io_state_callback)
114
114
115 def drop_io_state(self, state):
115 def drop_io_state(self, state):
116 """Drop IO state from the eventloop.
116 """Drop IO state from the eventloop.
117
117
118 Parameters
118 Parameters
119 ----------
119 ----------
120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
121 The IO state flag to set.
121 The IO state flag to set.
122
122
123 This is thread safe as it uses the thread safe IOLoop.add_callback.
123 This is thread safe as it uses the thread safe IOLoop.add_callback.
124 """
124 """
125 def drop_io_state_callback():
125 def drop_io_state_callback():
126 if self.iostate & state:
126 if self.iostate & state:
127 self.iostate = self.iostate & (~state)
127 self.iostate = self.iostate & (~state)
128 self.ioloop.update_handler(self.socket, self.iostate)
128 self.ioloop.update_handler(self.socket, self.iostate)
129 self.ioloop.add_callback(drop_io_state_callback)
129 self.ioloop.add_callback(drop_io_state_callback)
130
130
131
131
132 class XReqSocketChannel(ZmqSocketChannel):
132 class XReqSocketChannel(ZmqSocketChannel):
133 """The XREQ channel for issues request/replies to the kernel.
133 """The XREQ channel for issues request/replies to the kernel.
134 """
134 """
135
135
136 command_queue = None
136 command_queue = None
137
137
138 def __init__(self, context, session, address):
138 def __init__(self, context, session, address):
139 self.command_queue = Queue()
139 self.command_queue = Queue()
140 super(XReqSocketChannel, self).__init__(context, session, address)
140 super(XReqSocketChannel, self).__init__(context, session, address)
141
141
142 def run(self):
142 def run(self):
143 """The thread's main activity. Call start() instead."""
143 """The thread's main activity. Call start() instead."""
144 self.socket = self.context.socket(zmq.XREQ)
144 self.socket = self.context.socket(zmq.XREQ)
145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
146 self.socket.connect('tcp://%s:%i' % self.address)
146 self.socket.connect('tcp://%s:%i' % self.address)
147 self.ioloop = ioloop.IOLoop()
147 self.ioloop = ioloop.IOLoop()
148 self.iostate = POLLERR|POLLIN
148 self.iostate = POLLERR|POLLIN
149 self.ioloop.add_handler(self.socket, self._handle_events,
149 self.ioloop.add_handler(self.socket, self._handle_events,
150 self.iostate)
150 self.iostate)
151 self.ioloop.start()
151 self.ioloop.start()
152
152
153 def stop(self):
153 def stop(self):
154 self.ioloop.stop()
154 self.ioloop.stop()
155 super(XReqSocketChannel, self).stop()
155 super(XReqSocketChannel, self).stop()
156
156
157 def call_handlers(self, msg):
157 def call_handlers(self, msg):
158 """This method is called in the ioloop thread when a message arrives.
158 """This method is called in the ioloop thread when a message arrives.
159
159
160 Subclasses should override this method to handle incoming messages.
160 Subclasses should override this method to handle incoming messages.
161 It is important to remember that this method is called in the thread
161 It is important to remember that this method is called in the thread
162 so that some logic must be done to ensure that the application leve
162 so that some logic must be done to ensure that the application leve
163 handlers are called in the application thread.
163 handlers are called in the application thread.
164 """
164 """
165 raise NotImplementedError('call_handlers must be defined in a subclass.')
165 raise NotImplementedError('call_handlers must be defined in a subclass.')
166
166
167 def execute(self, code):
167 def execute(self, code):
168 """Execute code in the kernel.
168 """Execute code in the kernel.
169
169
170 Parameters
170 Parameters
171 ----------
171 ----------
172 code : str
172 code : str
173 A string of Python code.
173 A string of Python code.
174
174
175 Returns
175 Returns
176 -------
176 -------
177 The msg_id of the message sent.
177 The msg_id of the message sent.
178 """
178 """
179 # Create class for content/msg creation. Related to, but possibly
179 # Create class for content/msg creation. Related to, but possibly
180 # not in Session.
180 # not in Session.
181 content = dict(code=code)
181 content = dict(code=code)
182 msg = self.session.msg('execute_request', content)
182 msg = self.session.msg('execute_request', content)
183 self._queue_request(msg)
183 self._queue_request(msg)
184 return msg['header']['msg_id']
184 return msg['header']['msg_id']
185
185
186 def complete(self, text, line, block=None):
186 def complete(self, text, line, block=None):
187 """Tab complete text, line, block in the kernel's namespace.
187 """Tab complete text, line, block in the kernel's namespace.
188
188
189 Parameters
189 Parameters
190 ----------
190 ----------
191 text : str
191 text : str
192 The text to complete.
192 The text to complete.
193 line : str
193 line : str
194 The full line of text that is the surrounding context for the
194 The full line of text that is the surrounding context for the
195 text to complete.
195 text to complete.
196 block : str
196 block : str
197 The full block of code in which the completion is being requested.
197 The full block of code in which the completion is being requested.
198
198
199 Returns
199 Returns
200 -------
200 -------
201 The msg_id of the message sent.
201 The msg_id of the message sent.
202
202
203 """
203 """
204 content = dict(text=text, line=line)
204 content = dict(text=text, line=line)
205 msg = self.session.msg('complete_request', content)
205 msg = self.session.msg('complete_request', content)
206 self._queue_request(msg)
206 self._queue_request(msg)
207 return msg['header']['msg_id']
207 return msg['header']['msg_id']
208
208
209 def object_info(self, oname):
209 def object_info(self, oname):
210 """Get metadata information about an object.
210 """Get metadata information about an object.
211
211
212 Parameters
212 Parameters
213 ----------
213 ----------
214 oname : str
214 oname : str
215 A string specifying the object name.
215 A string specifying the object name.
216
216
217 Returns
217 Returns
218 -------
218 -------
219 The msg_id of the message sent.
219 The msg_id of the message sent.
220 """
220 """
221 print oname
221 print oname
222 content = dict(oname=oname)
222 content = dict(oname=oname)
223 msg = self.session.msg('object_info_request', content)
223 msg = self.session.msg('object_info_request', content)
224 self._queue_request(msg)
224 self._queue_request(msg)
225 return msg['header']['msg_id']
225 return msg['header']['msg_id']
226
226
227 def _handle_events(self, socket, events):
227 def _handle_events(self, socket, events):
228 if events & POLLERR:
228 if events & POLLERR:
229 self._handle_err()
229 self._handle_err()
230 if events & POLLOUT:
230 if events & POLLOUT:
231 self._handle_send()
231 self._handle_send()
232 if events & POLLIN:
232 if events & POLLIN:
233 self._handle_recv()
233 self._handle_recv()
234
234
235 def _handle_recv(self):
235 def _handle_recv(self):
236 msg = self.socket.recv_json()
236 msg = self.socket.recv_json()
237 self.call_handlers(msg)
237 self.call_handlers(msg)
238
238
239 def _handle_send(self):
239 def _handle_send(self):
240 try:
240 try:
241 msg = self.command_queue.get(False)
241 msg = self.command_queue.get(False)
242 except Empty:
242 except Empty:
243 pass
243 pass
244 else:
244 else:
245 self.socket.send_json(msg)
245 self.socket.send_json(msg)
246 if self.command_queue.empty():
246 if self.command_queue.empty():
247 self.drop_io_state(POLLOUT)
247 self.drop_io_state(POLLOUT)
248
248
249 def _handle_err(self):
249 def _handle_err(self):
250 # We don't want to let this go silently, so eventually we should log.
250 # We don't want to let this go silently, so eventually we should log.
251 raise zmq.ZMQError()
251 raise zmq.ZMQError()
252
252
253 def _queue_request(self, msg):
253 def _queue_request(self, msg):
254 self.command_queue.put(msg)
254 self.command_queue.put(msg)
255 self.add_io_state(POLLOUT)
255 self.add_io_state(POLLOUT)
256
256
257
257
258 class SubSocketChannel(ZmqSocketChannel):
258 class SubSocketChannel(ZmqSocketChannel):
259 """The SUB channel which listens for messages that the kernel publishes.
259 """The SUB channel which listens for messages that the kernel publishes.
260 """
260 """
261
261
262 def __init__(self, context, session, address):
262 def __init__(self, context, session, address):
263 super(SubSocketChannel, self).__init__(context, session, address)
263 super(SubSocketChannel, self).__init__(context, session, address)
264
264
265 def run(self):
265 def run(self):
266 """The thread's main activity. Call start() instead."""
266 """The thread's main activity. Call start() instead."""
267 self.socket = self.context.socket(zmq.SUB)
267 self.socket = self.context.socket(zmq.SUB)
268 self.socket.setsockopt(zmq.SUBSCRIBE,'')
268 self.socket.setsockopt(zmq.SUBSCRIBE,'')
269 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
269 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
270 self.socket.connect('tcp://%s:%i' % self.address)
270 self.socket.connect('tcp://%s:%i' % self.address)
271 self.ioloop = ioloop.IOLoop()
271 self.ioloop = ioloop.IOLoop()
272 self.iostate = POLLIN|POLLERR
272 self.iostate = POLLIN|POLLERR
273 self.ioloop.add_handler(self.socket, self._handle_events,
273 self.ioloop.add_handler(self.socket, self._handle_events,
274 self.iostate)
274 self.iostate)
275 self.ioloop.start()
275 self.ioloop.start()
276
276
277 def stop(self):
277 def stop(self):
278 self.ioloop.stop()
278 self.ioloop.stop()
279 super(SubSocketChannel, self).stop()
279 super(SubSocketChannel, self).stop()
280
280
281 def call_handlers(self, msg):
281 def call_handlers(self, msg):
282 """This method is called in the ioloop thread when a message arrives.
282 """This method is called in the ioloop thread when a message arrives.
283
283
284 Subclasses should override this method to handle incoming messages.
284 Subclasses should override this method to handle incoming messages.
285 It is important to remember that this method is called in the thread
285 It is important to remember that this method is called in the thread
286 so that some logic must be done to ensure that the application leve
286 so that some logic must be done to ensure that the application leve
287 handlers are called in the application thread.
287 handlers are called in the application thread.
288 """
288 """
289 raise NotImplementedError('call_handlers must be defined in a subclass.')
289 raise NotImplementedError('call_handlers must be defined in a subclass.')
290
290
291 def flush(self, timeout=1.0):
291 def flush(self, timeout=1.0):
292 """Immediately processes all pending messages on the SUB channel.
292 """Immediately processes all pending messages on the SUB channel.
293
293
294 This method is thread safe.
294 This method is thread safe.
295
295
296 Parameters
296 Parameters
297 ----------
297 ----------
298 timeout : float, optional
298 timeout : float, optional
299 The maximum amount of time to spend flushing, in seconds. The
299 The maximum amount of time to spend flushing, in seconds. The
300 default is one second.
300 default is one second.
301 """
301 """
302 # We do the IOLoop callback process twice to ensure that the IOLoop
302 # We do the IOLoop callback process twice to ensure that the IOLoop
303 # gets to perform at least one full poll.
303 # gets to perform at least one full poll.
304 stop_time = time.time() + timeout
304 stop_time = time.time() + timeout
305 for i in xrange(2):
305 for i in xrange(2):
306 self._flushed = False
306 self._flushed = False
307 self.ioloop.add_callback(self._flush)
307 self.ioloop.add_callback(self._flush)
308 while not self._flushed and time.time() < stop_time:
308 while not self._flushed and time.time() < stop_time:
309 time.sleep(0.01)
309 time.sleep(0.01)
310
310
311 def _handle_events(self, socket, events):
311 def _handle_events(self, socket, events):
312 # Turn on and off POLLOUT depending on if we have made a request
312 # Turn on and off POLLOUT depending on if we have made a request
313 if events & POLLERR:
313 if events & POLLERR:
314 self._handle_err()
314 self._handle_err()
315 if events & POLLIN:
315 if events & POLLIN:
316 self._handle_recv()
316 self._handle_recv()
317
317
318 def _handle_err(self):
318 def _handle_err(self):
319 # We don't want to let this go silently, so eventually we should log.
319 # We don't want to let this go silently, so eventually we should log.
320 raise zmq.ZMQError()
320 raise zmq.ZMQError()
321
321
322 def _handle_recv(self):
322 def _handle_recv(self):
323 # Get all of the messages we can
323 # Get all of the messages we can
324 while True:
324 while True:
325 try:
325 try:
326 msg = self.socket.recv_json(zmq.NOBLOCK)
326 msg = self.socket.recv_json(zmq.NOBLOCK)
327 except zmq.ZMQError:
327 except zmq.ZMQError:
328 # Check the errno?
328 # Check the errno?
329 # Will this tigger POLLERR?
329 # Will this tigger POLLERR?
330 break
330 break
331 else:
331 else:
332 self.call_handlers(msg)
332 self.call_handlers(msg)
333
333
334 def _flush(self):
334 def _flush(self):
335 """Callback for :method:`self.flush`."""
335 """Callback for :method:`self.flush`."""
336 self._flushed = True
336 self._flushed = True
337
337
338
338
339 class RepSocketChannel(ZmqSocketChannel):
339 class RepSocketChannel(ZmqSocketChannel):
340 """A reply channel to handle raw_input requests that the kernel makes."""
340 """A reply channel to handle raw_input requests that the kernel makes."""
341
341
342 def run(self):
342 def run(self):
343 """The thread's main activity. Call start() instead."""
343 """The thread's main activity. Call start() instead."""
344 self.ioloop = ioloop.IOLoop()
344 self.ioloop = ioloop.IOLoop()
345 self.ioloop.start()
345 self.ioloop.start()
346
346
347 def stop(self):
347 def stop(self):
348 self.ioloop.stop()
348 self.ioloop.stop()
349 super(RepSocketChannel, self).stop()
349 super(RepSocketChannel, self).stop()
350
350
351 def on_raw_input(self):
351 def on_raw_input(self):
352 pass
352 pass
353
353
354
354
355 #-----------------------------------------------------------------------------
355 #-----------------------------------------------------------------------------
356 # Main kernel manager class
356 # Main kernel manager class
357 #-----------------------------------------------------------------------------
357 #-----------------------------------------------------------------------------
358
358
359
359
360 class KernelManager(HasTraits):
360 class KernelManager(HasTraits):
361 """ Manages a kernel for a frontend.
361 """ Manages a kernel for a frontend.
362
362
363 The SUB channel is for the frontend to receive messages published by the
363 The SUB channel is for the frontend to receive messages published by the
364 kernel.
364 kernel.
365
365
366 The REQ channel is for the frontend to make requests of the kernel.
366 The REQ channel is for the frontend to make requests of the kernel.
367
367
368 The REP channel is for the kernel to request stdin (raw_input) from the
368 The REP channel is for the kernel to request stdin (raw_input) from the
369 frontend.
369 frontend.
370 """
370 """
371 # The PyZMQ Context to use for communication with the kernel.
371 # The PyZMQ Context to use for communication with the kernel.
372 context = Instance(zmq.Context)
372 context = Instance(zmq.Context)
373
373
374 # The Session to use for communication with the kernel.
374 # The Session to use for communication with the kernel.
375 session = Instance(Session)
375 session = Instance(Session)
376
376
377 # The classes to use for the various channels.
377 # The classes to use for the various channels.
378 xreq_channel_class = Type(XReqSocketChannel)
378 xreq_channel_class = Type(XReqSocketChannel)
379 sub_channel_class = Type(SubSocketChannel)
379 sub_channel_class = Type(SubSocketChannel)
380 rep_channel_class = Type(RepSocketChannel)
380 rep_channel_class = Type(RepSocketChannel)
381
381
382 # Protected traits.
382 # Protected traits.
383 _kernel = Instance(Popen)
383 _kernel = Instance(Popen)
384 _xreq_address = Any
384 _xreq_address = Any
385 _sub_address = Any
385 _sub_address = Any
386 _rep_address = Any
386 _rep_address = Any
387 _xreq_channel = Any
387 _xreq_channel = Any
388 _sub_channel = Any
388 _sub_channel = Any
389 _rep_channel = Any
389 _rep_channel = Any
390
390
391 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
391 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
392 context=None, session=None):
392 context=None, session=None):
393 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
393 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
394 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
394 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
395 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
395 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
396 self.context = zmq.Context() if context is None else context
396 self.context = zmq.Context() if context is None else context
397 self.session = Session() if session is None else session
397 self.session = Session() if session is None else session
398
398
399 #--------------------------------------------------------------------------
399 #--------------------------------------------------------------------------
400 # Channel management methods:
400 # Channel management methods:
401 #--------------------------------------------------------------------------
401 #--------------------------------------------------------------------------
402
402
403 def start_channels(self):
403 def start_channels(self):
404 """Starts the channels for this kernel.
404 """Starts the channels for this kernel.
405
405
406 This will create the channels if they do not exist and then start
406 This will create the channels if they do not exist and then start
407 them. If port numbers of 0 are being used (random ports) then you
407 them. If port numbers of 0 are being used (random ports) then you
408 must first call :method:`start_kernel`. If the channels have been
408 must first call :method:`start_kernel`. If the channels have been
409 stopped and you call this, :class:`RuntimeError` will be raised.
409 stopped and you call this, :class:`RuntimeError` will be raised.
410 """
410 """
411 self.xreq_channel.start()
411 self.xreq_channel.start()
412 self.sub_channel.start()
412 self.sub_channel.start()
413 self.rep_channel.start()
413 self.rep_channel.start()
414
414
415 def stop_channels(self):
415 def stop_channels(self):
416 """Stops the channels for this kernel.
416 """Stops the channels for this kernel.
417
417
418 This stops the channels by joining their threads. If the channels
418 This stops the channels by joining their threads. If the channels
419 were not started, :class:`RuntimeError` will be raised.
419 were not started, :class:`RuntimeError` will be raised.
420 """
420 """
421 self.xreq_channel.stop()
421 self.xreq_channel.stop()
422 self.sub_channel.stop()
422 self.sub_channel.stop()
423 self.rep_channel.stop()
423 self.rep_channel.stop()
424
424
425 @property
425 @property
426 def channels_running(self):
426 def channels_running(self):
427 """Are all of the channels created and running?"""
427 """Are all of the channels created and running?"""
428 return self.xreq_channel.is_alive() \
428 return self.xreq_channel.is_alive() \
429 and self.sub_channel.is_alive() \
429 and self.sub_channel.is_alive() \
430 and self.rep_channel.is_alive()
430 and self.rep_channel.is_alive()
431
431
432 #--------------------------------------------------------------------------
432 #--------------------------------------------------------------------------
433 # Kernel process management methods:
433 # Kernel process management methods:
434 #--------------------------------------------------------------------------
434 #--------------------------------------------------------------------------
435
435
436 def start_kernel(self):
436 def start_kernel(self):
437 """Starts a kernel process and configures the manager to use it.
437 """Starts a kernel process and configures the manager to use it.
438
438
439 If random ports (port=0) are being used, this method must be called
439 If random ports (port=0) are being used, this method must be called
440 before the channels are created.
440 before the channels are created.
441 """
441 """
442 xreq, sub = self.xreq_address, self.sub_address
442 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
443 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
443 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
444 raise RuntimeError("Can only launch a kernel on localhost."
444 raise RuntimeError("Can only launch a kernel on localhost."
445 "Make sure that the '*_address' attributes are "
445 "Make sure that the '*_address' attributes are "
446 "configured properly.")
446 "configured properly.")
447
447
448 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
448 kernel, xrep, pub, req = launch_kernel(
449 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
449 self._kernel = kernel
450 self._kernel = kernel
450 self._xreq_address = (LOCALHOST, xrep)
451 self._xreq_address = (LOCALHOST, xrep)
451 self._sub_address = (LOCALHOST, pub)
452 self._sub_address = (LOCALHOST, pub)
452 # The rep channel is not fully working yet, but its base class makes
453 self._rep_address = (LOCALHOST, req)
453 # sure the port is not 0. We set to -1 for now until the rep channel
454 # is fully working.
455 self._rep_address = (LOCALHOST, -1)
456
454
457 @property
455 @property
458 def has_kernel(self):
456 def has_kernel(self):
459 """Returns whether a kernel process has been specified for the kernel
457 """Returns whether a kernel process has been specified for the kernel
460 manager.
458 manager.
461
459
462 A kernel process can be set via 'start_kernel' or 'set_kernel'.
460 A kernel process can be set via 'start_kernel' or 'set_kernel'.
463 """
461 """
464 return self._kernel is not None
462 return self._kernel is not None
465
463
466 def kill_kernel(self):
464 def kill_kernel(self):
467 """ Kill the running kernel. """
465 """ Kill the running kernel. """
468 if self._kernel is not None:
466 if self._kernel is not None:
469 self._kernel.kill()
467 self._kernel.kill()
470 self._kernel = None
468 self._kernel = None
471 else:
469 else:
472 raise RuntimeError("Cannot kill kernel. No kernel is running!")
470 raise RuntimeError("Cannot kill kernel. No kernel is running!")
473
471
474 def signal_kernel(self, signum):
472 def signal_kernel(self, signum):
475 """ Sends a signal to the kernel. """
473 """ Sends a signal to the kernel. """
476 if self._kernel is not None:
474 if self._kernel is not None:
477 self._kernel.send_signal(signum)
475 self._kernel.send_signal(signum)
478 else:
476 else:
479 raise RuntimeError("Cannot signal kernel. No kernel is running!")
477 raise RuntimeError("Cannot signal kernel. No kernel is running!")
480
478
481 @property
479 @property
482 def is_alive(self):
480 def is_alive(self):
483 """Is the kernel process still running?"""
481 """Is the kernel process still running?"""
484 if self._kernel is not None:
482 if self._kernel is not None:
485 if self._kernel.poll() is None:
483 if self._kernel.poll() is None:
486 return True
484 return True
487 else:
485 else:
488 return False
486 return False
489 else:
487 else:
490 # We didn't start the kernel with this KernelManager so we don't
488 # We didn't start the kernel with this KernelManager so we don't
491 # know if it is running. We should use a heartbeat for this case.
489 # know if it is running. We should use a heartbeat for this case.
492 return True
490 return True
493
491
494 #--------------------------------------------------------------------------
492 #--------------------------------------------------------------------------
495 # Channels used for communication with the kernel:
493 # Channels used for communication with the kernel:
496 #--------------------------------------------------------------------------
494 #--------------------------------------------------------------------------
497
495
498 @property
496 @property
499 def xreq_channel(self):
497 def xreq_channel(self):
500 """Get the REQ socket channel object to make requests of the kernel."""
498 """Get the REQ socket channel object to make requests of the kernel."""
501 if self._xreq_channel is None:
499 if self._xreq_channel is None:
502 self._xreq_channel = self.xreq_channel_class(self.context,
500 self._xreq_channel = self.xreq_channel_class(self.context,
503 self.session,
501 self.session,
504 self.xreq_address)
502 self.xreq_address)
505 return self._xreq_channel
503 return self._xreq_channel
506
504
507 @property
505 @property
508 def sub_channel(self):
506 def sub_channel(self):
509 """Get the SUB socket channel object."""
507 """Get the SUB socket channel object."""
510 if self._sub_channel is None:
508 if self._sub_channel is None:
511 self._sub_channel = self.sub_channel_class(self.context,
509 self._sub_channel = self.sub_channel_class(self.context,
512 self.session,
510 self.session,
513 self.sub_address)
511 self.sub_address)
514 return self._sub_channel
512 return self._sub_channel
515
513
516 @property
514 @property
517 def rep_channel(self):
515 def rep_channel(self):
518 """Get the REP socket channel object to handle stdin (raw_input)."""
516 """Get the REP socket channel object to handle stdin (raw_input)."""
519 if self._rep_channel is None:
517 if self._rep_channel is None:
520 self._rep_channel = self.rep_channel_class(self.context,
518 self._rep_channel = self.rep_channel_class(self.context,
521 self.session,
519 self.session,
522 self.rep_address)
520 self.rep_address)
523 return self._rep_channel
521 return self._rep_channel
524
522
525 @property
523 @property
526 def xreq_address(self):
524 def xreq_address(self):
527 return self._xreq_address
525 return self._xreq_address
528
526
529 @property
527 @property
530 def sub_address(self):
528 def sub_address(self):
531 return self._sub_address
529 return self._sub_address
532
530
533 @property
531 @property
534 def rep_address(self):
532 def rep_address(self):
535 return self._rep_address
533 return self._rep_address
536
534
537
535
General Comments 0
You need to be logged in to leave comments. Login now