##// END OF EJS Templates
Added pausing to the heartbeat channel.
Brian Granger -
Show More
@@ -1,851 +1,867
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 Todo
3 Todo
4 ====
4 ====
5
5
6 * Create logger to handle debugging and console messages.
6 * Create logger to handle debugging and console messages.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 # Standard library imports.
20 # Standard library imports.
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 import sys
23 import sys
24 from threading import Thread
24 from threading import Thread
25 import time
25 import time
26
26
27 # System library imports.
27 # System library imports.
28 import zmq
28 import zmq
29 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq import POLLIN, POLLOUT, POLLERR
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 # Local imports.
32 # Local imports.
33 from IPython.utils import io
33 from IPython.utils import io
34 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
35 from session import Session
35 from session import Session
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Constants and exceptions
38 # Constants and exceptions
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41 LOCALHOST = '127.0.0.1'
41 LOCALHOST = '127.0.0.1'
42
42
43 class InvalidPortNumber(Exception):
43 class InvalidPortNumber(Exception):
44 pass
44 pass
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Utility functions
47 # Utility functions
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
51 # if they prove to have more generic utility
51 # if they prove to have more generic utility
52
52
53 def validate_string_list(lst):
53 def validate_string_list(lst):
54 """Validate that the input is a list of strings.
54 """Validate that the input is a list of strings.
55
55
56 Raises ValueError if not."""
56 Raises ValueError if not."""
57 if not isinstance(lst, list):
57 if not isinstance(lst, list):
58 raise ValueError('input %r must be a list' % lst)
58 raise ValueError('input %r must be a list' % lst)
59 for x in lst:
59 for x in lst:
60 if not isinstance(x, basestring):
60 if not isinstance(x, basestring):
61 raise ValueError('element %r in list must be a string' % x)
61 raise ValueError('element %r in list must be a string' % x)
62
62
63
63
64 def validate_string_dict(dct):
64 def validate_string_dict(dct):
65 """Validate that the input is a dict with string keys and values.
65 """Validate that the input is a dict with string keys and values.
66
66
67 Raises ValueError if not."""
67 Raises ValueError if not."""
68 for k,v in dct.iteritems():
68 for k,v in dct.iteritems():
69 if not isinstance(k, basestring):
69 if not isinstance(k, basestring):
70 raise ValueError('key %r in dict must be a string' % k)
70 raise ValueError('key %r in dict must be a string' % k)
71 if not isinstance(v, basestring):
71 if not isinstance(v, basestring):
72 raise ValueError('value %r in dict must be a string' % v)
72 raise ValueError('value %r in dict must be a string' % v)
73
73
74
74
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76 # ZMQ Socket Channel classes
76 # ZMQ Socket Channel classes
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78
78
79 class ZmqSocketChannel(Thread):
79 class ZmqSocketChannel(Thread):
80 """The base class for the channels that use ZMQ sockets.
80 """The base class for the channels that use ZMQ sockets.
81 """
81 """
82 context = None
82 context = None
83 session = None
83 session = None
84 socket = None
84 socket = None
85 ioloop = None
85 ioloop = None
86 iostate = None
86 iostate = None
87 _address = None
87 _address = None
88
88
89 def __init__(self, context, session, address):
89 def __init__(self, context, session, address):
90 """Create a channel
90 """Create a channel
91
91
92 Parameters
92 Parameters
93 ----------
93 ----------
94 context : :class:`zmq.Context`
94 context : :class:`zmq.Context`
95 The ZMQ context to use.
95 The ZMQ context to use.
96 session : :class:`session.Session`
96 session : :class:`session.Session`
97 The session to use.
97 The session to use.
98 address : tuple
98 address : tuple
99 Standard (ip, port) tuple that the kernel is listening on.
99 Standard (ip, port) tuple that the kernel is listening on.
100 """
100 """
101 super(ZmqSocketChannel, self).__init__()
101 super(ZmqSocketChannel, self).__init__()
102 self.daemon = True
102 self.daemon = True
103
103
104 self.context = context
104 self.context = context
105 self.session = session
105 self.session = session
106 if address[1] == 0:
106 if address[1] == 0:
107 message = 'The port number for a channel cannot be 0.'
107 message = 'The port number for a channel cannot be 0.'
108 raise InvalidPortNumber(message)
108 raise InvalidPortNumber(message)
109 self._address = address
109 self._address = address
110
110
111 def stop(self):
111 def stop(self):
112 """Stop the channel's activity.
112 """Stop the channel's activity.
113
113
114 This calls :method:`Thread.join` and returns when the thread
114 This calls :method:`Thread.join` and returns when the thread
115 terminates. :class:`RuntimeError` will be raised if
115 terminates. :class:`RuntimeError` will be raised if
116 :method:`self.start` is called again.
116 :method:`self.start` is called again.
117 """
117 """
118 self.join()
118 self.join()
119
119
120 @property
120 @property
121 def address(self):
121 def address(self):
122 """Get the channel's address as an (ip, port) tuple.
122 """Get the channel's address as an (ip, port) tuple.
123
123
124 By the default, the address is (localhost, 0), where 0 means a random
124 By the default, the address is (localhost, 0), where 0 means a random
125 port.
125 port.
126 """
126 """
127 return self._address
127 return self._address
128
128
129 def add_io_state(self, state):
129 def add_io_state(self, state):
130 """Add IO state to the eventloop.
130 """Add IO state to the eventloop.
131
131
132 Parameters
132 Parameters
133 ----------
133 ----------
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 The IO state flag to set.
135 The IO state flag to set.
136
136
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 """
138 """
139 def add_io_state_callback():
139 def add_io_state_callback():
140 if not self.iostate & state:
140 if not self.iostate & state:
141 self.iostate = self.iostate | state
141 self.iostate = self.iostate | state
142 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.update_handler(self.socket, self.iostate)
143 self.ioloop.add_callback(add_io_state_callback)
143 self.ioloop.add_callback(add_io_state_callback)
144
144
145 def drop_io_state(self, state):
145 def drop_io_state(self, state):
146 """Drop IO state from the eventloop.
146 """Drop IO state from the eventloop.
147
147
148 Parameters
148 Parameters
149 ----------
149 ----------
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 The IO state flag to set.
151 The IO state flag to set.
152
152
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 """
154 """
155 def drop_io_state_callback():
155 def drop_io_state_callback():
156 if self.iostate & state:
156 if self.iostate & state:
157 self.iostate = self.iostate & (~state)
157 self.iostate = self.iostate & (~state)
158 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.update_handler(self.socket, self.iostate)
159 self.ioloop.add_callback(drop_io_state_callback)
159 self.ioloop.add_callback(drop_io_state_callback)
160
160
161
161
162 class XReqSocketChannel(ZmqSocketChannel):
162 class XReqSocketChannel(ZmqSocketChannel):
163 """The XREQ channel for issues request/replies to the kernel.
163 """The XREQ channel for issues request/replies to the kernel.
164 """
164 """
165
165
166 command_queue = None
166 command_queue = None
167
167
168 def __init__(self, context, session, address):
168 def __init__(self, context, session, address):
169 super(XReqSocketChannel, self).__init__(context, session, address)
169 super(XReqSocketChannel, self).__init__(context, session, address)
170 self.command_queue = Queue()
170 self.command_queue = Queue()
171 self.ioloop = ioloop.IOLoop()
171 self.ioloop = ioloop.IOLoop()
172
172
173 def run(self):
173 def run(self):
174 """The thread's main activity. Call start() instead."""
174 """The thread's main activity. Call start() instead."""
175 self.socket = self.context.socket(zmq.XREQ)
175 self.socket = self.context.socket(zmq.XREQ)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 self.socket.connect('tcp://%s:%i' % self.address)
177 self.socket.connect('tcp://%s:%i' % self.address)
178 self.iostate = POLLERR|POLLIN
178 self.iostate = POLLERR|POLLIN
179 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.ioloop.add_handler(self.socket, self._handle_events,
180 self.iostate)
180 self.iostate)
181 self.ioloop.start()
181 self.ioloop.start()
182
182
183 def stop(self):
183 def stop(self):
184 self.ioloop.stop()
184 self.ioloop.stop()
185 super(XReqSocketChannel, self).stop()
185 super(XReqSocketChannel, self).stop()
186
186
187 def call_handlers(self, msg):
187 def call_handlers(self, msg):
188 """This method is called in the ioloop thread when a message arrives.
188 """This method is called in the ioloop thread when a message arrives.
189
189
190 Subclasses should override this method to handle incoming messages.
190 Subclasses should override this method to handle incoming messages.
191 It is important to remember that this method is called in the thread
191 It is important to remember that this method is called in the thread
192 so that some logic must be done to ensure that the application leve
192 so that some logic must be done to ensure that the application leve
193 handlers are called in the application thread.
193 handlers are called in the application thread.
194 """
194 """
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
196
196
197 def execute(self, code, silent=False,
197 def execute(self, code, silent=False,
198 user_variables=None, user_expressions=None):
198 user_variables=None, user_expressions=None):
199 """Execute code in the kernel.
199 """Execute code in the kernel.
200
200
201 Parameters
201 Parameters
202 ----------
202 ----------
203 code : str
203 code : str
204 A string of Python code.
204 A string of Python code.
205
205
206 silent : bool, optional (default False)
206 silent : bool, optional (default False)
207 If set, the kernel will execute the code as quietly possible.
207 If set, the kernel will execute the code as quietly possible.
208
208
209 user_variables : list, optional
209 user_variables : list, optional
210 A list of variable names to pull from the user's namespace. They
210 A list of variable names to pull from the user's namespace. They
211 will come back as a dict with these names as keys and their
211 will come back as a dict with these names as keys and their
212 :func:`repr` as values.
212 :func:`repr` as values.
213
213
214 user_expressions : dict, optional
214 user_expressions : dict, optional
215 A dict with string keys and to pull from the user's
215 A dict with string keys and to pull from the user's
216 namespace. They will come back as a dict with these names as keys
216 namespace. They will come back as a dict with these names as keys
217 and their :func:`repr` as values.
217 and their :func:`repr` as values.
218
218
219 Returns
219 Returns
220 -------
220 -------
221 The msg_id of the message sent.
221 The msg_id of the message sent.
222 """
222 """
223 if user_variables is None:
223 if user_variables is None:
224 user_variables = []
224 user_variables = []
225 if user_expressions is None:
225 if user_expressions is None:
226 user_expressions = {}
226 user_expressions = {}
227
227
228 # Don't waste network traffic if inputs are invalid
228 # Don't waste network traffic if inputs are invalid
229 if not isinstance(code, basestring):
229 if not isinstance(code, basestring):
230 raise ValueError('code %r must be a string' % code)
230 raise ValueError('code %r must be a string' % code)
231 validate_string_list(user_variables)
231 validate_string_list(user_variables)
232 validate_string_dict(user_expressions)
232 validate_string_dict(user_expressions)
233
233
234 # Create class for content/msg creation. Related to, but possibly
234 # Create class for content/msg creation. Related to, but possibly
235 # not in Session.
235 # not in Session.
236 content = dict(code=code, silent=silent,
236 content = dict(code=code, silent=silent,
237 user_variables=user_variables,
237 user_variables=user_variables,
238 user_expressions=user_expressions)
238 user_expressions=user_expressions)
239 msg = self.session.msg('execute_request', content)
239 msg = self.session.msg('execute_request', content)
240 self._queue_request(msg)
240 self._queue_request(msg)
241 return msg['header']['msg_id']
241 return msg['header']['msg_id']
242
242
243 def complete(self, text, line, cursor_pos, block=None):
243 def complete(self, text, line, cursor_pos, block=None):
244 """Tab complete text in the kernel's namespace.
244 """Tab complete text in the kernel's namespace.
245
245
246 Parameters
246 Parameters
247 ----------
247 ----------
248 text : str
248 text : str
249 The text to complete.
249 The text to complete.
250 line : str
250 line : str
251 The full line of text that is the surrounding context for the
251 The full line of text that is the surrounding context for the
252 text to complete.
252 text to complete.
253 cursor_pos : int
253 cursor_pos : int
254 The position of the cursor in the line where the completion was
254 The position of the cursor in the line where the completion was
255 requested.
255 requested.
256 block : str, optional
256 block : str, optional
257 The full block of code in which the completion is being requested.
257 The full block of code in which the completion is being requested.
258
258
259 Returns
259 Returns
260 -------
260 -------
261 The msg_id of the message sent.
261 The msg_id of the message sent.
262 """
262 """
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 msg = self.session.msg('complete_request', content)
264 msg = self.session.msg('complete_request', content)
265 self._queue_request(msg)
265 self._queue_request(msg)
266 return msg['header']['msg_id']
266 return msg['header']['msg_id']
267
267
268 def object_info(self, oname):
268 def object_info(self, oname):
269 """Get metadata information about an object.
269 """Get metadata information about an object.
270
270
271 Parameters
271 Parameters
272 ----------
272 ----------
273 oname : str
273 oname : str
274 A string specifying the object name.
274 A string specifying the object name.
275
275
276 Returns
276 Returns
277 -------
277 -------
278 The msg_id of the message sent.
278 The msg_id of the message sent.
279 """
279 """
280 content = dict(oname=oname)
280 content = dict(oname=oname)
281 msg = self.session.msg('object_info_request', content)
281 msg = self.session.msg('object_info_request', content)
282 self._queue_request(msg)
282 self._queue_request(msg)
283 return msg['header']['msg_id']
283 return msg['header']['msg_id']
284
284
285 def history(self, index=None, raw=False, output=True):
285 def history(self, index=None, raw=False, output=True):
286 """Get the history list.
286 """Get the history list.
287
287
288 Parameters
288 Parameters
289 ----------
289 ----------
290 index : n or (n1, n2) or None
290 index : n or (n1, n2) or None
291 If n, then the last entries. If a tuple, then all in
291 If n, then the last entries. If a tuple, then all in
292 range(n1, n2). If None, then all entries. Raises IndexError if
292 range(n1, n2). If None, then all entries. Raises IndexError if
293 the format of index is incorrect.
293 the format of index is incorrect.
294 raw : bool
294 raw : bool
295 If True, return the raw input.
295 If True, return the raw input.
296 output : bool
296 output : bool
297 If True, then return the output as well.
297 If True, then return the output as well.
298
298
299 Returns
299 Returns
300 -------
300 -------
301 The msg_id of the message sent.
301 The msg_id of the message sent.
302 """
302 """
303 content = dict(index=index, raw=raw, output=output)
303 content = dict(index=index, raw=raw, output=output)
304 msg = self.session.msg('history_request', content)
304 msg = self.session.msg('history_request', content)
305 self._queue_request(msg)
305 self._queue_request(msg)
306 return msg['header']['msg_id']
306 return msg['header']['msg_id']
307
307
308 def shutdown(self):
308 def shutdown(self):
309 """Request an immediate kernel shutdown.
309 """Request an immediate kernel shutdown.
310
310
311 Upon receipt of the (empty) reply, client code can safely assume that
311 Upon receipt of the (empty) reply, client code can safely assume that
312 the kernel has shut down and it's safe to forcefully terminate it if
312 the kernel has shut down and it's safe to forcefully terminate it if
313 it's still alive.
313 it's still alive.
314
314
315 The kernel will send the reply via a function registered with Python's
315 The kernel will send the reply via a function registered with Python's
316 atexit module, ensuring it's truly done as the kernel is done with all
316 atexit module, ensuring it's truly done as the kernel is done with all
317 normal operation.
317 normal operation.
318 """
318 """
319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # Send quit message to kernel. Once we implement kernel-side setattr,
320 # this should probably be done that way, but for now this will do.
320 # this should probably be done that way, but for now this will do.
321 msg = self.session.msg('shutdown_request', {})
321 msg = self.session.msg('shutdown_request', {})
322 self._queue_request(msg)
322 self._queue_request(msg)
323 return msg['header']['msg_id']
323 return msg['header']['msg_id']
324
324
325 def _handle_events(self, socket, events):
325 def _handle_events(self, socket, events):
326 if events & POLLERR:
326 if events & POLLERR:
327 self._handle_err()
327 self._handle_err()
328 if events & POLLOUT:
328 if events & POLLOUT:
329 self._handle_send()
329 self._handle_send()
330 if events & POLLIN:
330 if events & POLLIN:
331 self._handle_recv()
331 self._handle_recv()
332
332
333 def _handle_recv(self):
333 def _handle_recv(self):
334 msg = self.socket.recv_json()
334 msg = self.socket.recv_json()
335 self.call_handlers(msg)
335 self.call_handlers(msg)
336
336
337 def _handle_send(self):
337 def _handle_send(self):
338 try:
338 try:
339 msg = self.command_queue.get(False)
339 msg = self.command_queue.get(False)
340 except Empty:
340 except Empty:
341 pass
341 pass
342 else:
342 else:
343 self.socket.send_json(msg)
343 self.socket.send_json(msg)
344 if self.command_queue.empty():
344 if self.command_queue.empty():
345 self.drop_io_state(POLLOUT)
345 self.drop_io_state(POLLOUT)
346
346
347 def _handle_err(self):
347 def _handle_err(self):
348 # We don't want to let this go silently, so eventually we should log.
348 # We don't want to let this go silently, so eventually we should log.
349 raise zmq.ZMQError()
349 raise zmq.ZMQError()
350
350
351 def _queue_request(self, msg):
351 def _queue_request(self, msg):
352 self.command_queue.put(msg)
352 self.command_queue.put(msg)
353 self.add_io_state(POLLOUT)
353 self.add_io_state(POLLOUT)
354
354
355
355
356 class SubSocketChannel(ZmqSocketChannel):
356 class SubSocketChannel(ZmqSocketChannel):
357 """The SUB channel which listens for messages that the kernel publishes.
357 """The SUB channel which listens for messages that the kernel publishes.
358 """
358 """
359
359
360 def __init__(self, context, session, address):
360 def __init__(self, context, session, address):
361 super(SubSocketChannel, self).__init__(context, session, address)
361 super(SubSocketChannel, self).__init__(context, session, address)
362 self.ioloop = ioloop.IOLoop()
362 self.ioloop = ioloop.IOLoop()
363
363
364 def run(self):
364 def run(self):
365 """The thread's main activity. Call start() instead."""
365 """The thread's main activity. Call start() instead."""
366 self.socket = self.context.socket(zmq.SUB)
366 self.socket = self.context.socket(zmq.SUB)
367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
369 self.socket.connect('tcp://%s:%i' % self.address)
369 self.socket.connect('tcp://%s:%i' % self.address)
370 self.iostate = POLLIN|POLLERR
370 self.iostate = POLLIN|POLLERR
371 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.ioloop.add_handler(self.socket, self._handle_events,
372 self.iostate)
372 self.iostate)
373 self.ioloop.start()
373 self.ioloop.start()
374
374
375 def stop(self):
375 def stop(self):
376 self.ioloop.stop()
376 self.ioloop.stop()
377 super(SubSocketChannel, self).stop()
377 super(SubSocketChannel, self).stop()
378
378
379 def call_handlers(self, msg):
379 def call_handlers(self, msg):
380 """This method is called in the ioloop thread when a message arrives.
380 """This method is called in the ioloop thread when a message arrives.
381
381
382 Subclasses should override this method to handle incoming messages.
382 Subclasses should override this method to handle incoming messages.
383 It is important to remember that this method is called in the thread
383 It is important to remember that this method is called in the thread
384 so that some logic must be done to ensure that the application leve
384 so that some logic must be done to ensure that the application leve
385 handlers are called in the application thread.
385 handlers are called in the application thread.
386 """
386 """
387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 raise NotImplementedError('call_handlers must be defined in a subclass.')
388
388
389 def flush(self, timeout=1.0):
389 def flush(self, timeout=1.0):
390 """Immediately processes all pending messages on the SUB channel.
390 """Immediately processes all pending messages on the SUB channel.
391
391
392 Callers should use this method to ensure that :method:`call_handlers`
392 Callers should use this method to ensure that :method:`call_handlers`
393 has been called for all messages that have been received on the
393 has been called for all messages that have been received on the
394 0MQ SUB socket of this channel.
394 0MQ SUB socket of this channel.
395
395
396 This method is thread safe.
396 This method is thread safe.
397
397
398 Parameters
398 Parameters
399 ----------
399 ----------
400 timeout : float, optional
400 timeout : float, optional
401 The maximum amount of time to spend flushing, in seconds. The
401 The maximum amount of time to spend flushing, in seconds. The
402 default is one second.
402 default is one second.
403 """
403 """
404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # We do the IOLoop callback process twice to ensure that the IOLoop
405 # gets to perform at least one full poll.
405 # gets to perform at least one full poll.
406 stop_time = time.time() + timeout
406 stop_time = time.time() + timeout
407 for i in xrange(2):
407 for i in xrange(2):
408 self._flushed = False
408 self._flushed = False
409 self.ioloop.add_callback(self._flush)
409 self.ioloop.add_callback(self._flush)
410 while not self._flushed and time.time() < stop_time:
410 while not self._flushed and time.time() < stop_time:
411 time.sleep(0.01)
411 time.sleep(0.01)
412
412
413 def _handle_events(self, socket, events):
413 def _handle_events(self, socket, events):
414 # Turn on and off POLLOUT depending on if we have made a request
414 # Turn on and off POLLOUT depending on if we have made a request
415 if events & POLLERR:
415 if events & POLLERR:
416 self._handle_err()
416 self._handle_err()
417 if events & POLLIN:
417 if events & POLLIN:
418 self._handle_recv()
418 self._handle_recv()
419
419
420 def _handle_err(self):
420 def _handle_err(self):
421 # We don't want to let this go silently, so eventually we should log.
421 # We don't want to let this go silently, so eventually we should log.
422 raise zmq.ZMQError()
422 raise zmq.ZMQError()
423
423
424 def _handle_recv(self):
424 def _handle_recv(self):
425 # Get all of the messages we can
425 # Get all of the messages we can
426 while True:
426 while True:
427 try:
427 try:
428 msg = self.socket.recv_json(zmq.NOBLOCK)
428 msg = self.socket.recv_json(zmq.NOBLOCK)
429 except zmq.ZMQError:
429 except zmq.ZMQError:
430 # Check the errno?
430 # Check the errno?
431 # Will this trigger POLLERR?
431 # Will this trigger POLLERR?
432 break
432 break
433 else:
433 else:
434 self.call_handlers(msg)
434 self.call_handlers(msg)
435
435
436 def _flush(self):
436 def _flush(self):
437 """Callback for :method:`self.flush`."""
437 """Callback for :method:`self.flush`."""
438 self._flushed = True
438 self._flushed = True
439
439
440
440
441 class RepSocketChannel(ZmqSocketChannel):
441 class RepSocketChannel(ZmqSocketChannel):
442 """A reply channel to handle raw_input requests that the kernel makes."""
442 """A reply channel to handle raw_input requests that the kernel makes."""
443
443
444 msg_queue = None
444 msg_queue = None
445
445
446 def __init__(self, context, session, address):
446 def __init__(self, context, session, address):
447 super(RepSocketChannel, self).__init__(context, session, address)
447 super(RepSocketChannel, self).__init__(context, session, address)
448 self.ioloop = ioloop.IOLoop()
448 self.ioloop = ioloop.IOLoop()
449 self.msg_queue = Queue()
449 self.msg_queue = Queue()
450
450
451 def run(self):
451 def run(self):
452 """The thread's main activity. Call start() instead."""
452 """The thread's main activity. Call start() instead."""
453 self.socket = self.context.socket(zmq.XREQ)
453 self.socket = self.context.socket(zmq.XREQ)
454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
455 self.socket.connect('tcp://%s:%i' % self.address)
455 self.socket.connect('tcp://%s:%i' % self.address)
456 self.iostate = POLLERR|POLLIN
456 self.iostate = POLLERR|POLLIN
457 self.ioloop.add_handler(self.socket, self._handle_events,
457 self.ioloop.add_handler(self.socket, self._handle_events,
458 self.iostate)
458 self.iostate)
459 self.ioloop.start()
459 self.ioloop.start()
460
460
461 def stop(self):
461 def stop(self):
462 self.ioloop.stop()
462 self.ioloop.stop()
463 super(RepSocketChannel, self).stop()
463 super(RepSocketChannel, self).stop()
464
464
465 def call_handlers(self, msg):
465 def call_handlers(self, msg):
466 """This method is called in the ioloop thread when a message arrives.
466 """This method is called in the ioloop thread when a message arrives.
467
467
468 Subclasses should override this method to handle incoming messages.
468 Subclasses should override this method to handle incoming messages.
469 It is important to remember that this method is called in the thread
469 It is important to remember that this method is called in the thread
470 so that some logic must be done to ensure that the application leve
470 so that some logic must be done to ensure that the application leve
471 handlers are called in the application thread.
471 handlers are called in the application thread.
472 """
472 """
473 raise NotImplementedError('call_handlers must be defined in a subclass.')
473 raise NotImplementedError('call_handlers must be defined in a subclass.')
474
474
475 def input(self, string):
475 def input(self, string):
476 """Send a string of raw input to the kernel."""
476 """Send a string of raw input to the kernel."""
477 content = dict(value=string)
477 content = dict(value=string)
478 msg = self.session.msg('input_reply', content)
478 msg = self.session.msg('input_reply', content)
479 self._queue_reply(msg)
479 self._queue_reply(msg)
480
480
481 def _handle_events(self, socket, events):
481 def _handle_events(self, socket, events):
482 if events & POLLERR:
482 if events & POLLERR:
483 self._handle_err()
483 self._handle_err()
484 if events & POLLOUT:
484 if events & POLLOUT:
485 self._handle_send()
485 self._handle_send()
486 if events & POLLIN:
486 if events & POLLIN:
487 self._handle_recv()
487 self._handle_recv()
488
488
489 def _handle_recv(self):
489 def _handle_recv(self):
490 msg = self.socket.recv_json()
490 msg = self.socket.recv_json()
491 self.call_handlers(msg)
491 self.call_handlers(msg)
492
492
493 def _handle_send(self):
493 def _handle_send(self):
494 try:
494 try:
495 msg = self.msg_queue.get(False)
495 msg = self.msg_queue.get(False)
496 except Empty:
496 except Empty:
497 pass
497 pass
498 else:
498 else:
499 self.socket.send_json(msg)
499 self.socket.send_json(msg)
500 if self.msg_queue.empty():
500 if self.msg_queue.empty():
501 self.drop_io_state(POLLOUT)
501 self.drop_io_state(POLLOUT)
502
502
503 def _handle_err(self):
503 def _handle_err(self):
504 # We don't want to let this go silently, so eventually we should log.
504 # We don't want to let this go silently, so eventually we should log.
505 raise zmq.ZMQError()
505 raise zmq.ZMQError()
506
506
507 def _queue_reply(self, msg):
507 def _queue_reply(self, msg):
508 self.msg_queue.put(msg)
508 self.msg_queue.put(msg)
509 self.add_io_state(POLLOUT)
509 self.add_io_state(POLLOUT)
510
510
511
511
512 class HBSocketChannel(ZmqSocketChannel):
512 class HBSocketChannel(ZmqSocketChannel):
513 """The heartbeat channel which monitors the kernel heartbeat."""
513 """The heartbeat channel which monitors the kernel heartbeat."""
514
514
515 time_to_dead = 3.0
515 time_to_dead = 3.0
516 socket = None
516 socket = None
517 poller = None
517 poller = None
518 _running = None
519 _pause = None
518
520
519 def __init__(self, context, session, address):
521 def __init__(self, context, session, address):
520 super(HBSocketChannel, self).__init__(context, session, address)
522 super(HBSocketChannel, self).__init__(context, session, address)
521 self._running = False
523 self._running = False
524 self._pause = False
522
525
523 def _create_socket(self):
526 def _create_socket(self):
524 self.socket = self.context.socket(zmq.REQ)
527 self.socket = self.context.socket(zmq.REQ)
525 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
528 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
526 self.socket.connect('tcp://%s:%i' % self.address)
529 self.socket.connect('tcp://%s:%i' % self.address)
527 self.poller = zmq.Poller()
530 self.poller = zmq.Poller()
528 self.poller.register(self.socket, zmq.POLLIN)
531 self.poller.register(self.socket, zmq.POLLIN)
529
532
530 def run(self):
533 def run(self):
531 """The thread's main activity. Call start() instead."""
534 """The thread's main activity. Call start() instead."""
532 self._create_socket()
535 self._create_socket()
533 self._running = True
536 self._running = True
534 # Wait 2 seconds for the kernel to come up and the sockets to auto
537 # Wait 2 seconds for the kernel to come up and the sockets to auto
535 # connect. If we don't we will see the kernel as dead. Also, before
538 # connect. If we don't we will see the kernel as dead. Also, before
536 # the sockets are connected, the poller.poll line below is returning
539 # the sockets are connected, the poller.poll line below is returning
537 # too fast. This avoids that because the polling doesn't start until
540 # too fast. This avoids that because the polling doesn't start until
538 # after the sockets are connected.
541 # after the sockets are connected.
539 time.sleep(2.0)
542 time.sleep(2.0)
540 while self._running:
543 while self._running:
544 if self._pause:
545 time.sleep(self.time_to_dead)
546 else:
541 since_last_heartbeat = 0.0
547 since_last_heartbeat = 0.0
542 request_time = time.time()
548 request_time = time.time()
543 try:
549 try:
544 #io.rprint('Ping from HB channel') # dbg
550 #io.rprint('Ping from HB channel') # dbg
545 self.socket.send_json('ping')
551 self.socket.send_json('ping')
546 except zmq.ZMQError, e:
552 except zmq.ZMQError, e:
547 #io.rprint('*** HB Error:', e) # dbg
553 #io.rprint('*** HB Error:', e) # dbg
548 if e.errno == zmq.EFSM:
554 if e.errno == zmq.EFSM:
549 #io.rprint('sleep...', self.time_to_dead) # dbg
555 #io.rprint('sleep...', self.time_to_dead) # dbg
550 time.sleep(self.time_to_dead)
556 time.sleep(self.time_to_dead)
551 self._create_socket()
557 self._create_socket()
552 else:
558 else:
553 raise
559 raise
554 else:
560 else:
555 while True:
561 while True:
556 try:
562 try:
557 self.socket.recv_json(zmq.NOBLOCK)
563 self.socket.recv_json(zmq.NOBLOCK)
558 except zmq.ZMQError, e:
564 except zmq.ZMQError, e:
559 #io.rprint('*** HB Error 2:', e) # dbg
565 #io.rprint('*** HB Error 2:', e) # dbg
560 if e.errno == zmq.EAGAIN:
566 if e.errno == zmq.EAGAIN:
561 before_poll = time.time()
567 before_poll = time.time()
562 until_dead = self.time_to_dead - (before_poll -
568 until_dead = self.time_to_dead - (before_poll -
563 request_time)
569 request_time)
564
570
565 # When the return value of poll() is an empty list,
571 # When the return value of poll() is an empty list,
566 # that is when things have gone wrong (zeromq bug).
572 # that is when things have gone wrong (zeromq bug).
567 # As long as it is not an empty list, poll is
573 # As long as it is not an empty list, poll is
568 # working correctly even if it returns quickly.
574 # working correctly even if it returns quickly.
569 # Note: poll timeout is in milliseconds.
575 # Note: poll timeout is in milliseconds.
570 self.poller.poll(1000*until_dead)
576 self.poller.poll(1000*until_dead)
571
577
572 since_last_heartbeat = time.time() - request_time
578 since_last_heartbeat = time.time() - request_time
573 if since_last_heartbeat > self.time_to_dead:
579 if since_last_heartbeat > self.time_to_dead:
574 self.call_handlers(since_last_heartbeat)
580 self.call_handlers(since_last_heartbeat)
575 break
581 break
576 else:
582 else:
577 # FIXME: We should probably log this instead.
583 # FIXME: We should probably log this instead.
578 raise
584 raise
579 else:
585 else:
580 until_dead = self.time_to_dead - (time.time() -
586 until_dead = self.time_to_dead - (time.time() -
581 request_time)
587 request_time)
582 if until_dead > 0.0:
588 if until_dead > 0.0:
583 #io.rprint('sleep...', self.time_to_dead) # dbg
589 #io.rprint('sleep...', self.time_to_dead) # dbg
584 time.sleep(until_dead)
590 time.sleep(until_dead)
585 break
591 break
586
592
593 def pause(self):
594 """Pause the heartbeat."""
595 self._pause = True
596
597 def unpause(self):
598 """Unpause the heartbeat."""
599 self._pause = False
600
601 def is_beating(self):
602 """Is the heartbeat running and not paused."""
603 if self.is_alive() and not self._pause:
604 return True
605 else:
606 return False
607
587 def stop(self):
608 def stop(self):
588 self._running = False
609 self._running = False
589 super(HBSocketChannel, self).stop()
610 super(HBSocketChannel, self).stop()
590
611
591 def call_handlers(self, since_last_heartbeat):
612 def call_handlers(self, since_last_heartbeat):
592 """This method is called in the ioloop thread when a message arrives.
613 """This method is called in the ioloop thread when a message arrives.
593
614
594 Subclasses should override this method to handle incoming messages.
615 Subclasses should override this method to handle incoming messages.
595 It is important to remember that this method is called in the thread
616 It is important to remember that this method is called in the thread
596 so that some logic must be done to ensure that the application leve
617 so that some logic must be done to ensure that the application leve
597 handlers are called in the application thread.
618 handlers are called in the application thread.
598 """
619 """
599 raise NotImplementedError('call_handlers must be defined in a subclass.')
620 raise NotImplementedError('call_handlers must be defined in a subclass.')
600
621
601
622
602 #-----------------------------------------------------------------------------
623 #-----------------------------------------------------------------------------
603 # Main kernel manager class
624 # Main kernel manager class
604 #-----------------------------------------------------------------------------
625 #-----------------------------------------------------------------------------
605
626
606 class KernelManager(HasTraits):
627 class KernelManager(HasTraits):
607 """ Manages a kernel for a frontend.
628 """ Manages a kernel for a frontend.
608
629
609 The SUB channel is for the frontend to receive messages published by the
630 The SUB channel is for the frontend to receive messages published by the
610 kernel.
631 kernel.
611
632
612 The REQ channel is for the frontend to make requests of the kernel.
633 The REQ channel is for the frontend to make requests of the kernel.
613
634
614 The REP channel is for the kernel to request stdin (raw_input) from the
635 The REP channel is for the kernel to request stdin (raw_input) from the
615 frontend.
636 frontend.
616 """
637 """
617 # The PyZMQ Context to use for communication with the kernel.
638 # The PyZMQ Context to use for communication with the kernel.
618 context = Instance(zmq.Context,(),{})
639 context = Instance(zmq.Context,(),{})
619
640
620 # The Session to use for communication with the kernel.
641 # The Session to use for communication with the kernel.
621 session = Instance(Session,(),{})
642 session = Instance(Session,(),{})
622
643
623 # The kernel process with which the KernelManager is communicating.
644 # The kernel process with which the KernelManager is communicating.
624 kernel = Instance(Popen)
645 kernel = Instance(Popen)
625
646
626 # The addresses for the communication channels.
647 # The addresses for the communication channels.
627 xreq_address = TCPAddress((LOCALHOST, 0))
648 xreq_address = TCPAddress((LOCALHOST, 0))
628 sub_address = TCPAddress((LOCALHOST, 0))
649 sub_address = TCPAddress((LOCALHOST, 0))
629 rep_address = TCPAddress((LOCALHOST, 0))
650 rep_address = TCPAddress((LOCALHOST, 0))
630 hb_address = TCPAddress((LOCALHOST, 0))
651 hb_address = TCPAddress((LOCALHOST, 0))
631
652
632 # The classes to use for the various channels.
653 # The classes to use for the various channels.
633 xreq_channel_class = Type(XReqSocketChannel)
654 xreq_channel_class = Type(XReqSocketChannel)
634 sub_channel_class = Type(SubSocketChannel)
655 sub_channel_class = Type(SubSocketChannel)
635 rep_channel_class = Type(RepSocketChannel)
656 rep_channel_class = Type(RepSocketChannel)
636 hb_channel_class = Type(HBSocketChannel)
657 hb_channel_class = Type(HBSocketChannel)
637
658
638 # Protected traits.
659 # Protected traits.
639 _launch_args = Any
660 _launch_args = Any
640 _xreq_channel = Any
661 _xreq_channel = Any
641 _sub_channel = Any
662 _sub_channel = Any
642 _rep_channel = Any
663 _rep_channel = Any
643 _hb_channel = Any
664 _hb_channel = Any
644
665
645 #--------------------------------------------------------------------------
666 #--------------------------------------------------------------------------
646 # Channel management methods:
667 # Channel management methods:
647 #--------------------------------------------------------------------------
668 #--------------------------------------------------------------------------
648
669
649 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
670 def start_channels(self, xreq=True, sub=True, rep=True):
650 """Starts the channels for this kernel.
671 """Starts the channels for this kernel, but not the heartbeat.
651
672
652 This will create the channels if they do not exist and then start
673 This will create the channels if they do not exist and then start
653 them. If port numbers of 0 are being used (random ports) then you
674 them. If port numbers of 0 are being used (random ports) then you
654 must first call :method:`start_kernel`. If the channels have been
675 must first call :method:`start_kernel`. If the channels have been
655 stopped and you call this, :class:`RuntimeError` will be raised.
676 stopped and you call this, :class:`RuntimeError` will be raised.
656 """
677 """
657 if xreq:
678 if xreq:
658 self.xreq_channel.start()
679 self.xreq_channel.start()
659 if sub:
680 if sub:
660 self.sub_channel.start()
681 self.sub_channel.start()
661 if rep:
682 if rep:
662 self.rep_channel.start()
683 self.rep_channel.start()
663 if hb:
664 self.hb_channel.start()
665
684
666 def stop_channels(self):
685 def stop_channels(self):
667 """Stops all the running channels for this kernel.
686 """Stops all the running channels for this kernel.
668 """
687 """
669 if self.xreq_channel.is_alive():
688 if self.xreq_channel.is_alive():
670 self.xreq_channel.stop()
689 self.xreq_channel.stop()
671 if self.sub_channel.is_alive():
690 if self.sub_channel.is_alive():
672 self.sub_channel.stop()
691 self.sub_channel.stop()
673 if self.rep_channel.is_alive():
692 if self.rep_channel.is_alive():
674 self.rep_channel.stop()
693 self.rep_channel.stop()
675 if self.hb_channel.is_alive():
676 self.hb_channel.stop()
677
694
678 @property
695 @property
679 def channels_running(self):
696 def channels_running(self):
680 """Are any of the channels created and running?"""
697 """Are any of the channels created and running?"""
681 return self.xreq_channel.is_alive() \
698 return self.xreq_channel.is_alive() \
682 or self.sub_channel.is_alive() \
699 or self.sub_channel.is_alive() \
683 or self.rep_channel.is_alive() \
700 or self.rep_channel.is_alive()
684 or self.hb_channel.is_alive()
685
701
686 #--------------------------------------------------------------------------
702 #--------------------------------------------------------------------------
687 # Kernel process management methods:
703 # Kernel process management methods:
688 #--------------------------------------------------------------------------
704 #--------------------------------------------------------------------------
689
705
690 def start_kernel(self, **kw):
706 def start_kernel(self, **kw):
691 """Starts a kernel process and configures the manager to use it.
707 """Starts a kernel process and configures the manager to use it.
692
708
693 If random ports (port=0) are being used, this method must be called
709 If random ports (port=0) are being used, this method must be called
694 before the channels are created.
710 before the channels are created.
695
711
696 Parameters:
712 Parameters:
697 -----------
713 -----------
698 ipython : bool, optional (default True)
714 ipython : bool, optional (default True)
699 Whether to use an IPython kernel instead of a plain Python kernel.
715 Whether to use an IPython kernel instead of a plain Python kernel.
700 """
716 """
701 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
717 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
702 self.rep_address, self.hb_address
718 self.rep_address, self.hb_address
703 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
719 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
704 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
720 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
705 raise RuntimeError("Can only launch a kernel on localhost."
721 raise RuntimeError("Can only launch a kernel on localhost."
706 "Make sure that the '*_address' attributes are "
722 "Make sure that the '*_address' attributes are "
707 "configured properly.")
723 "configured properly.")
708
724
709 self._launch_args = kw.copy()
725 self._launch_args = kw.copy()
710 if kw.pop('ipython', True):
726 if kw.pop('ipython', True):
711 from ipkernel import launch_kernel
727 from ipkernel import launch_kernel
712 else:
728 else:
713 from pykernel import launch_kernel
729 from pykernel import launch_kernel
714 self.kernel, xrep, pub, req, hb = launch_kernel(
730 self.kernel, xrep, pub, req, hb = launch_kernel(
715 xrep_port=xreq[1], pub_port=sub[1],
731 xrep_port=xreq[1], pub_port=sub[1],
716 req_port=rep[1], hb_port=hb[1], **kw)
732 req_port=rep[1], hb_port=hb[1], **kw)
717 self.xreq_address = (LOCALHOST, xrep)
733 self.xreq_address = (LOCALHOST, xrep)
718 self.sub_address = (LOCALHOST, pub)
734 self.sub_address = (LOCALHOST, pub)
719 self.rep_address = (LOCALHOST, req)
735 self.rep_address = (LOCALHOST, req)
720 self.hb_address = (LOCALHOST, hb)
736 self.hb_address = (LOCALHOST, hb)
721
737
722 def shutdown_kernel(self):
738 def shutdown_kernel(self):
723 """ Attempts to the stop the kernel process cleanly. If the kernel
739 """ Attempts to the stop the kernel process cleanly. If the kernel
724 cannot be stopped, it is killed, if possible.
740 cannot be stopped, it is killed, if possible.
725 """
741 """
726 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
742 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
727 if sys.platform == 'win32':
743 if sys.platform == 'win32':
728 self.kill_kernel()
744 self.kill_kernel()
729 return
745 return
730
746
731 self.xreq_channel.shutdown()
747 self.xreq_channel.shutdown()
732 # Don't send any additional kernel kill messages immediately, to give
748 # Don't send any additional kernel kill messages immediately, to give
733 # the kernel a chance to properly execute shutdown actions. Wait for at
749 # the kernel a chance to properly execute shutdown actions. Wait for at
734 # most 1s, checking every 0.1s.
750 # most 1s, checking every 0.1s.
735 for i in range(10):
751 for i in range(10):
736 if self.is_alive:
752 if self.is_alive:
737 time.sleep(0.1)
753 time.sleep(0.1)
738 else:
754 else:
739 break
755 break
740 else:
756 else:
741 # OK, we've waited long enough.
757 # OK, we've waited long enough.
742 if self.has_kernel:
758 if self.has_kernel:
743 self.kill_kernel()
759 self.kill_kernel()
744
760
745 def restart_kernel(self, instant_death=False):
761 def restart_kernel(self, instant_death=False):
746 """Restarts a kernel with the same arguments that were used to launch
762 """Restarts a kernel with the same arguments that were used to launch
747 it. If the old kernel was launched with random ports, the same ports
763 it. If the old kernel was launched with random ports, the same ports
748 will be used for the new kernel.
764 will be used for the new kernel.
749
765
750 Parameters
766 Parameters
751 ----------
767 ----------
752 instant_death : bool, optional
768 instant_death : bool, optional
753 If True, the kernel is forcefully restarted *immediately*, without
769 If True, the kernel is forcefully restarted *immediately*, without
754 having a chance to do any cleanup action. Otherwise the kernel is
770 having a chance to do any cleanup action. Otherwise the kernel is
755 given 1s to clean up before a forceful restart is issued.
771 given 1s to clean up before a forceful restart is issued.
756
772
757 In all cases the kernel is restarted, the only difference is whether
773 In all cases the kernel is restarted, the only difference is whether
758 it is given a chance to perform a clean shutdown or not.
774 it is given a chance to perform a clean shutdown or not.
759 """
775 """
760 if self._launch_args is None:
776 if self._launch_args is None:
761 raise RuntimeError("Cannot restart the kernel. "
777 raise RuntimeError("Cannot restart the kernel. "
762 "No previous call to 'start_kernel'.")
778 "No previous call to 'start_kernel'.")
763 else:
779 else:
764 if self.has_kernel:
780 if self.has_kernel:
765 if instant_death:
781 if instant_death:
766 self.kill_kernel()
782 self.kill_kernel()
767 else:
783 else:
768 self.shutdown_kernel()
784 self.shutdown_kernel()
769 self.start_kernel(**self._launch_args)
785 self.start_kernel(**self._launch_args)
770
786
771 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
787 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
772 # unless there is some delay here.
788 # unless there is some delay here.
773 if sys.platform == 'win32':
789 if sys.platform == 'win32':
774 time.sleep(0.2)
790 time.sleep(0.2)
775
791
776 @property
792 @property
777 def has_kernel(self):
793 def has_kernel(self):
778 """Returns whether a kernel process has been specified for the kernel
794 """Returns whether a kernel process has been specified for the kernel
779 manager.
795 manager.
780 """
796 """
781 return self.kernel is not None
797 return self.kernel is not None
782
798
783 def kill_kernel(self):
799 def kill_kernel(self):
784 """ Kill the running kernel. """
800 """ Kill the running kernel. """
785 if self.kernel is not None:
801 if self.kernel is not None:
786 self.kernel.kill()
802 self.kernel.kill()
787 self.kernel = None
803 self.kernel = None
788 else:
804 else:
789 raise RuntimeError("Cannot kill kernel. No kernel is running!")
805 raise RuntimeError("Cannot kill kernel. No kernel is running!")
790
806
791 def signal_kernel(self, signum):
807 def signal_kernel(self, signum):
792 """ Sends a signal to the kernel. """
808 """ Sends a signal to the kernel. """
793 if self.kernel is not None:
809 if self.kernel is not None:
794 self.kernel.send_signal(signum)
810 self.kernel.send_signal(signum)
795 else:
811 else:
796 raise RuntimeError("Cannot signal kernel. No kernel is running!")
812 raise RuntimeError("Cannot signal kernel. No kernel is running!")
797
813
798 @property
814 @property
799 def is_alive(self):
815 def is_alive(self):
800 """Is the kernel process still running?"""
816 """Is the kernel process still running?"""
801 # FIXME: not using a heartbeat means this method is broken for any
817 # FIXME: not using a heartbeat means this method is broken for any
802 # remote kernel, it's only capable of handling local kernels.
818 # remote kernel, it's only capable of handling local kernels.
803 if self.kernel is not None:
819 if self.kernel is not None:
804 if self.kernel.poll() is None:
820 if self.kernel.poll() is None:
805 return True
821 return True
806 else:
822 else:
807 return False
823 return False
808 else:
824 else:
809 # We didn't start the kernel with this KernelManager so we don't
825 # We didn't start the kernel with this KernelManager so we don't
810 # know if it is running. We should use a heartbeat for this case.
826 # know if it is running. We should use a heartbeat for this case.
811 return True
827 return True
812
828
813 #--------------------------------------------------------------------------
829 #--------------------------------------------------------------------------
814 # Channels used for communication with the kernel:
830 # Channels used for communication with the kernel:
815 #--------------------------------------------------------------------------
831 #--------------------------------------------------------------------------
816
832
817 @property
833 @property
818 def xreq_channel(self):
834 def xreq_channel(self):
819 """Get the REQ socket channel object to make requests of the kernel."""
835 """Get the REQ socket channel object to make requests of the kernel."""
820 if self._xreq_channel is None:
836 if self._xreq_channel is None:
821 self._xreq_channel = self.xreq_channel_class(self.context,
837 self._xreq_channel = self.xreq_channel_class(self.context,
822 self.session,
838 self.session,
823 self.xreq_address)
839 self.xreq_address)
824 return self._xreq_channel
840 return self._xreq_channel
825
841
826 @property
842 @property
827 def sub_channel(self):
843 def sub_channel(self):
828 """Get the SUB socket channel object."""
844 """Get the SUB socket channel object."""
829 if self._sub_channel is None:
845 if self._sub_channel is None:
830 self._sub_channel = self.sub_channel_class(self.context,
846 self._sub_channel = self.sub_channel_class(self.context,
831 self.session,
847 self.session,
832 self.sub_address)
848 self.sub_address)
833 return self._sub_channel
849 return self._sub_channel
834
850
835 @property
851 @property
836 def rep_channel(self):
852 def rep_channel(self):
837 """Get the REP socket channel object to handle stdin (raw_input)."""
853 """Get the REP socket channel object to handle stdin (raw_input)."""
838 if self._rep_channel is None:
854 if self._rep_channel is None:
839 self._rep_channel = self.rep_channel_class(self.context,
855 self._rep_channel = self.rep_channel_class(self.context,
840 self.session,
856 self.session,
841 self.rep_address)
857 self.rep_address)
842 return self._rep_channel
858 return self._rep_channel
843
859
844 @property
860 @property
845 def hb_channel(self):
861 def hb_channel(self):
846 """Get the REP socket channel object to handle stdin (raw_input)."""
862 """Get the REP socket channel object to handle stdin (raw_input)."""
847 if self._hb_channel is None:
863 if self._hb_channel is None:
848 self._hb_channel = self.hb_channel_class(self.context,
864 self._hb_channel = self.hb_channel_class(self.context,
849 self.session,
865 self.session,
850 self.hb_address)
866 self.hb_address)
851 return self._hb_channel
867 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now