##// END OF EJS Templates
Correction to docstring...
Thomas Kluyver -
Show More
@@ -1,909 +1,910 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 from Queue import Queue, Empty
20 from Queue import Queue, Empty
21 from subprocess import Popen
21 from subprocess import Popen
22 import signal
22 import signal
23 import sys
23 import sys
24 from threading import Thread
24 from threading import Thread
25 import time
25 import time
26 import logging
26 import logging
27
27
28 # System library imports.
28 # System library imports.
29 import zmq
29 import zmq
30 from zmq import POLLIN, POLLOUT, POLLERR
30 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq.eventloop import ioloop
31 from zmq.eventloop import ioloop
32
32
33 # Local imports.
33 # Local imports.
34 from IPython.utils import io
34 from IPython.utils import io
35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
37 from session import Session, Message
37 from session import Session, Message
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Constants and exceptions
40 # Constants and exceptions
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 class InvalidPortNumber(Exception):
43 class InvalidPortNumber(Exception):
44 pass
44 pass
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Utility functions
47 # Utility functions
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
51 # if they prove to have more generic utility
51 # if they prove to have more generic utility
52
52
53 def validate_string_list(lst):
53 def validate_string_list(lst):
54 """Validate that the input is a list of strings.
54 """Validate that the input is a list of strings.
55
55
56 Raises ValueError if not."""
56 Raises ValueError if not."""
57 if not isinstance(lst, list):
57 if not isinstance(lst, list):
58 raise ValueError('input %r must be a list' % lst)
58 raise ValueError('input %r must be a list' % lst)
59 for x in lst:
59 for x in lst:
60 if not isinstance(x, basestring):
60 if not isinstance(x, basestring):
61 raise ValueError('element %r in list must be a string' % x)
61 raise ValueError('element %r in list must be a string' % x)
62
62
63
63
64 def validate_string_dict(dct):
64 def validate_string_dict(dct):
65 """Validate that the input is a dict with string keys and values.
65 """Validate that the input is a dict with string keys and values.
66
66
67 Raises ValueError if not."""
67 Raises ValueError if not."""
68 for k,v in dct.iteritems():
68 for k,v in dct.iteritems():
69 if not isinstance(k, basestring):
69 if not isinstance(k, basestring):
70 raise ValueError('key %r in dict must be a string' % k)
70 raise ValueError('key %r in dict must be a string' % k)
71 if not isinstance(v, basestring):
71 if not isinstance(v, basestring):
72 raise ValueError('value %r in dict must be a string' % v)
72 raise ValueError('value %r in dict must be a string' % v)
73
73
74
74
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76 # ZMQ Socket Channel classes
76 # ZMQ Socket Channel classes
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78
78
79 class ZmqSocketChannel(Thread):
79 class ZmqSocketChannel(Thread):
80 """The base class for the channels that use ZMQ sockets.
80 """The base class for the channels that use ZMQ sockets.
81 """
81 """
82 context = None
82 context = None
83 session = None
83 session = None
84 socket = None
84 socket = None
85 ioloop = None
85 ioloop = None
86 iostate = None
86 iostate = None
87 _address = None
87 _address = None
88
88
89 def __init__(self, context, session, address):
89 def __init__(self, context, session, address):
90 """Create a channel
90 """Create a channel
91
91
92 Parameters
92 Parameters
93 ----------
93 ----------
94 context : :class:`zmq.Context`
94 context : :class:`zmq.Context`
95 The ZMQ context to use.
95 The ZMQ context to use.
96 session : :class:`session.Session`
96 session : :class:`session.Session`
97 The session to use.
97 The session to use.
98 address : tuple
98 address : tuple
99 Standard (ip, port) tuple that the kernel is listening on.
99 Standard (ip, port) tuple that the kernel is listening on.
100 """
100 """
101 super(ZmqSocketChannel, self).__init__()
101 super(ZmqSocketChannel, self).__init__()
102 self.daemon = True
102 self.daemon = True
103
103
104 self.context = context
104 self.context = context
105 self.session = session
105 self.session = session
106 if address[1] == 0:
106 if address[1] == 0:
107 message = 'The port number for a channel cannot be 0.'
107 message = 'The port number for a channel cannot be 0.'
108 raise InvalidPortNumber(message)
108 raise InvalidPortNumber(message)
109 self._address = address
109 self._address = address
110
110
111 def stop(self):
111 def stop(self):
112 """Stop the channel's activity.
112 """Stop the channel's activity.
113
113
114 This calls :method:`Thread.join` and returns when the thread
114 This calls :method:`Thread.join` and returns when the thread
115 terminates. :class:`RuntimeError` will be raised if
115 terminates. :class:`RuntimeError` will be raised if
116 :method:`self.start` is called again.
116 :method:`self.start` is called again.
117 """
117 """
118 self.join()
118 self.join()
119
119
120 @property
120 @property
121 def address(self):
121 def address(self):
122 """Get the channel's address as an (ip, port) tuple.
122 """Get the channel's address as an (ip, port) tuple.
123
123
124 By the default, the address is (localhost, 0), where 0 means a random
124 By the default, the address is (localhost, 0), where 0 means a random
125 port.
125 port.
126 """
126 """
127 return self._address
127 return self._address
128
128
129 def add_io_state(self, state):
129 def add_io_state(self, state):
130 """Add IO state to the eventloop.
130 """Add IO state to the eventloop.
131
131
132 Parameters
132 Parameters
133 ----------
133 ----------
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 The IO state flag to set.
135 The IO state flag to set.
136
136
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 """
138 """
139 def add_io_state_callback():
139 def add_io_state_callback():
140 if not self.iostate & state:
140 if not self.iostate & state:
141 self.iostate = self.iostate | state
141 self.iostate = self.iostate | state
142 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.update_handler(self.socket, self.iostate)
143 self.ioloop.add_callback(add_io_state_callback)
143 self.ioloop.add_callback(add_io_state_callback)
144
144
145 def drop_io_state(self, state):
145 def drop_io_state(self, state):
146 """Drop IO state from the eventloop.
146 """Drop IO state from the eventloop.
147
147
148 Parameters
148 Parameters
149 ----------
149 ----------
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 The IO state flag to set.
151 The IO state flag to set.
152
152
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 """
154 """
155 def drop_io_state_callback():
155 def drop_io_state_callback():
156 if self.iostate & state:
156 if self.iostate & state:
157 self.iostate = self.iostate & (~state)
157 self.iostate = self.iostate & (~state)
158 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.update_handler(self.socket, self.iostate)
159 self.ioloop.add_callback(drop_io_state_callback)
159 self.ioloop.add_callback(drop_io_state_callback)
160
160
161
161
162 class XReqSocketChannel(ZmqSocketChannel):
162 class XReqSocketChannel(ZmqSocketChannel):
163 """The XREQ channel for issues request/replies to the kernel.
163 """The XREQ channel for issues request/replies to the kernel.
164 """
164 """
165
165
166 command_queue = None
166 command_queue = None
167
167
168 def __init__(self, context, session, address):
168 def __init__(self, context, session, address):
169 super(XReqSocketChannel, self).__init__(context, session, address)
169 super(XReqSocketChannel, self).__init__(context, session, address)
170 self.command_queue = Queue()
170 self.command_queue = Queue()
171 self.ioloop = ioloop.IOLoop()
171 self.ioloop = ioloop.IOLoop()
172
172
173 def run(self):
173 def run(self):
174 """The thread's main activity. Call start() instead."""
174 """The thread's main activity. Call start() instead."""
175 self.socket = self.context.socket(zmq.XREQ)
175 self.socket = self.context.socket(zmq.XREQ)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 self.socket.connect('tcp://%s:%i' % self.address)
177 self.socket.connect('tcp://%s:%i' % self.address)
178 self.iostate = POLLERR|POLLIN
178 self.iostate = POLLERR|POLLIN
179 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.ioloop.add_handler(self.socket, self._handle_events,
180 self.iostate)
180 self.iostate)
181 self.ioloop.start()
181 self.ioloop.start()
182
182
183 def stop(self):
183 def stop(self):
184 self.ioloop.stop()
184 self.ioloop.stop()
185 super(XReqSocketChannel, self).stop()
185 super(XReqSocketChannel, self).stop()
186
186
187 def call_handlers(self, msg):
187 def call_handlers(self, msg):
188 """This method is called in the ioloop thread when a message arrives.
188 """This method is called in the ioloop thread when a message arrives.
189
189
190 Subclasses should override this method to handle incoming messages.
190 Subclasses should override this method to handle incoming messages.
191 It is important to remember that this method is called in the thread
191 It is important to remember that this method is called in the thread
192 so that some logic must be done to ensure that the application leve
192 so that some logic must be done to ensure that the application leve
193 handlers are called in the application thread.
193 handlers are called in the application thread.
194 """
194 """
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
196
196
197 def execute(self, code, silent=False,
197 def execute(self, code, silent=False,
198 user_variables=None, user_expressions=None):
198 user_variables=None, user_expressions=None):
199 """Execute code in the kernel.
199 """Execute code in the kernel.
200
200
201 Parameters
201 Parameters
202 ----------
202 ----------
203 code : str
203 code : str
204 A string of Python code.
204 A string of Python code.
205
205
206 silent : bool, optional (default False)
206 silent : bool, optional (default False)
207 If set, the kernel will execute the code as quietly possible.
207 If set, the kernel will execute the code as quietly possible.
208
208
209 user_variables : list, optional
209 user_variables : list, optional
210 A list of variable names to pull from the user's namespace. They
210 A list of variable names to pull from the user's namespace. They
211 will come back as a dict with these names as keys and their
211 will come back as a dict with these names as keys and their
212 :func:`repr` as values.
212 :func:`repr` as values.
213
213
214 user_expressions : dict, optional
214 user_expressions : dict, optional
215 A dict with string keys and to pull from the user's
215 A dict with string keys and to pull from the user's
216 namespace. They will come back as a dict with these names as keys
216 namespace. They will come back as a dict with these names as keys
217 and their :func:`repr` as values.
217 and their :func:`repr` as values.
218
218
219 Returns
219 Returns
220 -------
220 -------
221 The msg_id of the message sent.
221 The msg_id of the message sent.
222 """
222 """
223 if user_variables is None:
223 if user_variables is None:
224 user_variables = []
224 user_variables = []
225 if user_expressions is None:
225 if user_expressions is None:
226 user_expressions = {}
226 user_expressions = {}
227
227
228 # Don't waste network traffic if inputs are invalid
228 # Don't waste network traffic if inputs are invalid
229 if not isinstance(code, basestring):
229 if not isinstance(code, basestring):
230 raise ValueError('code %r must be a string' % code)
230 raise ValueError('code %r must be a string' % code)
231 validate_string_list(user_variables)
231 validate_string_list(user_variables)
232 validate_string_dict(user_expressions)
232 validate_string_dict(user_expressions)
233
233
234 # Create class for content/msg creation. Related to, but possibly
234 # Create class for content/msg creation. Related to, but possibly
235 # not in Session.
235 # not in Session.
236 content = dict(code=code, silent=silent,
236 content = dict(code=code, silent=silent,
237 user_variables=user_variables,
237 user_variables=user_variables,
238 user_expressions=user_expressions)
238 user_expressions=user_expressions)
239 msg = self.session.msg('execute_request', content)
239 msg = self.session.msg('execute_request', content)
240 self._queue_request(msg)
240 self._queue_request(msg)
241 return msg['header']['msg_id']
241 return msg['header']['msg_id']
242
242
243 def complete(self, text, line, cursor_pos, block=None):
243 def complete(self, text, line, cursor_pos, block=None):
244 """Tab complete text in the kernel's namespace.
244 """Tab complete text in the kernel's namespace.
245
245
246 Parameters
246 Parameters
247 ----------
247 ----------
248 text : str
248 text : str
249 The text to complete.
249 The text to complete.
250 line : str
250 line : str
251 The full line of text that is the surrounding context for the
251 The full line of text that is the surrounding context for the
252 text to complete.
252 text to complete.
253 cursor_pos : int
253 cursor_pos : int
254 The position of the cursor in the line where the completion was
254 The position of the cursor in the line where the completion was
255 requested.
255 requested.
256 block : str, optional
256 block : str, optional
257 The full block of code in which the completion is being requested.
257 The full block of code in which the completion is being requested.
258
258
259 Returns
259 Returns
260 -------
260 -------
261 The msg_id of the message sent.
261 The msg_id of the message sent.
262 """
262 """
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 msg = self.session.msg('complete_request', content)
264 msg = self.session.msg('complete_request', content)
265 self._queue_request(msg)
265 self._queue_request(msg)
266 return msg['header']['msg_id']
266 return msg['header']['msg_id']
267
267
268 def object_info(self, oname):
268 def object_info(self, oname):
269 """Get metadata information about an object.
269 """Get metadata information about an object.
270
270
271 Parameters
271 Parameters
272 ----------
272 ----------
273 oname : str
273 oname : str
274 A string specifying the object name.
274 A string specifying the object name.
275
275
276 Returns
276 Returns
277 -------
277 -------
278 The msg_id of the message sent.
278 The msg_id of the message sent.
279 """
279 """
280 content = dict(oname=oname)
280 content = dict(oname=oname)
281 msg = self.session.msg('object_info_request', content)
281 msg = self.session.msg('object_info_request', content)
282 self._queue_request(msg)
282 self._queue_request(msg)
283 return msg['header']['msg_id']
283 return msg['header']['msg_id']
284
284
285 def history(self, index=None, raw=False, output=True):
285 def history(self, index=None, raw=False, output=True):
286 """Get the history list.
286 """Get the history list.
287
287
288 Parameters
288 Parameters
289 ----------
289 ----------
290 index : n or (n1, n2) or None
290 index : n or (n1, n2) or None
291 If n, then the last entries. If a tuple, then all in
291 If n, then the last entries. If a tuple, then all in
292 range(n1, n2). If None, then all entries. Raises IndexError if
292 range(n1, n2). If None, then all entries. Raises IndexError if
293 the format of index is incorrect.
293 the format of index is incorrect.
294 raw : bool
294 raw : bool
295 If True, return the raw input.
295 If True, return the raw input.
296 output : bool
296 output : bool
297 If True, then return the output as well.
297 If True, then return the output as well.
298
298
299 Returns
299 Returns
300 -------
300 -------
301 The msg_id of the message sent.
301 The msg_id of the message sent.
302 """
302 """
303 content = dict(index=index, raw=raw, output=output)
303 content = dict(index=index, raw=raw, output=output)
304 msg = self.session.msg('history_request', content)
304 msg = self.session.msg('history_request', content)
305 self._queue_request(msg)
305 self._queue_request(msg)
306 return msg['header']['msg_id']
306 return msg['header']['msg_id']
307
307
308 def shutdown(self, restart=False):
308 def shutdown(self, restart=False):
309 """Request an immediate kernel shutdown.
309 """Request an immediate kernel shutdown.
310
310
311 Upon receipt of the (empty) reply, client code can safely assume that
311 Upon receipt of the (empty) reply, client code can safely assume that
312 the kernel has shut down and it's safe to forcefully terminate it if
312 the kernel has shut down and it's safe to forcefully terminate it if
313 it's still alive.
313 it's still alive.
314
314
315 The kernel will send the reply via a function registered with Python's
315 The kernel will send the reply via a function registered with Python's
316 atexit module, ensuring it's truly done as the kernel is done with all
316 atexit module, ensuring it's truly done as the kernel is done with all
317 normal operation.
317 normal operation.
318 """
318 """
319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # Send quit message to kernel. Once we implement kernel-side setattr,
320 # this should probably be done that way, but for now this will do.
320 # this should probably be done that way, but for now this will do.
321 msg = self.session.msg('shutdown_request', {'restart':restart})
321 msg = self.session.msg('shutdown_request', {'restart':restart})
322 self._queue_request(msg)
322 self._queue_request(msg)
323 return msg['header']['msg_id']
323 return msg['header']['msg_id']
324
324
325 def _handle_events(self, socket, events):
325 def _handle_events(self, socket, events):
326 if events & POLLERR:
326 if events & POLLERR:
327 self._handle_err()
327 self._handle_err()
328 if events & POLLOUT:
328 if events & POLLOUT:
329 self._handle_send()
329 self._handle_send()
330 if events & POLLIN:
330 if events & POLLIN:
331 self._handle_recv()
331 self._handle_recv()
332
332
333 def _handle_recv(self):
333 def _handle_recv(self):
334 ident,msg = self.session.recv(self.socket, 0)
334 ident,msg = self.session.recv(self.socket, 0)
335 self.call_handlers(msg)
335 self.call_handlers(msg)
336
336
337 def _handle_send(self):
337 def _handle_send(self):
338 try:
338 try:
339 msg = self.command_queue.get(False)
339 msg = self.command_queue.get(False)
340 except Empty:
340 except Empty:
341 pass
341 pass
342 else:
342 else:
343 self.session.send(self.socket,msg)
343 self.session.send(self.socket,msg)
344 if self.command_queue.empty():
344 if self.command_queue.empty():
345 self.drop_io_state(POLLOUT)
345 self.drop_io_state(POLLOUT)
346
346
347 def _handle_err(self):
347 def _handle_err(self):
348 # We don't want to let this go silently, so eventually we should log.
348 # We don't want to let this go silently, so eventually we should log.
349 raise zmq.ZMQError()
349 raise zmq.ZMQError()
350
350
351 def _queue_request(self, msg):
351 def _queue_request(self, msg):
352 self.command_queue.put(msg)
352 self.command_queue.put(msg)
353 self.add_io_state(POLLOUT)
353 self.add_io_state(POLLOUT)
354
354
355
355
356 class SubSocketChannel(ZmqSocketChannel):
356 class SubSocketChannel(ZmqSocketChannel):
357 """The SUB channel which listens for messages that the kernel publishes.
357 """The SUB channel which listens for messages that the kernel publishes.
358 """
358 """
359
359
360 def __init__(self, context, session, address):
360 def __init__(self, context, session, address):
361 super(SubSocketChannel, self).__init__(context, session, address)
361 super(SubSocketChannel, self).__init__(context, session, address)
362 self.ioloop = ioloop.IOLoop()
362 self.ioloop = ioloop.IOLoop()
363
363
364 def run(self):
364 def run(self):
365 """The thread's main activity. Call start() instead."""
365 """The thread's main activity. Call start() instead."""
366 self.socket = self.context.socket(zmq.SUB)
366 self.socket = self.context.socket(zmq.SUB)
367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
369 self.socket.connect('tcp://%s:%i' % self.address)
369 self.socket.connect('tcp://%s:%i' % self.address)
370 self.iostate = POLLIN|POLLERR
370 self.iostate = POLLIN|POLLERR
371 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.ioloop.add_handler(self.socket, self._handle_events,
372 self.iostate)
372 self.iostate)
373 self.ioloop.start()
373 self.ioloop.start()
374
374
375 def stop(self):
375 def stop(self):
376 self.ioloop.stop()
376 self.ioloop.stop()
377 super(SubSocketChannel, self).stop()
377 super(SubSocketChannel, self).stop()
378
378
379 def call_handlers(self, msg):
379 def call_handlers(self, msg):
380 """This method is called in the ioloop thread when a message arrives.
380 """This method is called in the ioloop thread when a message arrives.
381
381
382 Subclasses should override this method to handle incoming messages.
382 Subclasses should override this method to handle incoming messages.
383 It is important to remember that this method is called in the thread
383 It is important to remember that this method is called in the thread
384 so that some logic must be done to ensure that the application leve
384 so that some logic must be done to ensure that the application leve
385 handlers are called in the application thread.
385 handlers are called in the application thread.
386 """
386 """
387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 raise NotImplementedError('call_handlers must be defined in a subclass.')
388
388
389 def flush(self, timeout=1.0):
389 def flush(self, timeout=1.0):
390 """Immediately processes all pending messages on the SUB channel.
390 """Immediately processes all pending messages on the SUB channel.
391
391
392 Callers should use this method to ensure that :method:`call_handlers`
392 Callers should use this method to ensure that :method:`call_handlers`
393 has been called for all messages that have been received on the
393 has been called for all messages that have been received on the
394 0MQ SUB socket of this channel.
394 0MQ SUB socket of this channel.
395
395
396 This method is thread safe.
396 This method is thread safe.
397
397
398 Parameters
398 Parameters
399 ----------
399 ----------
400 timeout : float, optional
400 timeout : float, optional
401 The maximum amount of time to spend flushing, in seconds. The
401 The maximum amount of time to spend flushing, in seconds. The
402 default is one second.
402 default is one second.
403 """
403 """
404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # We do the IOLoop callback process twice to ensure that the IOLoop
405 # gets to perform at least one full poll.
405 # gets to perform at least one full poll.
406 stop_time = time.time() + timeout
406 stop_time = time.time() + timeout
407 for i in xrange(2):
407 for i in xrange(2):
408 self._flushed = False
408 self._flushed = False
409 self.ioloop.add_callback(self._flush)
409 self.ioloop.add_callback(self._flush)
410 while not self._flushed and time.time() < stop_time:
410 while not self._flushed and time.time() < stop_time:
411 time.sleep(0.01)
411 time.sleep(0.01)
412
412
413 def _handle_events(self, socket, events):
413 def _handle_events(self, socket, events):
414 # Turn on and off POLLOUT depending on if we have made a request
414 # Turn on and off POLLOUT depending on if we have made a request
415 if events & POLLERR:
415 if events & POLLERR:
416 self._handle_err()
416 self._handle_err()
417 if events & POLLIN:
417 if events & POLLIN:
418 self._handle_recv()
418 self._handle_recv()
419
419
420 def _handle_err(self):
420 def _handle_err(self):
421 # We don't want to let this go silently, so eventually we should log.
421 # We don't want to let this go silently, so eventually we should log.
422 raise zmq.ZMQError()
422 raise zmq.ZMQError()
423
423
424 def _handle_recv(self):
424 def _handle_recv(self):
425 # Get all of the messages we can
425 # Get all of the messages we can
426 while True:
426 while True:
427 try:
427 try:
428 ident,msg = self.session.recv(self.socket)
428 ident,msg = self.session.recv(self.socket)
429 except zmq.ZMQError:
429 except zmq.ZMQError:
430 # Check the errno?
430 # Check the errno?
431 # Will this trigger POLLERR?
431 # Will this trigger POLLERR?
432 break
432 break
433 else:
433 else:
434 if msg is None:
434 if msg is None:
435 break
435 break
436 self.call_handlers(msg)
436 self.call_handlers(msg)
437
437
438 def _flush(self):
438 def _flush(self):
439 """Callback for :method:`self.flush`."""
439 """Callback for :method:`self.flush`."""
440 self._flushed = True
440 self._flushed = True
441
441
442
442
443 class RepSocketChannel(ZmqSocketChannel):
443 class RepSocketChannel(ZmqSocketChannel):
444 """A reply channel to handle raw_input requests that the kernel makes."""
444 """A reply channel to handle raw_input requests that the kernel makes."""
445
445
446 msg_queue = None
446 msg_queue = None
447
447
448 def __init__(self, context, session, address):
448 def __init__(self, context, session, address):
449 super(RepSocketChannel, self).__init__(context, session, address)
449 super(RepSocketChannel, self).__init__(context, session, address)
450 self.ioloop = ioloop.IOLoop()
450 self.ioloop = ioloop.IOLoop()
451 self.msg_queue = Queue()
451 self.msg_queue = Queue()
452
452
453 def run(self):
453 def run(self):
454 """The thread's main activity. Call start() instead."""
454 """The thread's main activity. Call start() instead."""
455 self.socket = self.context.socket(zmq.XREQ)
455 self.socket = self.context.socket(zmq.XREQ)
456 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
456 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
457 self.socket.connect('tcp://%s:%i' % self.address)
457 self.socket.connect('tcp://%s:%i' % self.address)
458 self.iostate = POLLERR|POLLIN
458 self.iostate = POLLERR|POLLIN
459 self.ioloop.add_handler(self.socket, self._handle_events,
459 self.ioloop.add_handler(self.socket, self._handle_events,
460 self.iostate)
460 self.iostate)
461 self.ioloop.start()
461 self.ioloop.start()
462
462
463 def stop(self):
463 def stop(self):
464 self.ioloop.stop()
464 self.ioloop.stop()
465 super(RepSocketChannel, self).stop()
465 super(RepSocketChannel, self).stop()
466
466
467 def call_handlers(self, msg):
467 def call_handlers(self, msg):
468 """This method is called in the ioloop thread when a message arrives.
468 """This method is called in the ioloop thread when a message arrives.
469
469
470 Subclasses should override this method to handle incoming messages.
470 Subclasses should override this method to handle incoming messages.
471 It is important to remember that this method is called in the thread
471 It is important to remember that this method is called in the thread
472 so that some logic must be done to ensure that the application leve
472 so that some logic must be done to ensure that the application leve
473 handlers are called in the application thread.
473 handlers are called in the application thread.
474 """
474 """
475 raise NotImplementedError('call_handlers must be defined in a subclass.')
475 raise NotImplementedError('call_handlers must be defined in a subclass.')
476
476
477 def input(self, string):
477 def input(self, string):
478 """Send a string of raw input to the kernel."""
478 """Send a string of raw input to the kernel."""
479 content = dict(value=string)
479 content = dict(value=string)
480 msg = self.session.msg('input_reply', content)
480 msg = self.session.msg('input_reply', content)
481 self._queue_reply(msg)
481 self._queue_reply(msg)
482
482
483 def _handle_events(self, socket, events):
483 def _handle_events(self, socket, events):
484 if events & POLLERR:
484 if events & POLLERR:
485 self._handle_err()
485 self._handle_err()
486 if events & POLLOUT:
486 if events & POLLOUT:
487 self._handle_send()
487 self._handle_send()
488 if events & POLLIN:
488 if events & POLLIN:
489 self._handle_recv()
489 self._handle_recv()
490
490
491 def _handle_recv(self):
491 def _handle_recv(self):
492 ident,msg = self.session.recv(self.socket, 0)
492 ident,msg = self.session.recv(self.socket, 0)
493 self.call_handlers(msg)
493 self.call_handlers(msg)
494
494
495 def _handle_send(self):
495 def _handle_send(self):
496 try:
496 try:
497 msg = self.msg_queue.get(False)
497 msg = self.msg_queue.get(False)
498 except Empty:
498 except Empty:
499 pass
499 pass
500 else:
500 else:
501 self.session.send(self.socket,msg)
501 self.session.send(self.socket,msg)
502 if self.msg_queue.empty():
502 if self.msg_queue.empty():
503 self.drop_io_state(POLLOUT)
503 self.drop_io_state(POLLOUT)
504
504
505 def _handle_err(self):
505 def _handle_err(self):
506 # We don't want to let this go silently, so eventually we should log.
506 # We don't want to let this go silently, so eventually we should log.
507 raise zmq.ZMQError()
507 raise zmq.ZMQError()
508
508
509 def _queue_reply(self, msg):
509 def _queue_reply(self, msg):
510 self.msg_queue.put(msg)
510 self.msg_queue.put(msg)
511 self.add_io_state(POLLOUT)
511 self.add_io_state(POLLOUT)
512
512
513
513
514 class HBSocketChannel(ZmqSocketChannel):
514 class HBSocketChannel(ZmqSocketChannel):
515 """The heartbeat channel which monitors the kernel heartbeat.
515 """The heartbeat channel which monitors the kernel heartbeat.
516
516
517 Note that the heartbeat channel is paused by default. As long as you start
517 Note that the heartbeat channel is paused by default. As long as you start
518 this channel, the kernel manager will ensure that it is paused and un-paused
518 this channel, the kernel manager will ensure that it is paused and un-paused
519 as appropriate.
519 as appropriate.
520 """
520 """
521
521
522 time_to_dead = 3.0
522 time_to_dead = 3.0
523 socket = None
523 socket = None
524 poller = None
524 poller = None
525 _running = None
525 _running = None
526 _pause = None
526 _pause = None
527
527
528 def __init__(self, context, session, address):
528 def __init__(self, context, session, address):
529 super(HBSocketChannel, self).__init__(context, session, address)
529 super(HBSocketChannel, self).__init__(context, session, address)
530 self._running = False
530 self._running = False
531 self._pause = True
531 self._pause = True
532
532
533 def _create_socket(self):
533 def _create_socket(self):
534 self.socket = self.context.socket(zmq.REQ)
534 self.socket = self.context.socket(zmq.REQ)
535 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
535 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
536 self.socket.connect('tcp://%s:%i' % self.address)
536 self.socket.connect('tcp://%s:%i' % self.address)
537 self.poller = zmq.Poller()
537 self.poller = zmq.Poller()
538 self.poller.register(self.socket, zmq.POLLIN)
538 self.poller.register(self.socket, zmq.POLLIN)
539
539
540 def run(self):
540 def run(self):
541 """The thread's main activity. Call start() instead."""
541 """The thread's main activity. Call start() instead."""
542 self._create_socket()
542 self._create_socket()
543 self._running = True
543 self._running = True
544 while self._running:
544 while self._running:
545 if self._pause:
545 if self._pause:
546 time.sleep(self.time_to_dead)
546 time.sleep(self.time_to_dead)
547 else:
547 else:
548 since_last_heartbeat = 0.0
548 since_last_heartbeat = 0.0
549 request_time = time.time()
549 request_time = time.time()
550 try:
550 try:
551 #io.rprint('Ping from HB channel') # dbg
551 #io.rprint('Ping from HB channel') # dbg
552 self.socket.send(b'ping')
552 self.socket.send(b'ping')
553 except zmq.ZMQError, e:
553 except zmq.ZMQError, e:
554 #io.rprint('*** HB Error:', e) # dbg
554 #io.rprint('*** HB Error:', e) # dbg
555 if e.errno == zmq.EFSM:
555 if e.errno == zmq.EFSM:
556 #io.rprint('sleep...', self.time_to_dead) # dbg
556 #io.rprint('sleep...', self.time_to_dead) # dbg
557 time.sleep(self.time_to_dead)
557 time.sleep(self.time_to_dead)
558 self._create_socket()
558 self._create_socket()
559 else:
559 else:
560 raise
560 raise
561 else:
561 else:
562 while True:
562 while True:
563 try:
563 try:
564 self.socket.recv(zmq.NOBLOCK)
564 self.socket.recv(zmq.NOBLOCK)
565 except zmq.ZMQError, e:
565 except zmq.ZMQError, e:
566 #io.rprint('*** HB Error 2:', e) # dbg
566 #io.rprint('*** HB Error 2:', e) # dbg
567 if e.errno == zmq.EAGAIN:
567 if e.errno == zmq.EAGAIN:
568 before_poll = time.time()
568 before_poll = time.time()
569 until_dead = self.time_to_dead - (before_poll -
569 until_dead = self.time_to_dead - (before_poll -
570 request_time)
570 request_time)
571
571
572 # When the return value of poll() is an empty
572 # When the return value of poll() is an empty
573 # list, that is when things have gone wrong
573 # list, that is when things have gone wrong
574 # (zeromq bug). As long as it is not an empty
574 # (zeromq bug). As long as it is not an empty
575 # list, poll is working correctly even if it
575 # list, poll is working correctly even if it
576 # returns quickly. Note: poll timeout is in
576 # returns quickly. Note: poll timeout is in
577 # milliseconds.
577 # milliseconds.
578 self.poller.poll(1000*until_dead)
578 self.poller.poll(1000*until_dead)
579
579
580 since_last_heartbeat = time.time()-request_time
580 since_last_heartbeat = time.time()-request_time
581 if since_last_heartbeat > self.time_to_dead:
581 if since_last_heartbeat > self.time_to_dead:
582 self.call_handlers(since_last_heartbeat)
582 self.call_handlers(since_last_heartbeat)
583 break
583 break
584 else:
584 else:
585 # FIXME: We should probably log this instead.
585 # FIXME: We should probably log this instead.
586 raise
586 raise
587 else:
587 else:
588 until_dead = self.time_to_dead - (time.time() -
588 until_dead = self.time_to_dead - (time.time() -
589 request_time)
589 request_time)
590 if until_dead > 0.0:
590 if until_dead > 0.0:
591 #io.rprint('sleep...', self.time_to_dead) # dbg
591 #io.rprint('sleep...', self.time_to_dead) # dbg
592 time.sleep(until_dead)
592 time.sleep(until_dead)
593 break
593 break
594
594
595 def pause(self):
595 def pause(self):
596 """Pause the heartbeat."""
596 """Pause the heartbeat."""
597 self._pause = True
597 self._pause = True
598
598
599 def unpause(self):
599 def unpause(self):
600 """Unpause the heartbeat."""
600 """Unpause the heartbeat."""
601 self._pause = False
601 self._pause = False
602
602
603 def is_beating(self):
603 def is_beating(self):
604 """Is the heartbeat running and not paused."""
604 """Is the heartbeat running and not paused."""
605 if self.is_alive() and not self._pause:
605 if self.is_alive() and not self._pause:
606 return True
606 return True
607 else:
607 else:
608 return False
608 return False
609
609
610 def stop(self):
610 def stop(self):
611 self._running = False
611 self._running = False
612 super(HBSocketChannel, self).stop()
612 super(HBSocketChannel, self).stop()
613
613
614 def call_handlers(self, since_last_heartbeat):
614 def call_handlers(self, since_last_heartbeat):
615 """This method is called in the ioloop thread when a message arrives.
615 """This method is called in the ioloop thread when a message arrives.
616
616
617 Subclasses should override this method to handle incoming messages.
617 Subclasses should override this method to handle incoming messages.
618 It is important to remember that this method is called in the thread
618 It is important to remember that this method is called in the thread
619 so that some logic must be done to ensure that the application leve
619 so that some logic must be done to ensure that the application leve
620 handlers are called in the application thread.
620 handlers are called in the application thread.
621 """
621 """
622 raise NotImplementedError('call_handlers must be defined in a subclass.')
622 raise NotImplementedError('call_handlers must be defined in a subclass.')
623
623
624
624
625 #-----------------------------------------------------------------------------
625 #-----------------------------------------------------------------------------
626 # Main kernel manager class
626 # Main kernel manager class
627 #-----------------------------------------------------------------------------
627 #-----------------------------------------------------------------------------
628
628
629 class KernelManager(HasTraits):
629 class KernelManager(HasTraits):
630 """ Manages a kernel for a frontend.
630 """ Manages a kernel for a frontend.
631
631
632 The SUB channel is for the frontend to receive messages published by the
632 The SUB channel is for the frontend to receive messages published by the
633 kernel.
633 kernel.
634
634
635 The REQ channel is for the frontend to make requests of the kernel.
635 The REQ channel is for the frontend to make requests of the kernel.
636
636
637 The REP channel is for the kernel to request stdin (raw_input) from the
637 The REP channel is for the kernel to request stdin (raw_input) from the
638 frontend.
638 frontend.
639 """
639 """
640 # The PyZMQ Context to use for communication with the kernel.
640 # The PyZMQ Context to use for communication with the kernel.
641 context = Instance(zmq.Context,(),{})
641 context = Instance(zmq.Context,(),{})
642
642
643 # The Session to use for communication with the kernel.
643 # The Session to use for communication with the kernel.
644 session = Instance(Session,(),{})
644 session = Instance(Session,(),{})
645
645
646 # The kernel process with which the KernelManager is communicating.
646 # The kernel process with which the KernelManager is communicating.
647 kernel = Instance(Popen)
647 kernel = Instance(Popen)
648
648
649 # The addresses for the communication channels.
649 # The addresses for the communication channels.
650 xreq_address = TCPAddress((LOCALHOST, 0))
650 xreq_address = TCPAddress((LOCALHOST, 0))
651 sub_address = TCPAddress((LOCALHOST, 0))
651 sub_address = TCPAddress((LOCALHOST, 0))
652 rep_address = TCPAddress((LOCALHOST, 0))
652 rep_address = TCPAddress((LOCALHOST, 0))
653 hb_address = TCPAddress((LOCALHOST, 0))
653 hb_address = TCPAddress((LOCALHOST, 0))
654
654
655 # The classes to use for the various channels.
655 # The classes to use for the various channels.
656 xreq_channel_class = Type(XReqSocketChannel)
656 xreq_channel_class = Type(XReqSocketChannel)
657 sub_channel_class = Type(SubSocketChannel)
657 sub_channel_class = Type(SubSocketChannel)
658 rep_channel_class = Type(RepSocketChannel)
658 rep_channel_class = Type(RepSocketChannel)
659 hb_channel_class = Type(HBSocketChannel)
659 hb_channel_class = Type(HBSocketChannel)
660
660
661 # Protected traits.
661 # Protected traits.
662 _launch_args = Any
662 _launch_args = Any
663 _xreq_channel = Any
663 _xreq_channel = Any
664 _sub_channel = Any
664 _sub_channel = Any
665 _rep_channel = Any
665 _rep_channel = Any
666 _hb_channel = Any
666 _hb_channel = Any
667
667
668 def __init__(self, **kwargs):
668 def __init__(self, **kwargs):
669 super(KernelManager, self).__init__(**kwargs)
669 super(KernelManager, self).__init__(**kwargs)
670 # Uncomment this to try closing the context.
670 # Uncomment this to try closing the context.
671 # atexit.register(self.context.close)
671 # atexit.register(self.context.close)
672
672
673 #--------------------------------------------------------------------------
673 #--------------------------------------------------------------------------
674 # Channel management methods:
674 # Channel management methods:
675 #--------------------------------------------------------------------------
675 #--------------------------------------------------------------------------
676
676
677 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
677 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
678 """Starts the channels for this kernel.
678 """Starts the channels for this kernel.
679
679
680 This will create the channels if they do not exist and then start
680 This will create the channels if they do not exist and then start
681 them. If port numbers of 0 are being used (random ports) then you
681 them. If port numbers of 0 are being used (random ports) then you
682 must first call :method:`start_kernel`. If the channels have been
682 must first call :method:`start_kernel`. If the channels have been
683 stopped and you call this, :class:`RuntimeError` will be raised.
683 stopped and you call this, :class:`RuntimeError` will be raised.
684 """
684 """
685 if xreq:
685 if xreq:
686 self.xreq_channel.start()
686 self.xreq_channel.start()
687 if sub:
687 if sub:
688 self.sub_channel.start()
688 self.sub_channel.start()
689 if rep:
689 if rep:
690 self.rep_channel.start()
690 self.rep_channel.start()
691 if hb:
691 if hb:
692 self.hb_channel.start()
692 self.hb_channel.start()
693
693
694 def stop_channels(self):
694 def stop_channels(self):
695 """Stops all the running channels for this kernel.
695 """Stops all the running channels for this kernel.
696 """
696 """
697 if self.xreq_channel.is_alive():
697 if self.xreq_channel.is_alive():
698 self.xreq_channel.stop()
698 self.xreq_channel.stop()
699 if self.sub_channel.is_alive():
699 if self.sub_channel.is_alive():
700 self.sub_channel.stop()
700 self.sub_channel.stop()
701 if self.rep_channel.is_alive():
701 if self.rep_channel.is_alive():
702 self.rep_channel.stop()
702 self.rep_channel.stop()
703 if self.hb_channel.is_alive():
703 if self.hb_channel.is_alive():
704 self.hb_channel.stop()
704 self.hb_channel.stop()
705
705
706 @property
706 @property
707 def channels_running(self):
707 def channels_running(self):
708 """Are any of the channels created and running?"""
708 """Are any of the channels created and running?"""
709 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
709 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
710 self.rep_channel.is_alive() or self.hb_channel.is_alive())
710 self.rep_channel.is_alive() or self.hb_channel.is_alive())
711
711
712 #--------------------------------------------------------------------------
712 #--------------------------------------------------------------------------
713 # Kernel process management methods:
713 # Kernel process management methods:
714 #--------------------------------------------------------------------------
714 #--------------------------------------------------------------------------
715
715
716 def start_kernel(self, **kw):
716 def start_kernel(self, **kw):
717 """Starts a kernel process and configures the manager to use it.
717 """Starts a kernel process and configures the manager to use it.
718
718
719 If random ports (port=0) are being used, this method must be called
719 If random ports (port=0) are being used, this method must be called
720 before the channels are created.
720 before the channels are created.
721
721
722 Parameters:
722 Parameters:
723 -----------
723 -----------
724 ipython : bool, optional (default True)
724 ipython : bool, optional (default True)
725 Whether to use an IPython kernel instead of a plain Python kernel.
725 Whether to use an IPython kernel instead of a plain Python kernel.
726 """
726 """
727 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
727 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
728 self.rep_address, self.hb_address
728 self.rep_address, self.hb_address
729 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
729 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
730 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
730 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
731 raise RuntimeError("Can only launch a kernel on a local interface. "
731 raise RuntimeError("Can only launch a kernel on a local interface. "
732 "Make sure that the '*_address' attributes are "
732 "Make sure that the '*_address' attributes are "
733 "configured properly. "
733 "configured properly. "
734 "Currently valid addresses are: %s"%LOCAL_IPS
734 "Currently valid addresses are: %s"%LOCAL_IPS
735 )
735 )
736
736
737 self._launch_args = kw.copy()
737 self._launch_args = kw.copy()
738 if kw.pop('ipython', True):
738 if kw.pop('ipython', True):
739 from ipkernel import launch_kernel
739 from ipkernel import launch_kernel
740 else:
740 else:
741 from pykernel import launch_kernel
741 from pykernel import launch_kernel
742 self.kernel, xrep, pub, req, _hb = launch_kernel(
742 self.kernel, xrep, pub, req, _hb = launch_kernel(
743 xrep_port=xreq[1], pub_port=sub[1],
743 xrep_port=xreq[1], pub_port=sub[1],
744 req_port=rep[1], hb_port=hb[1], **kw)
744 req_port=rep[1], hb_port=hb[1], **kw)
745 self.xreq_address = (xreq[0], xrep)
745 self.xreq_address = (xreq[0], xrep)
746 self.sub_address = (sub[0], pub)
746 self.sub_address = (sub[0], pub)
747 self.rep_address = (rep[0], req)
747 self.rep_address = (rep[0], req)
748 self.hb_address = (hb[0], _hb)
748 self.hb_address = (hb[0], _hb)
749
749
750 def shutdown_kernel(self, restart=False):
750 def shutdown_kernel(self, restart=False):
751 """ Attempts to the stop the kernel process cleanly. If the kernel
751 """ Attempts to the stop the kernel process cleanly. If the kernel
752 cannot be stopped, it is killed, if possible.
752 cannot be stopped, it is killed, if possible.
753 """
753 """
754 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
754 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
755 if sys.platform == 'win32':
755 if sys.platform == 'win32':
756 self.kill_kernel()
756 self.kill_kernel()
757 return
757 return
758
758
759 # Pause the heart beat channel if it exists.
759 # Pause the heart beat channel if it exists.
760 if self._hb_channel is not None:
760 if self._hb_channel is not None:
761 self._hb_channel.pause()
761 self._hb_channel.pause()
762
762
763 # Don't send any additional kernel kill messages immediately, to give
763 # Don't send any additional kernel kill messages immediately, to give
764 # the kernel a chance to properly execute shutdown actions. Wait for at
764 # the kernel a chance to properly execute shutdown actions. Wait for at
765 # most 1s, checking every 0.1s.
765 # most 1s, checking every 0.1s.
766 self.xreq_channel.shutdown(restart=restart)
766 self.xreq_channel.shutdown(restart=restart)
767 for i in range(10):
767 for i in range(10):
768 if self.is_alive:
768 if self.is_alive:
769 time.sleep(0.1)
769 time.sleep(0.1)
770 else:
770 else:
771 break
771 break
772 else:
772 else:
773 # OK, we've waited long enough.
773 # OK, we've waited long enough.
774 if self.has_kernel:
774 if self.has_kernel:
775 self.kill_kernel()
775 self.kill_kernel()
776
776
777 def restart_kernel(self, now=False):
777 def restart_kernel(self, now=False):
778 """Restarts a kernel with the same arguments that were used to launch
778 """Restarts a kernel with the same arguments that were used to launch
779 it. If the old kernel was launched with random ports, the same ports
779 it. If the old kernel was launched with random ports, the same ports
780 will be used for the new kernel.
780 will be used for the new kernel.
781
781
782 Parameters
782 Parameters
783 ----------
783 ----------
784 now : bool, optional
784 now : bool, optional
785 If True, the kernel is forcefully restarted *immediately*, without
785 If True, the kernel is forcefully restarted *immediately*, without
786 having a chance to do any cleanup action. Otherwise the kernel is
786 having a chance to do any cleanup action. Otherwise the kernel is
787 given 1s to clean up before a forceful restart is issued.
787 given 1s to clean up before a forceful restart is issued.
788
788
789 In all cases the kernel is restarted, the only difference is whether
789 In all cases the kernel is restarted, the only difference is whether
790 it is given a chance to perform a clean shutdown or not.
790 it is given a chance to perform a clean shutdown or not.
791 """
791 """
792 if self._launch_args is None:
792 if self._launch_args is None:
793 raise RuntimeError("Cannot restart the kernel. "
793 raise RuntimeError("Cannot restart the kernel. "
794 "No previous call to 'start_kernel'.")
794 "No previous call to 'start_kernel'.")
795 else:
795 else:
796 if self.has_kernel:
796 if self.has_kernel:
797 if now:
797 if now:
798 self.kill_kernel()
798 self.kill_kernel()
799 else:
799 else:
800 self.shutdown_kernel(restart=True)
800 self.shutdown_kernel(restart=True)
801 self.start_kernel(**self._launch_args)
801 self.start_kernel(**self._launch_args)
802
802
803 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
803 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
804 # unless there is some delay here.
804 # unless there is some delay here.
805 if sys.platform == 'win32':
805 if sys.platform == 'win32':
806 time.sleep(0.2)
806 time.sleep(0.2)
807
807
808 @property
808 @property
809 def has_kernel(self):
809 def has_kernel(self):
810 """Returns whether a kernel process has been specified for the kernel
810 """Returns whether a kernel process has been specified for the kernel
811 manager.
811 manager.
812 """
812 """
813 return self.kernel is not None
813 return self.kernel is not None
814
814
815 def kill_kernel(self):
815 def kill_kernel(self):
816 """ Kill the running kernel. """
816 """ Kill the running kernel. """
817 if self.has_kernel:
817 if self.has_kernel:
818 # Pause the heart beat channel if it exists.
818 # Pause the heart beat channel if it exists.
819 if self._hb_channel is not None:
819 if self._hb_channel is not None:
820 self._hb_channel.pause()
820 self._hb_channel.pause()
821
821
822 # Attempt to kill the kernel.
822 # Attempt to kill the kernel.
823 try:
823 try:
824 self.kernel.kill()
824 self.kernel.kill()
825 except OSError, e:
825 except OSError, e:
826 # In Windows, we will get an Access Denied error if the process
826 # In Windows, we will get an Access Denied error if the process
827 # has already terminated. Ignore it.
827 # has already terminated. Ignore it.
828 if not (sys.platform == 'win32' and e.winerror == 5):
828 if not (sys.platform == 'win32' and e.winerror == 5):
829 raise
829 raise
830 self.kernel = None
830 self.kernel = None
831 else:
831 else:
832 raise RuntimeError("Cannot kill kernel. No kernel is running!")
832 raise RuntimeError("Cannot kill kernel. No kernel is running!")
833
833
834 def interrupt_kernel(self):
834 def interrupt_kernel(self):
835 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
835 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
836 well supported on all platforms.
836 well supported on all platforms.
837 """
837 """
838 if self.has_kernel:
838 if self.has_kernel:
839 if sys.platform == 'win32':
839 if sys.platform == 'win32':
840 from parentpoller import ParentPollerWindows as Poller
840 from parentpoller import ParentPollerWindows as Poller
841 Poller.send_interrupt(self.kernel.win32_interrupt_event)
841 Poller.send_interrupt(self.kernel.win32_interrupt_event)
842 else:
842 else:
843 self.kernel.send_signal(signal.SIGINT)
843 self.kernel.send_signal(signal.SIGINT)
844 else:
844 else:
845 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
845 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
846
846
847 def signal_kernel(self, signum):
847 def signal_kernel(self, signum):
848 """ Sends a signal to the kernel. Note that since only SIGTERM is
848 """ Sends a signal to the kernel. Note that since only SIGTERM is
849 supported on Windows, this function is only useful on Unix systems.
849 supported on Windows, this function is only useful on Unix systems.
850 """
850 """
851 if self.has_kernel:
851 if self.has_kernel:
852 self.kernel.send_signal(signum)
852 self.kernel.send_signal(signum)
853 else:
853 else:
854 raise RuntimeError("Cannot signal kernel. No kernel is running!")
854 raise RuntimeError("Cannot signal kernel. No kernel is running!")
855
855
856 @property
856 @property
857 def is_alive(self):
857 def is_alive(self):
858 """Is the kernel process still running?"""
858 """Is the kernel process still running?"""
859 # FIXME: not using a heartbeat means this method is broken for any
859 # FIXME: not using a heartbeat means this method is broken for any
860 # remote kernel, it's only capable of handling local kernels.
860 # remote kernel, it's only capable of handling local kernels.
861 if self.has_kernel:
861 if self.has_kernel:
862 if self.kernel.poll() is None:
862 if self.kernel.poll() is None:
863 return True
863 return True
864 else:
864 else:
865 return False
865 return False
866 else:
866 else:
867 # We didn't start the kernel with this KernelManager so we don't
867 # We didn't start the kernel with this KernelManager so we don't
868 # know if it is running. We should use a heartbeat for this case.
868 # know if it is running. We should use a heartbeat for this case.
869 return True
869 return True
870
870
871 #--------------------------------------------------------------------------
871 #--------------------------------------------------------------------------
872 # Channels used for communication with the kernel:
872 # Channels used for communication with the kernel:
873 #--------------------------------------------------------------------------
873 #--------------------------------------------------------------------------
874
874
875 @property
875 @property
876 def xreq_channel(self):
876 def xreq_channel(self):
877 """Get the REQ socket channel object to make requests of the kernel."""
877 """Get the REQ socket channel object to make requests of the kernel."""
878 if self._xreq_channel is None:
878 if self._xreq_channel is None:
879 self._xreq_channel = self.xreq_channel_class(self.context,
879 self._xreq_channel = self.xreq_channel_class(self.context,
880 self.session,
880 self.session,
881 self.xreq_address)
881 self.xreq_address)
882 return self._xreq_channel
882 return self._xreq_channel
883
883
884 @property
884 @property
885 def sub_channel(self):
885 def sub_channel(self):
886 """Get the SUB socket channel object."""
886 """Get the SUB socket channel object."""
887 if self._sub_channel is None:
887 if self._sub_channel is None:
888 self._sub_channel = self.sub_channel_class(self.context,
888 self._sub_channel = self.sub_channel_class(self.context,
889 self.session,
889 self.session,
890 self.sub_address)
890 self.sub_address)
891 return self._sub_channel
891 return self._sub_channel
892
892
893 @property
893 @property
894 def rep_channel(self):
894 def rep_channel(self):
895 """Get the REP socket channel object to handle stdin (raw_input)."""
895 """Get the REP socket channel object to handle stdin (raw_input)."""
896 if self._rep_channel is None:
896 if self._rep_channel is None:
897 self._rep_channel = self.rep_channel_class(self.context,
897 self._rep_channel = self.rep_channel_class(self.context,
898 self.session,
898 self.session,
899 self.rep_address)
899 self.rep_address)
900 return self._rep_channel
900 return self._rep_channel
901
901
902 @property
902 @property
903 def hb_channel(self):
903 def hb_channel(self):
904 """Get the REP socket channel object to handle stdin (raw_input)."""
904 """Get the heartbeat socket channel object to check that the
905 kernel is alive."""
905 if self._hb_channel is None:
906 if self._hb_channel is None:
906 self._hb_channel = self.hb_channel_class(self.context,
907 self._hb_channel = self.hb_channel_class(self.context,
907 self.session,
908 self.session,
908 self.hb_address)
909 self.hb_address)
909 return self._hb_channel
910 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now