##// END OF EJS Templates
Fix serious bug in heartbeat logic that can result in no-timeout polls.
epatters -
Show More
@@ -1,927 +1,928 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 from Queue import Queue, Empty
20 from Queue import Queue, Empty
21 from subprocess import Popen
21 from subprocess import Popen
22 import signal
22 import signal
23 import sys
23 import sys
24 from threading import Thread
24 from threading import Thread
25 import time
25 import time
26 import logging
26 import logging
27
27
28 # System library imports.
28 # System library imports.
29 import zmq
29 import zmq
30 from zmq import POLLIN, POLLOUT, POLLERR
30 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq.eventloop import ioloop
31 from zmq.eventloop import ioloop
32
32
33 # Local imports.
33 # Local imports.
34 from IPython.utils import io
34 from IPython.utils import io
35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
37 from session import Session, Message
37 from session import Session, Message
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Constants and exceptions
40 # Constants and exceptions
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 class InvalidPortNumber(Exception):
43 class InvalidPortNumber(Exception):
44 pass
44 pass
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Utility functions
47 # Utility functions
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
51 # if they prove to have more generic utility
51 # if they prove to have more generic utility
52
52
53 def validate_string_list(lst):
53 def validate_string_list(lst):
54 """Validate that the input is a list of strings.
54 """Validate that the input is a list of strings.
55
55
56 Raises ValueError if not."""
56 Raises ValueError if not."""
57 if not isinstance(lst, list):
57 if not isinstance(lst, list):
58 raise ValueError('input %r must be a list' % lst)
58 raise ValueError('input %r must be a list' % lst)
59 for x in lst:
59 for x in lst:
60 if not isinstance(x, basestring):
60 if not isinstance(x, basestring):
61 raise ValueError('element %r in list must be a string' % x)
61 raise ValueError('element %r in list must be a string' % x)
62
62
63
63
64 def validate_string_dict(dct):
64 def validate_string_dict(dct):
65 """Validate that the input is a dict with string keys and values.
65 """Validate that the input is a dict with string keys and values.
66
66
67 Raises ValueError if not."""
67 Raises ValueError if not."""
68 for k,v in dct.iteritems():
68 for k,v in dct.iteritems():
69 if not isinstance(k, basestring):
69 if not isinstance(k, basestring):
70 raise ValueError('key %r in dict must be a string' % k)
70 raise ValueError('key %r in dict must be a string' % k)
71 if not isinstance(v, basestring):
71 if not isinstance(v, basestring):
72 raise ValueError('value %r in dict must be a string' % v)
72 raise ValueError('value %r in dict must be a string' % v)
73
73
74
74
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76 # ZMQ Socket Channel classes
76 # ZMQ Socket Channel classes
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78
78
79 class ZmqSocketChannel(Thread):
79 class ZmqSocketChannel(Thread):
80 """The base class for the channels that use ZMQ sockets.
80 """The base class for the channels that use ZMQ sockets.
81 """
81 """
82 context = None
82 context = None
83 session = None
83 session = None
84 socket = None
84 socket = None
85 ioloop = None
85 ioloop = None
86 iostate = None
86 iostate = None
87 _address = None
87 _address = None
88
88
89 def __init__(self, context, session, address):
89 def __init__(self, context, session, address):
90 """Create a channel
90 """Create a channel
91
91
92 Parameters
92 Parameters
93 ----------
93 ----------
94 context : :class:`zmq.Context`
94 context : :class:`zmq.Context`
95 The ZMQ context to use.
95 The ZMQ context to use.
96 session : :class:`session.Session`
96 session : :class:`session.Session`
97 The session to use.
97 The session to use.
98 address : tuple
98 address : tuple
99 Standard (ip, port) tuple that the kernel is listening on.
99 Standard (ip, port) tuple that the kernel is listening on.
100 """
100 """
101 super(ZmqSocketChannel, self).__init__()
101 super(ZmqSocketChannel, self).__init__()
102 self.daemon = True
102 self.daemon = True
103
103
104 self.context = context
104 self.context = context
105 self.session = session
105 self.session = session
106 if address[1] == 0:
106 if address[1] == 0:
107 message = 'The port number for a channel cannot be 0.'
107 message = 'The port number for a channel cannot be 0.'
108 raise InvalidPortNumber(message)
108 raise InvalidPortNumber(message)
109 self._address = address
109 self._address = address
110
110
111 def stop(self):
111 def stop(self):
112 """Stop the channel's activity.
112 """Stop the channel's activity.
113
113
114 This calls :method:`Thread.join` and returns when the thread
114 This calls :method:`Thread.join` and returns when the thread
115 terminates. :class:`RuntimeError` will be raised if
115 terminates. :class:`RuntimeError` will be raised if
116 :method:`self.start` is called again.
116 :method:`self.start` is called again.
117 """
117 """
118 self.join()
118 self.join()
119
119
120 @property
120 @property
121 def address(self):
121 def address(self):
122 """Get the channel's address as an (ip, port) tuple.
122 """Get the channel's address as an (ip, port) tuple.
123
123
124 By the default, the address is (localhost, 0), where 0 means a random
124 By the default, the address is (localhost, 0), where 0 means a random
125 port.
125 port.
126 """
126 """
127 return self._address
127 return self._address
128
128
129 def add_io_state(self, state):
129 def add_io_state(self, state):
130 """Add IO state to the eventloop.
130 """Add IO state to the eventloop.
131
131
132 Parameters
132 Parameters
133 ----------
133 ----------
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 The IO state flag to set.
135 The IO state flag to set.
136
136
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 """
138 """
139 def add_io_state_callback():
139 def add_io_state_callback():
140 if not self.iostate & state:
140 if not self.iostate & state:
141 self.iostate = self.iostate | state
141 self.iostate = self.iostate | state
142 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.update_handler(self.socket, self.iostate)
143 self.ioloop.add_callback(add_io_state_callback)
143 self.ioloop.add_callback(add_io_state_callback)
144
144
145 def drop_io_state(self, state):
145 def drop_io_state(self, state):
146 """Drop IO state from the eventloop.
146 """Drop IO state from the eventloop.
147
147
148 Parameters
148 Parameters
149 ----------
149 ----------
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 The IO state flag to set.
151 The IO state flag to set.
152
152
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 """
154 """
155 def drop_io_state_callback():
155 def drop_io_state_callback():
156 if self.iostate & state:
156 if self.iostate & state:
157 self.iostate = self.iostate & (~state)
157 self.iostate = self.iostate & (~state)
158 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.update_handler(self.socket, self.iostate)
159 self.ioloop.add_callback(drop_io_state_callback)
159 self.ioloop.add_callback(drop_io_state_callback)
160
160
161
161
162 class XReqSocketChannel(ZmqSocketChannel):
162 class XReqSocketChannel(ZmqSocketChannel):
163 """The XREQ channel for issues request/replies to the kernel.
163 """The XREQ channel for issues request/replies to the kernel.
164 """
164 """
165
165
166 command_queue = None
166 command_queue = None
167
167
168 def __init__(self, context, session, address):
168 def __init__(self, context, session, address):
169 super(XReqSocketChannel, self).__init__(context, session, address)
169 super(XReqSocketChannel, self).__init__(context, session, address)
170 self.command_queue = Queue()
170 self.command_queue = Queue()
171 self.ioloop = ioloop.IOLoop()
171 self.ioloop = ioloop.IOLoop()
172
172
173 def run(self):
173 def run(self):
174 """The thread's main activity. Call start() instead."""
174 """The thread's main activity. Call start() instead."""
175 self.socket = self.context.socket(zmq.XREQ)
175 self.socket = self.context.socket(zmq.XREQ)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 self.socket.connect('tcp://%s:%i' % self.address)
177 self.socket.connect('tcp://%s:%i' % self.address)
178 self.iostate = POLLERR|POLLIN
178 self.iostate = POLLERR|POLLIN
179 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.ioloop.add_handler(self.socket, self._handle_events,
180 self.iostate)
180 self.iostate)
181 self.ioloop.start()
181 self.ioloop.start()
182
182
183 def stop(self):
183 def stop(self):
184 self.ioloop.stop()
184 self.ioloop.stop()
185 super(XReqSocketChannel, self).stop()
185 super(XReqSocketChannel, self).stop()
186
186
187 def call_handlers(self, msg):
187 def call_handlers(self, msg):
188 """This method is called in the ioloop thread when a message arrives.
188 """This method is called in the ioloop thread when a message arrives.
189
189
190 Subclasses should override this method to handle incoming messages.
190 Subclasses should override this method to handle incoming messages.
191 It is important to remember that this method is called in the thread
191 It is important to remember that this method is called in the thread
192 so that some logic must be done to ensure that the application leve
192 so that some logic must be done to ensure that the application leve
193 handlers are called in the application thread.
193 handlers are called in the application thread.
194 """
194 """
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
196
196
197 def execute(self, code, silent=False,
197 def execute(self, code, silent=False,
198 user_variables=None, user_expressions=None):
198 user_variables=None, user_expressions=None):
199 """Execute code in the kernel.
199 """Execute code in the kernel.
200
200
201 Parameters
201 Parameters
202 ----------
202 ----------
203 code : str
203 code : str
204 A string of Python code.
204 A string of Python code.
205
205
206 silent : bool, optional (default False)
206 silent : bool, optional (default False)
207 If set, the kernel will execute the code as quietly possible.
207 If set, the kernel will execute the code as quietly possible.
208
208
209 user_variables : list, optional
209 user_variables : list, optional
210 A list of variable names to pull from the user's namespace. They
210 A list of variable names to pull from the user's namespace. They
211 will come back as a dict with these names as keys and their
211 will come back as a dict with these names as keys and their
212 :func:`repr` as values.
212 :func:`repr` as values.
213
213
214 user_expressions : dict, optional
214 user_expressions : dict, optional
215 A dict with string keys and to pull from the user's
215 A dict with string keys and to pull from the user's
216 namespace. They will come back as a dict with these names as keys
216 namespace. They will come back as a dict with these names as keys
217 and their :func:`repr` as values.
217 and their :func:`repr` as values.
218
218
219 Returns
219 Returns
220 -------
220 -------
221 The msg_id of the message sent.
221 The msg_id of the message sent.
222 """
222 """
223 if user_variables is None:
223 if user_variables is None:
224 user_variables = []
224 user_variables = []
225 if user_expressions is None:
225 if user_expressions is None:
226 user_expressions = {}
226 user_expressions = {}
227
227
228 # Don't waste network traffic if inputs are invalid
228 # Don't waste network traffic if inputs are invalid
229 if not isinstance(code, basestring):
229 if not isinstance(code, basestring):
230 raise ValueError('code %r must be a string' % code)
230 raise ValueError('code %r must be a string' % code)
231 validate_string_list(user_variables)
231 validate_string_list(user_variables)
232 validate_string_dict(user_expressions)
232 validate_string_dict(user_expressions)
233
233
234 # Create class for content/msg creation. Related to, but possibly
234 # Create class for content/msg creation. Related to, but possibly
235 # not in Session.
235 # not in Session.
236 content = dict(code=code, silent=silent,
236 content = dict(code=code, silent=silent,
237 user_variables=user_variables,
237 user_variables=user_variables,
238 user_expressions=user_expressions)
238 user_expressions=user_expressions)
239 msg = self.session.msg('execute_request', content)
239 msg = self.session.msg('execute_request', content)
240 self._queue_request(msg)
240 self._queue_request(msg)
241 return msg['header']['msg_id']
241 return msg['header']['msg_id']
242
242
243 def complete(self, text, line, cursor_pos, block=None):
243 def complete(self, text, line, cursor_pos, block=None):
244 """Tab complete text in the kernel's namespace.
244 """Tab complete text in the kernel's namespace.
245
245
246 Parameters
246 Parameters
247 ----------
247 ----------
248 text : str
248 text : str
249 The text to complete.
249 The text to complete.
250 line : str
250 line : str
251 The full line of text that is the surrounding context for the
251 The full line of text that is the surrounding context for the
252 text to complete.
252 text to complete.
253 cursor_pos : int
253 cursor_pos : int
254 The position of the cursor in the line where the completion was
254 The position of the cursor in the line where the completion was
255 requested.
255 requested.
256 block : str, optional
256 block : str, optional
257 The full block of code in which the completion is being requested.
257 The full block of code in which the completion is being requested.
258
258
259 Returns
259 Returns
260 -------
260 -------
261 The msg_id of the message sent.
261 The msg_id of the message sent.
262 """
262 """
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 msg = self.session.msg('complete_request', content)
264 msg = self.session.msg('complete_request', content)
265 self._queue_request(msg)
265 self._queue_request(msg)
266 return msg['header']['msg_id']
266 return msg['header']['msg_id']
267
267
268 def object_info(self, oname):
268 def object_info(self, oname):
269 """Get metadata information about an object.
269 """Get metadata information about an object.
270
270
271 Parameters
271 Parameters
272 ----------
272 ----------
273 oname : str
273 oname : str
274 A string specifying the object name.
274 A string specifying the object name.
275
275
276 Returns
276 Returns
277 -------
277 -------
278 The msg_id of the message sent.
278 The msg_id of the message sent.
279 """
279 """
280 content = dict(oname=oname)
280 content = dict(oname=oname)
281 msg = self.session.msg('object_info_request', content)
281 msg = self.session.msg('object_info_request', content)
282 self._queue_request(msg)
282 self._queue_request(msg)
283 return msg['header']['msg_id']
283 return msg['header']['msg_id']
284
284
285 def history_tail(self, n=10, raw=True, output=False):
285 def history_tail(self, n=10, raw=True, output=False):
286 """Get the history list.
286 """Get the history list.
287
287
288 Parameters
288 Parameters
289 ----------
289 ----------
290 n : int
290 n : int
291 The number of lines of history to get.
291 The number of lines of history to get.
292 raw : bool
292 raw : bool
293 If True, return the raw input.
293 If True, return the raw input.
294 output : bool
294 output : bool
295 If True, then return the output as well.
295 If True, then return the output as well.
296
296
297 Returns
297 Returns
298 -------
298 -------
299 The msg_id of the message sent.
299 The msg_id of the message sent.
300 """
300 """
301 content = dict(n=n, raw=raw, output=output)
301 content = dict(n=n, raw=raw, output=output)
302 msg = self.session.msg('history_tail_request', content)
302 msg = self.session.msg('history_tail_request', content)
303 self._queue_request(msg)
303 self._queue_request(msg)
304 return msg['header']['msg_id']
304 return msg['header']['msg_id']
305
305
306 def shutdown(self, restart=False):
306 def shutdown(self, restart=False):
307 """Request an immediate kernel shutdown.
307 """Request an immediate kernel shutdown.
308
308
309 Upon receipt of the (empty) reply, client code can safely assume that
309 Upon receipt of the (empty) reply, client code can safely assume that
310 the kernel has shut down and it's safe to forcefully terminate it if
310 the kernel has shut down and it's safe to forcefully terminate it if
311 it's still alive.
311 it's still alive.
312
312
313 The kernel will send the reply via a function registered with Python's
313 The kernel will send the reply via a function registered with Python's
314 atexit module, ensuring it's truly done as the kernel is done with all
314 atexit module, ensuring it's truly done as the kernel is done with all
315 normal operation.
315 normal operation.
316 """
316 """
317 # Send quit message to kernel. Once we implement kernel-side setattr,
317 # Send quit message to kernel. Once we implement kernel-side setattr,
318 # this should probably be done that way, but for now this will do.
318 # this should probably be done that way, but for now this will do.
319 msg = self.session.msg('shutdown_request', {'restart':restart})
319 msg = self.session.msg('shutdown_request', {'restart':restart})
320 self._queue_request(msg)
320 self._queue_request(msg)
321 return msg['header']['msg_id']
321 return msg['header']['msg_id']
322
322
323 def _handle_events(self, socket, events):
323 def _handle_events(self, socket, events):
324 if events & POLLERR:
324 if events & POLLERR:
325 self._handle_err()
325 self._handle_err()
326 if events & POLLOUT:
326 if events & POLLOUT:
327 self._handle_send()
327 self._handle_send()
328 if events & POLLIN:
328 if events & POLLIN:
329 self._handle_recv()
329 self._handle_recv()
330
330
331 def _handle_recv(self):
331 def _handle_recv(self):
332 ident,msg = self.session.recv(self.socket, 0)
332 ident,msg = self.session.recv(self.socket, 0)
333 self.call_handlers(msg)
333 self.call_handlers(msg)
334
334
335 def _handle_send(self):
335 def _handle_send(self):
336 try:
336 try:
337 msg = self.command_queue.get(False)
337 msg = self.command_queue.get(False)
338 except Empty:
338 except Empty:
339 pass
339 pass
340 else:
340 else:
341 self.session.send(self.socket,msg)
341 self.session.send(self.socket,msg)
342 if self.command_queue.empty():
342 if self.command_queue.empty():
343 self.drop_io_state(POLLOUT)
343 self.drop_io_state(POLLOUT)
344
344
345 def _handle_err(self):
345 def _handle_err(self):
346 # We don't want to let this go silently, so eventually we should log.
346 # We don't want to let this go silently, so eventually we should log.
347 raise zmq.ZMQError()
347 raise zmq.ZMQError()
348
348
349 def _queue_request(self, msg):
349 def _queue_request(self, msg):
350 self.command_queue.put(msg)
350 self.command_queue.put(msg)
351 self.add_io_state(POLLOUT)
351 self.add_io_state(POLLOUT)
352
352
353
353
354 class SubSocketChannel(ZmqSocketChannel):
354 class SubSocketChannel(ZmqSocketChannel):
355 """The SUB channel which listens for messages that the kernel publishes.
355 """The SUB channel which listens for messages that the kernel publishes.
356 """
356 """
357
357
358 def __init__(self, context, session, address):
358 def __init__(self, context, session, address):
359 super(SubSocketChannel, self).__init__(context, session, address)
359 super(SubSocketChannel, self).__init__(context, session, address)
360 self.ioloop = ioloop.IOLoop()
360 self.ioloop = ioloop.IOLoop()
361
361
362 def run(self):
362 def run(self):
363 """The thread's main activity. Call start() instead."""
363 """The thread's main activity. Call start() instead."""
364 self.socket = self.context.socket(zmq.SUB)
364 self.socket = self.context.socket(zmq.SUB)
365 self.socket.setsockopt(zmq.SUBSCRIBE,'')
365 self.socket.setsockopt(zmq.SUBSCRIBE,'')
366 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
366 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
367 self.socket.connect('tcp://%s:%i' % self.address)
367 self.socket.connect('tcp://%s:%i' % self.address)
368 self.iostate = POLLIN|POLLERR
368 self.iostate = POLLIN|POLLERR
369 self.ioloop.add_handler(self.socket, self._handle_events,
369 self.ioloop.add_handler(self.socket, self._handle_events,
370 self.iostate)
370 self.iostate)
371 self.ioloop.start()
371 self.ioloop.start()
372
372
373 def stop(self):
373 def stop(self):
374 self.ioloop.stop()
374 self.ioloop.stop()
375 super(SubSocketChannel, self).stop()
375 super(SubSocketChannel, self).stop()
376
376
377 def call_handlers(self, msg):
377 def call_handlers(self, msg):
378 """This method is called in the ioloop thread when a message arrives.
378 """This method is called in the ioloop thread when a message arrives.
379
379
380 Subclasses should override this method to handle incoming messages.
380 Subclasses should override this method to handle incoming messages.
381 It is important to remember that this method is called in the thread
381 It is important to remember that this method is called in the thread
382 so that some logic must be done to ensure that the application leve
382 so that some logic must be done to ensure that the application leve
383 handlers are called in the application thread.
383 handlers are called in the application thread.
384 """
384 """
385 raise NotImplementedError('call_handlers must be defined in a subclass.')
385 raise NotImplementedError('call_handlers must be defined in a subclass.')
386
386
387 def flush(self, timeout=1.0):
387 def flush(self, timeout=1.0):
388 """Immediately processes all pending messages on the SUB channel.
388 """Immediately processes all pending messages on the SUB channel.
389
389
390 Callers should use this method to ensure that :method:`call_handlers`
390 Callers should use this method to ensure that :method:`call_handlers`
391 has been called for all messages that have been received on the
391 has been called for all messages that have been received on the
392 0MQ SUB socket of this channel.
392 0MQ SUB socket of this channel.
393
393
394 This method is thread safe.
394 This method is thread safe.
395
395
396 Parameters
396 Parameters
397 ----------
397 ----------
398 timeout : float, optional
398 timeout : float, optional
399 The maximum amount of time to spend flushing, in seconds. The
399 The maximum amount of time to spend flushing, in seconds. The
400 default is one second.
400 default is one second.
401 """
401 """
402 # We do the IOLoop callback process twice to ensure that the IOLoop
402 # We do the IOLoop callback process twice to ensure that the IOLoop
403 # gets to perform at least one full poll.
403 # gets to perform at least one full poll.
404 stop_time = time.time() + timeout
404 stop_time = time.time() + timeout
405 for i in xrange(2):
405 for i in xrange(2):
406 self._flushed = False
406 self._flushed = False
407 self.ioloop.add_callback(self._flush)
407 self.ioloop.add_callback(self._flush)
408 while not self._flushed and time.time() < stop_time:
408 while not self._flushed and time.time() < stop_time:
409 time.sleep(0.01)
409 time.sleep(0.01)
410
410
411 def _handle_events(self, socket, events):
411 def _handle_events(self, socket, events):
412 # Turn on and off POLLOUT depending on if we have made a request
412 # Turn on and off POLLOUT depending on if we have made a request
413 if events & POLLERR:
413 if events & POLLERR:
414 self._handle_err()
414 self._handle_err()
415 if events & POLLIN:
415 if events & POLLIN:
416 self._handle_recv()
416 self._handle_recv()
417
417
418 def _handle_err(self):
418 def _handle_err(self):
419 # We don't want to let this go silently, so eventually we should log.
419 # We don't want to let this go silently, so eventually we should log.
420 raise zmq.ZMQError()
420 raise zmq.ZMQError()
421
421
422 def _handle_recv(self):
422 def _handle_recv(self):
423 # Get all of the messages we can
423 # Get all of the messages we can
424 while True:
424 while True:
425 try:
425 try:
426 ident,msg = self.session.recv(self.socket)
426 ident,msg = self.session.recv(self.socket)
427 except zmq.ZMQError:
427 except zmq.ZMQError:
428 # Check the errno?
428 # Check the errno?
429 # Will this trigger POLLERR?
429 # Will this trigger POLLERR?
430 break
430 break
431 else:
431 else:
432 if msg is None:
432 if msg is None:
433 break
433 break
434 self.call_handlers(msg)
434 self.call_handlers(msg)
435
435
436 def _flush(self):
436 def _flush(self):
437 """Callback for :method:`self.flush`."""
437 """Callback for :method:`self.flush`."""
438 self._flushed = True
438 self._flushed = True
439
439
440
440
441 class RepSocketChannel(ZmqSocketChannel):
441 class RepSocketChannel(ZmqSocketChannel):
442 """A reply channel to handle raw_input requests that the kernel makes."""
442 """A reply channel to handle raw_input requests that the kernel makes."""
443
443
444 msg_queue = None
444 msg_queue = None
445
445
446 def __init__(self, context, session, address):
446 def __init__(self, context, session, address):
447 super(RepSocketChannel, self).__init__(context, session, address)
447 super(RepSocketChannel, self).__init__(context, session, address)
448 self.ioloop = ioloop.IOLoop()
448 self.ioloop = ioloop.IOLoop()
449 self.msg_queue = Queue()
449 self.msg_queue = Queue()
450
450
451 def run(self):
451 def run(self):
452 """The thread's main activity. Call start() instead."""
452 """The thread's main activity. Call start() instead."""
453 self.socket = self.context.socket(zmq.XREQ)
453 self.socket = self.context.socket(zmq.XREQ)
454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
455 self.socket.connect('tcp://%s:%i' % self.address)
455 self.socket.connect('tcp://%s:%i' % self.address)
456 self.iostate = POLLERR|POLLIN
456 self.iostate = POLLERR|POLLIN
457 self.ioloop.add_handler(self.socket, self._handle_events,
457 self.ioloop.add_handler(self.socket, self._handle_events,
458 self.iostate)
458 self.iostate)
459 self.ioloop.start()
459 self.ioloop.start()
460
460
461 def stop(self):
461 def stop(self):
462 self.ioloop.stop()
462 self.ioloop.stop()
463 super(RepSocketChannel, self).stop()
463 super(RepSocketChannel, self).stop()
464
464
465 def call_handlers(self, msg):
465 def call_handlers(self, msg):
466 """This method is called in the ioloop thread when a message arrives.
466 """This method is called in the ioloop thread when a message arrives.
467
467
468 Subclasses should override this method to handle incoming messages.
468 Subclasses should override this method to handle incoming messages.
469 It is important to remember that this method is called in the thread
469 It is important to remember that this method is called in the thread
470 so that some logic must be done to ensure that the application leve
470 so that some logic must be done to ensure that the application leve
471 handlers are called in the application thread.
471 handlers are called in the application thread.
472 """
472 """
473 raise NotImplementedError('call_handlers must be defined in a subclass.')
473 raise NotImplementedError('call_handlers must be defined in a subclass.')
474
474
475 def input(self, string):
475 def input(self, string):
476 """Send a string of raw input to the kernel."""
476 """Send a string of raw input to the kernel."""
477 content = dict(value=string)
477 content = dict(value=string)
478 msg = self.session.msg('input_reply', content)
478 msg = self.session.msg('input_reply', content)
479 self._queue_reply(msg)
479 self._queue_reply(msg)
480
480
481 def _handle_events(self, socket, events):
481 def _handle_events(self, socket, events):
482 if events & POLLERR:
482 if events & POLLERR:
483 self._handle_err()
483 self._handle_err()
484 if events & POLLOUT:
484 if events & POLLOUT:
485 self._handle_send()
485 self._handle_send()
486 if events & POLLIN:
486 if events & POLLIN:
487 self._handle_recv()
487 self._handle_recv()
488
488
489 def _handle_recv(self):
489 def _handle_recv(self):
490 ident,msg = self.session.recv(self.socket, 0)
490 ident,msg = self.session.recv(self.socket, 0)
491 self.call_handlers(msg)
491 self.call_handlers(msg)
492
492
493 def _handle_send(self):
493 def _handle_send(self):
494 try:
494 try:
495 msg = self.msg_queue.get(False)
495 msg = self.msg_queue.get(False)
496 except Empty:
496 except Empty:
497 pass
497 pass
498 else:
498 else:
499 self.session.send(self.socket,msg)
499 self.session.send(self.socket,msg)
500 if self.msg_queue.empty():
500 if self.msg_queue.empty():
501 self.drop_io_state(POLLOUT)
501 self.drop_io_state(POLLOUT)
502
502
503 def _handle_err(self):
503 def _handle_err(self):
504 # We don't want to let this go silently, so eventually we should log.
504 # We don't want to let this go silently, so eventually we should log.
505 raise zmq.ZMQError()
505 raise zmq.ZMQError()
506
506
507 def _queue_reply(self, msg):
507 def _queue_reply(self, msg):
508 self.msg_queue.put(msg)
508 self.msg_queue.put(msg)
509 self.add_io_state(POLLOUT)
509 self.add_io_state(POLLOUT)
510
510
511
511
512 class HBSocketChannel(ZmqSocketChannel):
512 class HBSocketChannel(ZmqSocketChannel):
513 """The heartbeat channel which monitors the kernel heartbeat.
513 """The heartbeat channel which monitors the kernel heartbeat.
514
514
515 Note that the heartbeat channel is paused by default. As long as you start
515 Note that the heartbeat channel is paused by default. As long as you start
516 this channel, the kernel manager will ensure that it is paused and un-paused
516 this channel, the kernel manager will ensure that it is paused and un-paused
517 as appropriate.
517 as appropriate.
518 """
518 """
519
519
520 time_to_dead = 3.0
520 time_to_dead = 3.0
521 socket = None
521 socket = None
522 poller = None
522 poller = None
523 _running = None
523 _running = None
524 _pause = None
524 _pause = None
525
525
526 def __init__(self, context, session, address):
526 def __init__(self, context, session, address):
527 super(HBSocketChannel, self).__init__(context, session, address)
527 super(HBSocketChannel, self).__init__(context, session, address)
528 self._running = False
528 self._running = False
529 self._pause = True
529 self._pause = True
530
530
531 def _create_socket(self):
531 def _create_socket(self):
532 self.socket = self.context.socket(zmq.REQ)
532 self.socket = self.context.socket(zmq.REQ)
533 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
533 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
534 self.socket.connect('tcp://%s:%i' % self.address)
534 self.socket.connect('tcp://%s:%i' % self.address)
535 self.poller = zmq.Poller()
535 self.poller = zmq.Poller()
536 self.poller.register(self.socket, zmq.POLLIN)
536 self.poller.register(self.socket, zmq.POLLIN)
537
537
538 def run(self):
538 def run(self):
539 """The thread's main activity. Call start() instead."""
539 """The thread's main activity. Call start() instead."""
540 self._create_socket()
540 self._create_socket()
541 self._running = True
541 self._running = True
542 while self._running:
542 while self._running:
543 if self._pause:
543 if self._pause:
544 time.sleep(self.time_to_dead)
544 time.sleep(self.time_to_dead)
545 else:
545 else:
546 since_last_heartbeat = 0.0
546 since_last_heartbeat = 0.0
547 request_time = time.time()
547 request_time = time.time()
548 try:
548 try:
549 #io.rprint('Ping from HB channel') # dbg
549 #io.rprint('Ping from HB channel') # dbg
550 self.socket.send(b'ping')
550 self.socket.send(b'ping')
551 except zmq.ZMQError, e:
551 except zmq.ZMQError, e:
552 #io.rprint('*** HB Error:', e) # dbg
552 #io.rprint('*** HB Error:', e) # dbg
553 if e.errno == zmq.EFSM:
553 if e.errno == zmq.EFSM:
554 #io.rprint('sleep...', self.time_to_dead) # dbg
554 #io.rprint('sleep...', self.time_to_dead) # dbg
555 time.sleep(self.time_to_dead)
555 time.sleep(self.time_to_dead)
556 self._create_socket()
556 self._create_socket()
557 else:
557 else:
558 raise
558 raise
559 else:
559 else:
560 while True:
560 while True:
561 try:
561 try:
562 self.socket.recv(zmq.NOBLOCK)
562 self.socket.recv(zmq.NOBLOCK)
563 except zmq.ZMQError, e:
563 except zmq.ZMQError, e:
564 #io.rprint('*** HB Error 2:', e) # dbg
564 #io.rprint('*** HB Error 2:', e) # dbg
565 if e.errno == zmq.EAGAIN:
565 if e.errno == zmq.EAGAIN:
566 before_poll = time.time()
566 before_poll = time.time()
567 until_dead = self.time_to_dead - (before_poll -
567 until_dead = self.time_to_dead - (before_poll -
568 request_time)
568 request_time)
569
569
570 # When the return value of poll() is an empty
570 # When the return value of poll() is an empty
571 # list, that is when things have gone wrong
571 # list, that is when things have gone wrong
572 # (zeromq bug). As long as it is not an empty
572 # (zeromq bug). As long as it is not an empty
573 # list, poll is working correctly even if it
573 # list, poll is working correctly even if it
574 # returns quickly. Note: poll timeout is in
574 # returns quickly. Note: poll timeout is in
575 # milliseconds.
575 # milliseconds.
576 self.poller.poll(1000*until_dead)
576 if until_dead > 0.0:
577 self.poller.poll(1000 * until_dead)
577
578
578 since_last_heartbeat = time.time()-request_time
579 since_last_heartbeat = time.time()-request_time
579 if since_last_heartbeat > self.time_to_dead:
580 if since_last_heartbeat > self.time_to_dead:
580 self.call_handlers(since_last_heartbeat)
581 self.call_handlers(since_last_heartbeat)
581 break
582 break
582 else:
583 else:
583 # FIXME: We should probably log this instead.
584 # FIXME: We should probably log this instead.
584 raise
585 raise
585 else:
586 else:
586 until_dead = self.time_to_dead - (time.time() -
587 until_dead = self.time_to_dead - (time.time() -
587 request_time)
588 request_time)
588 if until_dead > 0.0:
589 if until_dead > 0.0:
589 #io.rprint('sleep...', self.time_to_dead) # dbg
590 #io.rprint('sleep...', self.time_to_dead) # dbg
590 time.sleep(until_dead)
591 time.sleep(until_dead)
591 break
592 break
592
593
593 def pause(self):
594 def pause(self):
594 """Pause the heartbeat."""
595 """Pause the heartbeat."""
595 self._pause = True
596 self._pause = True
596
597
597 def unpause(self):
598 def unpause(self):
598 """Unpause the heartbeat."""
599 """Unpause the heartbeat."""
599 self._pause = False
600 self._pause = False
600
601
601 def is_beating(self):
602 def is_beating(self):
602 """Is the heartbeat running and not paused."""
603 """Is the heartbeat running and not paused."""
603 if self.is_alive() and not self._pause:
604 if self.is_alive() and not self._pause:
604 return True
605 return True
605 else:
606 else:
606 return False
607 return False
607
608
608 def stop(self):
609 def stop(self):
609 self._running = False
610 self._running = False
610 super(HBSocketChannel, self).stop()
611 super(HBSocketChannel, self).stop()
611
612
612 def call_handlers(self, since_last_heartbeat):
613 def call_handlers(self, since_last_heartbeat):
613 """This method is called in the ioloop thread when a message arrives.
614 """This method is called in the ioloop thread when a message arrives.
614
615
615 Subclasses should override this method to handle incoming messages.
616 Subclasses should override this method to handle incoming messages.
616 It is important to remember that this method is called in the thread
617 It is important to remember that this method is called in the thread
617 so that some logic must be done to ensure that the application leve
618 so that some logic must be done to ensure that the application leve
618 handlers are called in the application thread.
619 handlers are called in the application thread.
619 """
620 """
620 raise NotImplementedError('call_handlers must be defined in a subclass.')
621 raise NotImplementedError('call_handlers must be defined in a subclass.')
621
622
622
623
623 #-----------------------------------------------------------------------------
624 #-----------------------------------------------------------------------------
624 # Main kernel manager class
625 # Main kernel manager class
625 #-----------------------------------------------------------------------------
626 #-----------------------------------------------------------------------------
626
627
627 class KernelManager(HasTraits):
628 class KernelManager(HasTraits):
628 """ Manages a kernel for a frontend.
629 """ Manages a kernel for a frontend.
629
630
630 The SUB channel is for the frontend to receive messages published by the
631 The SUB channel is for the frontend to receive messages published by the
631 kernel.
632 kernel.
632
633
633 The REQ channel is for the frontend to make requests of the kernel.
634 The REQ channel is for the frontend to make requests of the kernel.
634
635
635 The REP channel is for the kernel to request stdin (raw_input) from the
636 The REP channel is for the kernel to request stdin (raw_input) from the
636 frontend.
637 frontend.
637 """
638 """
638 # The PyZMQ Context to use for communication with the kernel.
639 # The PyZMQ Context to use for communication with the kernel.
639 context = Instance(zmq.Context,(),{})
640 context = Instance(zmq.Context,(),{})
640
641
641 # The Session to use for communication with the kernel.
642 # The Session to use for communication with the kernel.
642 session = Instance(Session,(),{})
643 session = Instance(Session,(),{})
643
644
644 # The kernel process with which the KernelManager is communicating.
645 # The kernel process with which the KernelManager is communicating.
645 kernel = Instance(Popen)
646 kernel = Instance(Popen)
646
647
647 # The addresses for the communication channels.
648 # The addresses for the communication channels.
648 xreq_address = TCPAddress((LOCALHOST, 0))
649 xreq_address = TCPAddress((LOCALHOST, 0))
649 sub_address = TCPAddress((LOCALHOST, 0))
650 sub_address = TCPAddress((LOCALHOST, 0))
650 rep_address = TCPAddress((LOCALHOST, 0))
651 rep_address = TCPAddress((LOCALHOST, 0))
651 hb_address = TCPAddress((LOCALHOST, 0))
652 hb_address = TCPAddress((LOCALHOST, 0))
652
653
653 # The classes to use for the various channels.
654 # The classes to use for the various channels.
654 xreq_channel_class = Type(XReqSocketChannel)
655 xreq_channel_class = Type(XReqSocketChannel)
655 sub_channel_class = Type(SubSocketChannel)
656 sub_channel_class = Type(SubSocketChannel)
656 rep_channel_class = Type(RepSocketChannel)
657 rep_channel_class = Type(RepSocketChannel)
657 hb_channel_class = Type(HBSocketChannel)
658 hb_channel_class = Type(HBSocketChannel)
658
659
659 # Protected traits.
660 # Protected traits.
660 _launch_args = Any
661 _launch_args = Any
661 _xreq_channel = Any
662 _xreq_channel = Any
662 _sub_channel = Any
663 _sub_channel = Any
663 _rep_channel = Any
664 _rep_channel = Any
664 _hb_channel = Any
665 _hb_channel = Any
665
666
666 def __init__(self, **kwargs):
667 def __init__(self, **kwargs):
667 super(KernelManager, self).__init__(**kwargs)
668 super(KernelManager, self).__init__(**kwargs)
668 # Uncomment this to try closing the context.
669 # Uncomment this to try closing the context.
669 # atexit.register(self.context.close)
670 # atexit.register(self.context.close)
670
671
671 #--------------------------------------------------------------------------
672 #--------------------------------------------------------------------------
672 # Channel management methods:
673 # Channel management methods:
673 #--------------------------------------------------------------------------
674 #--------------------------------------------------------------------------
674
675
675 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
676 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
676 """Starts the channels for this kernel.
677 """Starts the channels for this kernel.
677
678
678 This will create the channels if they do not exist and then start
679 This will create the channels if they do not exist and then start
679 them. If port numbers of 0 are being used (random ports) then you
680 them. If port numbers of 0 are being used (random ports) then you
680 must first call :method:`start_kernel`. If the channels have been
681 must first call :method:`start_kernel`. If the channels have been
681 stopped and you call this, :class:`RuntimeError` will be raised.
682 stopped and you call this, :class:`RuntimeError` will be raised.
682 """
683 """
683 if xreq:
684 if xreq:
684 self.xreq_channel.start()
685 self.xreq_channel.start()
685 if sub:
686 if sub:
686 self.sub_channel.start()
687 self.sub_channel.start()
687 if rep:
688 if rep:
688 self.rep_channel.start()
689 self.rep_channel.start()
689 if hb:
690 if hb:
690 self.hb_channel.start()
691 self.hb_channel.start()
691
692
692 def stop_channels(self):
693 def stop_channels(self):
693 """Stops all the running channels for this kernel.
694 """Stops all the running channels for this kernel.
694 """
695 """
695 if self.xreq_channel.is_alive():
696 if self.xreq_channel.is_alive():
696 self.xreq_channel.stop()
697 self.xreq_channel.stop()
697 if self.sub_channel.is_alive():
698 if self.sub_channel.is_alive():
698 self.sub_channel.stop()
699 self.sub_channel.stop()
699 if self.rep_channel.is_alive():
700 if self.rep_channel.is_alive():
700 self.rep_channel.stop()
701 self.rep_channel.stop()
701 if self.hb_channel.is_alive():
702 if self.hb_channel.is_alive():
702 self.hb_channel.stop()
703 self.hb_channel.stop()
703
704
704 @property
705 @property
705 def channels_running(self):
706 def channels_running(self):
706 """Are any of the channels created and running?"""
707 """Are any of the channels created and running?"""
707 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
708 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
708 self.rep_channel.is_alive() or self.hb_channel.is_alive())
709 self.rep_channel.is_alive() or self.hb_channel.is_alive())
709
710
710 #--------------------------------------------------------------------------
711 #--------------------------------------------------------------------------
711 # Kernel process management methods:
712 # Kernel process management methods:
712 #--------------------------------------------------------------------------
713 #--------------------------------------------------------------------------
713
714
714 def start_kernel(self, **kw):
715 def start_kernel(self, **kw):
715 """Starts a kernel process and configures the manager to use it.
716 """Starts a kernel process and configures the manager to use it.
716
717
717 If random ports (port=0) are being used, this method must be called
718 If random ports (port=0) are being used, this method must be called
718 before the channels are created.
719 before the channels are created.
719
720
720 Parameters:
721 Parameters:
721 -----------
722 -----------
722 ipython : bool, optional (default True)
723 ipython : bool, optional (default True)
723 Whether to use an IPython kernel instead of a plain Python kernel.
724 Whether to use an IPython kernel instead of a plain Python kernel.
724
725
725 **kw : optional
726 **kw : optional
726 See respective options for IPython and Python kernels.
727 See respective options for IPython and Python kernels.
727 """
728 """
728 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
729 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
729 self.rep_address, self.hb_address
730 self.rep_address, self.hb_address
730 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
731 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
731 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
732 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
732 raise RuntimeError("Can only launch a kernel on a local interface. "
733 raise RuntimeError("Can only launch a kernel on a local interface. "
733 "Make sure that the '*_address' attributes are "
734 "Make sure that the '*_address' attributes are "
734 "configured properly. "
735 "configured properly. "
735 "Currently valid addresses are: %s"%LOCAL_IPS
736 "Currently valid addresses are: %s"%LOCAL_IPS
736 )
737 )
737
738
738 self._launch_args = kw.copy()
739 self._launch_args = kw.copy()
739 if kw.pop('ipython', True):
740 if kw.pop('ipython', True):
740 from ipkernel import launch_kernel
741 from ipkernel import launch_kernel
741 else:
742 else:
742 from pykernel import launch_kernel
743 from pykernel import launch_kernel
743 self.kernel, xrep, pub, req, _hb = launch_kernel(
744 self.kernel, xrep, pub, req, _hb = launch_kernel(
744 xrep_port=xreq[1], pub_port=sub[1],
745 xrep_port=xreq[1], pub_port=sub[1],
745 req_port=rep[1], hb_port=hb[1], **kw)
746 req_port=rep[1], hb_port=hb[1], **kw)
746 self.xreq_address = (xreq[0], xrep)
747 self.xreq_address = (xreq[0], xrep)
747 self.sub_address = (sub[0], pub)
748 self.sub_address = (sub[0], pub)
748 self.rep_address = (rep[0], req)
749 self.rep_address = (rep[0], req)
749 self.hb_address = (hb[0], _hb)
750 self.hb_address = (hb[0], _hb)
750
751
751 def shutdown_kernel(self, restart=False):
752 def shutdown_kernel(self, restart=False):
752 """ Attempts to the stop the kernel process cleanly. If the kernel
753 """ Attempts to the stop the kernel process cleanly. If the kernel
753 cannot be stopped, it is killed, if possible.
754 cannot be stopped, it is killed, if possible.
754 """
755 """
755 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
756 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
756 if sys.platform == 'win32':
757 if sys.platform == 'win32':
757 self.kill_kernel()
758 self.kill_kernel()
758 return
759 return
759
760
760 # Pause the heart beat channel if it exists.
761 # Pause the heart beat channel if it exists.
761 if self._hb_channel is not None:
762 if self._hb_channel is not None:
762 self._hb_channel.pause()
763 self._hb_channel.pause()
763
764
764 # Don't send any additional kernel kill messages immediately, to give
765 # Don't send any additional kernel kill messages immediately, to give
765 # the kernel a chance to properly execute shutdown actions. Wait for at
766 # the kernel a chance to properly execute shutdown actions. Wait for at
766 # most 1s, checking every 0.1s.
767 # most 1s, checking every 0.1s.
767 self.xreq_channel.shutdown(restart=restart)
768 self.xreq_channel.shutdown(restart=restart)
768 for i in range(10):
769 for i in range(10):
769 if self.is_alive:
770 if self.is_alive:
770 time.sleep(0.1)
771 time.sleep(0.1)
771 else:
772 else:
772 break
773 break
773 else:
774 else:
774 # OK, we've waited long enough.
775 # OK, we've waited long enough.
775 if self.has_kernel:
776 if self.has_kernel:
776 self.kill_kernel()
777 self.kill_kernel()
777
778
778 def restart_kernel(self, now=False, **kw):
779 def restart_kernel(self, now=False, **kw):
779 """Restarts a kernel with the arguments that were used to launch it.
780 """Restarts a kernel with the arguments that were used to launch it.
780
781
781 If the old kernel was launched with random ports, the same ports will be
782 If the old kernel was launched with random ports, the same ports will be
782 used for the new kernel.
783 used for the new kernel.
783
784
784 Parameters
785 Parameters
785 ----------
786 ----------
786 now : bool, optional
787 now : bool, optional
787 If True, the kernel is forcefully restarted *immediately*, without
788 If True, the kernel is forcefully restarted *immediately*, without
788 having a chance to do any cleanup action. Otherwise the kernel is
789 having a chance to do any cleanup action. Otherwise the kernel is
789 given 1s to clean up before a forceful restart is issued.
790 given 1s to clean up before a forceful restart is issued.
790
791
791 In all cases the kernel is restarted, the only difference is whether
792 In all cases the kernel is restarted, the only difference is whether
792 it is given a chance to perform a clean shutdown or not.
793 it is given a chance to perform a clean shutdown or not.
793
794
794 **kw : optional
795 **kw : optional
795 Any options specified here will replace those used to launch the
796 Any options specified here will replace those used to launch the
796 kernel.
797 kernel.
797 """
798 """
798 if self._launch_args is None:
799 if self._launch_args is None:
799 raise RuntimeError("Cannot restart the kernel. "
800 raise RuntimeError("Cannot restart the kernel. "
800 "No previous call to 'start_kernel'.")
801 "No previous call to 'start_kernel'.")
801 else:
802 else:
802 # Stop currently running kernel.
803 # Stop currently running kernel.
803 if self.has_kernel:
804 if self.has_kernel:
804 if now:
805 if now:
805 self.kill_kernel()
806 self.kill_kernel()
806 else:
807 else:
807 self.shutdown_kernel(restart=True)
808 self.shutdown_kernel(restart=True)
808
809
809 # Start new kernel.
810 # Start new kernel.
810 self._launch_args.update(kw)
811 self._launch_args.update(kw)
811 self.start_kernel(**self._launch_args)
812 self.start_kernel(**self._launch_args)
812
813
813 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
814 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
814 # unless there is some delay here.
815 # unless there is some delay here.
815 if sys.platform == 'win32':
816 if sys.platform == 'win32':
816 time.sleep(0.2)
817 time.sleep(0.2)
817
818
818 @property
819 @property
819 def has_kernel(self):
820 def has_kernel(self):
820 """Returns whether a kernel process has been specified for the kernel
821 """Returns whether a kernel process has been specified for the kernel
821 manager.
822 manager.
822 """
823 """
823 return self.kernel is not None
824 return self.kernel is not None
824
825
825 def kill_kernel(self):
826 def kill_kernel(self):
826 """ Kill the running kernel. """
827 """ Kill the running kernel. """
827 if self.has_kernel:
828 if self.has_kernel:
828 # Pause the heart beat channel if it exists.
829 # Pause the heart beat channel if it exists.
829 if self._hb_channel is not None:
830 if self._hb_channel is not None:
830 self._hb_channel.pause()
831 self._hb_channel.pause()
831
832
832 # Attempt to kill the kernel.
833 # Attempt to kill the kernel.
833 try:
834 try:
834 self.kernel.kill()
835 self.kernel.kill()
835 except OSError, e:
836 except OSError, e:
836 # In Windows, we will get an Access Denied error if the process
837 # In Windows, we will get an Access Denied error if the process
837 # has already terminated. Ignore it.
838 # has already terminated. Ignore it.
838 if sys.platform == 'win32':
839 if sys.platform == 'win32':
839 if e.winerror != 5:
840 if e.winerror != 5:
840 raise
841 raise
841 # On Unix, we may get an ESRCH error if the process has already
842 # On Unix, we may get an ESRCH error if the process has already
842 # terminated. Ignore it.
843 # terminated. Ignore it.
843 else:
844 else:
844 from errno import ESRCH
845 from errno import ESRCH
845 if e.errno != ESRCH:
846 if e.errno != ESRCH:
846 raise
847 raise
847 self.kernel = None
848 self.kernel = None
848 else:
849 else:
849 raise RuntimeError("Cannot kill kernel. No kernel is running!")
850 raise RuntimeError("Cannot kill kernel. No kernel is running!")
850
851
851 def interrupt_kernel(self):
852 def interrupt_kernel(self):
852 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
853 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
853 well supported on all platforms.
854 well supported on all platforms.
854 """
855 """
855 if self.has_kernel:
856 if self.has_kernel:
856 if sys.platform == 'win32':
857 if sys.platform == 'win32':
857 from parentpoller import ParentPollerWindows as Poller
858 from parentpoller import ParentPollerWindows as Poller
858 Poller.send_interrupt(self.kernel.win32_interrupt_event)
859 Poller.send_interrupt(self.kernel.win32_interrupt_event)
859 else:
860 else:
860 self.kernel.send_signal(signal.SIGINT)
861 self.kernel.send_signal(signal.SIGINT)
861 else:
862 else:
862 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
863 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
863
864
864 def signal_kernel(self, signum):
865 def signal_kernel(self, signum):
865 """ Sends a signal to the kernel. Note that since only SIGTERM is
866 """ Sends a signal to the kernel. Note that since only SIGTERM is
866 supported on Windows, this function is only useful on Unix systems.
867 supported on Windows, this function is only useful on Unix systems.
867 """
868 """
868 if self.has_kernel:
869 if self.has_kernel:
869 self.kernel.send_signal(signum)
870 self.kernel.send_signal(signum)
870 else:
871 else:
871 raise RuntimeError("Cannot signal kernel. No kernel is running!")
872 raise RuntimeError("Cannot signal kernel. No kernel is running!")
872
873
873 @property
874 @property
874 def is_alive(self):
875 def is_alive(self):
875 """Is the kernel process still running?"""
876 """Is the kernel process still running?"""
876 # FIXME: not using a heartbeat means this method is broken for any
877 # FIXME: not using a heartbeat means this method is broken for any
877 # remote kernel, it's only capable of handling local kernels.
878 # remote kernel, it's only capable of handling local kernels.
878 if self.has_kernel:
879 if self.has_kernel:
879 if self.kernel.poll() is None:
880 if self.kernel.poll() is None:
880 return True
881 return True
881 else:
882 else:
882 return False
883 return False
883 else:
884 else:
884 # We didn't start the kernel with this KernelManager so we don't
885 # We didn't start the kernel with this KernelManager so we don't
885 # know if it is running. We should use a heartbeat for this case.
886 # know if it is running. We should use a heartbeat for this case.
886 return True
887 return True
887
888
888 #--------------------------------------------------------------------------
889 #--------------------------------------------------------------------------
889 # Channels used for communication with the kernel:
890 # Channels used for communication with the kernel:
890 #--------------------------------------------------------------------------
891 #--------------------------------------------------------------------------
891
892
892 @property
893 @property
893 def xreq_channel(self):
894 def xreq_channel(self):
894 """Get the REQ socket channel object to make requests of the kernel."""
895 """Get the REQ socket channel object to make requests of the kernel."""
895 if self._xreq_channel is None:
896 if self._xreq_channel is None:
896 self._xreq_channel = self.xreq_channel_class(self.context,
897 self._xreq_channel = self.xreq_channel_class(self.context,
897 self.session,
898 self.session,
898 self.xreq_address)
899 self.xreq_address)
899 return self._xreq_channel
900 return self._xreq_channel
900
901
901 @property
902 @property
902 def sub_channel(self):
903 def sub_channel(self):
903 """Get the SUB socket channel object."""
904 """Get the SUB socket channel object."""
904 if self._sub_channel is None:
905 if self._sub_channel is None:
905 self._sub_channel = self.sub_channel_class(self.context,
906 self._sub_channel = self.sub_channel_class(self.context,
906 self.session,
907 self.session,
907 self.sub_address)
908 self.sub_address)
908 return self._sub_channel
909 return self._sub_channel
909
910
910 @property
911 @property
911 def rep_channel(self):
912 def rep_channel(self):
912 """Get the REP socket channel object to handle stdin (raw_input)."""
913 """Get the REP socket channel object to handle stdin (raw_input)."""
913 if self._rep_channel is None:
914 if self._rep_channel is None:
914 self._rep_channel = self.rep_channel_class(self.context,
915 self._rep_channel = self.rep_channel_class(self.context,
915 self.session,
916 self.session,
916 self.rep_address)
917 self.rep_address)
917 return self._rep_channel
918 return self._rep_channel
918
919
919 @property
920 @property
920 def hb_channel(self):
921 def hb_channel(self):
921 """Get the heartbeat socket channel object to check that the
922 """Get the heartbeat socket channel object to check that the
922 kernel is alive."""
923 kernel is alive."""
923 if self._hb_channel is None:
924 if self._hb_channel is None:
924 self._hb_channel = self.hb_channel_class(self.context,
925 self._hb_channel = self.hb_channel_class(self.context,
925 self.session,
926 self.session,
926 self.hb_address)
927 self.hb_address)
927 return self._hb_channel
928 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now