##// END OF EJS Templates
avoid AttributeErrors on zmq.ZMQError at shutdown of qtconsole...
MinRK -
Show More
@@ -1,962 +1,965
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import errno
19 import errno
20 import json
20 import json
21 from subprocess import Popen
21 from subprocess import Popen
22 import os
22 import os
23 import signal
23 import signal
24 import sys
24 import sys
25 from threading import Thread
25 from threading import Thread
26 import time
26 import time
27
27
28 # System library imports.
28 # System library imports.
29 import zmq
29 import zmq
30 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # during garbage collection of threads at exit:
32 from zmq import ZMQError
30 from zmq.eventloop import ioloop, zmqstream
33 from zmq.eventloop import ioloop, zmqstream
31
34
32 # Local imports.
35 # Local imports.
33 from IPython.config.loader import Config
36 from IPython.config.loader import Config
34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 from IPython.utils.traitlets import (
38 from IPython.utils.traitlets import (
36 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
39 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
37 )
40 )
38 from IPython.utils.py3compat import str_to_bytes
41 from IPython.utils.py3compat import str_to_bytes
39 from IPython.zmq.entry_point import write_connection_file
42 from IPython.zmq.entry_point import write_connection_file
40 from session import Session
43 from session import Session
41
44
42 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
43 # Constants and exceptions
46 # Constants and exceptions
44 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
45
48
46 class InvalidPortNumber(Exception):
49 class InvalidPortNumber(Exception):
47 pass
50 pass
48
51
49 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
50 # Utility functions
53 # Utility functions
51 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
52
55
53 # some utilities to validate message structure, these might get moved elsewhere
56 # some utilities to validate message structure, these might get moved elsewhere
54 # if they prove to have more generic utility
57 # if they prove to have more generic utility
55
58
56 def validate_string_list(lst):
59 def validate_string_list(lst):
57 """Validate that the input is a list of strings.
60 """Validate that the input is a list of strings.
58
61
59 Raises ValueError if not."""
62 Raises ValueError if not."""
60 if not isinstance(lst, list):
63 if not isinstance(lst, list):
61 raise ValueError('input %r must be a list' % lst)
64 raise ValueError('input %r must be a list' % lst)
62 for x in lst:
65 for x in lst:
63 if not isinstance(x, basestring):
66 if not isinstance(x, basestring):
64 raise ValueError('element %r in list must be a string' % x)
67 raise ValueError('element %r in list must be a string' % x)
65
68
66
69
67 def validate_string_dict(dct):
70 def validate_string_dict(dct):
68 """Validate that the input is a dict with string keys and values.
71 """Validate that the input is a dict with string keys and values.
69
72
70 Raises ValueError if not."""
73 Raises ValueError if not."""
71 for k,v in dct.iteritems():
74 for k,v in dct.iteritems():
72 if not isinstance(k, basestring):
75 if not isinstance(k, basestring):
73 raise ValueError('key %r in dict must be a string' % k)
76 raise ValueError('key %r in dict must be a string' % k)
74 if not isinstance(v, basestring):
77 if not isinstance(v, basestring):
75 raise ValueError('value %r in dict must be a string' % v)
78 raise ValueError('value %r in dict must be a string' % v)
76
79
77
80
78 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
79 # ZMQ Socket Channel classes
82 # ZMQ Socket Channel classes
80 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
81
84
82 class ZMQSocketChannel(Thread):
85 class ZMQSocketChannel(Thread):
83 """The base class for the channels that use ZMQ sockets.
86 """The base class for the channels that use ZMQ sockets.
84 """
87 """
85 context = None
88 context = None
86 session = None
89 session = None
87 socket = None
90 socket = None
88 ioloop = None
91 ioloop = None
89 stream = None
92 stream = None
90 _address = None
93 _address = None
91
94
92 def __init__(self, context, session, address):
95 def __init__(self, context, session, address):
93 """Create a channel
96 """Create a channel
94
97
95 Parameters
98 Parameters
96 ----------
99 ----------
97 context : :class:`zmq.Context`
100 context : :class:`zmq.Context`
98 The ZMQ context to use.
101 The ZMQ context to use.
99 session : :class:`session.Session`
102 session : :class:`session.Session`
100 The session to use.
103 The session to use.
101 address : tuple
104 address : tuple
102 Standard (ip, port) tuple that the kernel is listening on.
105 Standard (ip, port) tuple that the kernel is listening on.
103 """
106 """
104 super(ZMQSocketChannel, self).__init__()
107 super(ZMQSocketChannel, self).__init__()
105 self.daemon = True
108 self.daemon = True
106
109
107 self.context = context
110 self.context = context
108 self.session = session
111 self.session = session
109 if address[1] == 0:
112 if address[1] == 0:
110 message = 'The port number for a channel cannot be 0.'
113 message = 'The port number for a channel cannot be 0.'
111 raise InvalidPortNumber(message)
114 raise InvalidPortNumber(message)
112 self._address = address
115 self._address = address
113
116
114 def _run_loop(self):
117 def _run_loop(self):
115 """Run my loop, ignoring EINTR events in the poller"""
118 """Run my loop, ignoring EINTR events in the poller"""
116 while True:
119 while True:
117 try:
120 try:
118 self.ioloop.start()
121 self.ioloop.start()
119 except zmq.ZMQError as e:
122 except ZMQError as e:
120 if e.errno == errno.EINTR:
123 if e.errno == errno.EINTR:
121 continue
124 continue
122 else:
125 else:
123 raise
126 raise
124 else:
127 else:
125 break
128 break
126
129
127 def stop(self):
130 def stop(self):
128 """Stop the channel's activity.
131 """Stop the channel's activity.
129
132
130 This calls :method:`Thread.join` and returns when the thread
133 This calls :method:`Thread.join` and returns when the thread
131 terminates. :class:`RuntimeError` will be raised if
134 terminates. :class:`RuntimeError` will be raised if
132 :method:`self.start` is called again.
135 :method:`self.start` is called again.
133 """
136 """
134 self.join()
137 self.join()
135
138
136 @property
139 @property
137 def address(self):
140 def address(self):
138 """Get the channel's address as an (ip, port) tuple.
141 """Get the channel's address as an (ip, port) tuple.
139
142
140 By the default, the address is (localhost, 0), where 0 means a random
143 By the default, the address is (localhost, 0), where 0 means a random
141 port.
144 port.
142 """
145 """
143 return self._address
146 return self._address
144
147
145 def _queue_send(self, msg):
148 def _queue_send(self, msg):
146 """Queue a message to be sent from the IOLoop's thread.
149 """Queue a message to be sent from the IOLoop's thread.
147
150
148 Parameters
151 Parameters
149 ----------
152 ----------
150 msg : message to send
153 msg : message to send
151
154
152 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
155 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
153 thread control of the action.
156 thread control of the action.
154 """
157 """
155 def thread_send():
158 def thread_send():
156 self.session.send(self.stream, msg)
159 self.session.send(self.stream, msg)
157 self.ioloop.add_callback(thread_send)
160 self.ioloop.add_callback(thread_send)
158
161
159 def _handle_recv(self, msg):
162 def _handle_recv(self, msg):
160 """callback for stream.on_recv
163 """callback for stream.on_recv
161
164
162 unpacks message, and calls handlers with it.
165 unpacks message, and calls handlers with it.
163 """
166 """
164 ident,smsg = self.session.feed_identities(msg)
167 ident,smsg = self.session.feed_identities(msg)
165 self.call_handlers(self.session.unserialize(smsg))
168 self.call_handlers(self.session.unserialize(smsg))
166
169
167
170
168
171
169 class ShellSocketChannel(ZMQSocketChannel):
172 class ShellSocketChannel(ZMQSocketChannel):
170 """The XREQ channel for issues request/replies to the kernel.
173 """The XREQ channel for issues request/replies to the kernel.
171 """
174 """
172
175
173 command_queue = None
176 command_queue = None
174 # flag for whether execute requests should be allowed to call raw_input:
177 # flag for whether execute requests should be allowed to call raw_input:
175 allow_stdin = True
178 allow_stdin = True
176
179
177 def __init__(self, context, session, address):
180 def __init__(self, context, session, address):
178 super(ShellSocketChannel, self).__init__(context, session, address)
181 super(ShellSocketChannel, self).__init__(context, session, address)
179 self.ioloop = ioloop.IOLoop()
182 self.ioloop = ioloop.IOLoop()
180
183
181 def run(self):
184 def run(self):
182 """The thread's main activity. Call start() instead."""
185 """The thread's main activity. Call start() instead."""
183 self.socket = self.context.socket(zmq.DEALER)
186 self.socket = self.context.socket(zmq.DEALER)
184 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
187 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
185 self.socket.connect('tcp://%s:%i' % self.address)
188 self.socket.connect('tcp://%s:%i' % self.address)
186 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
189 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
187 self.stream.on_recv(self._handle_recv)
190 self.stream.on_recv(self._handle_recv)
188 self._run_loop()
191 self._run_loop()
189
192
190 def stop(self):
193 def stop(self):
191 self.ioloop.stop()
194 self.ioloop.stop()
192 super(ShellSocketChannel, self).stop()
195 super(ShellSocketChannel, self).stop()
193
196
194 def call_handlers(self, msg):
197 def call_handlers(self, msg):
195 """This method is called in the ioloop thread when a message arrives.
198 """This method is called in the ioloop thread when a message arrives.
196
199
197 Subclasses should override this method to handle incoming messages.
200 Subclasses should override this method to handle incoming messages.
198 It is important to remember that this method is called in the thread
201 It is important to remember that this method is called in the thread
199 so that some logic must be done to ensure that the application leve
202 so that some logic must be done to ensure that the application leve
200 handlers are called in the application thread.
203 handlers are called in the application thread.
201 """
204 """
202 raise NotImplementedError('call_handlers must be defined in a subclass.')
205 raise NotImplementedError('call_handlers must be defined in a subclass.')
203
206
204 def execute(self, code, silent=False,
207 def execute(self, code, silent=False,
205 user_variables=None, user_expressions=None, allow_stdin=None):
208 user_variables=None, user_expressions=None, allow_stdin=None):
206 """Execute code in the kernel.
209 """Execute code in the kernel.
207
210
208 Parameters
211 Parameters
209 ----------
212 ----------
210 code : str
213 code : str
211 A string of Python code.
214 A string of Python code.
212
215
213 silent : bool, optional (default False)
216 silent : bool, optional (default False)
214 If set, the kernel will execute the code as quietly possible.
217 If set, the kernel will execute the code as quietly possible.
215
218
216 user_variables : list, optional
219 user_variables : list, optional
217 A list of variable names to pull from the user's namespace. They
220 A list of variable names to pull from the user's namespace. They
218 will come back as a dict with these names as keys and their
221 will come back as a dict with these names as keys and their
219 :func:`repr` as values.
222 :func:`repr` as values.
220
223
221 user_expressions : dict, optional
224 user_expressions : dict, optional
222 A dict with string keys and to pull from the user's
225 A dict with string keys and to pull from the user's
223 namespace. They will come back as a dict with these names as keys
226 namespace. They will come back as a dict with these names as keys
224 and their :func:`repr` as values.
227 and their :func:`repr` as values.
225
228
226 allow_stdin : bool, optional
229 allow_stdin : bool, optional
227 Flag for
230 Flag for
228 A dict with string keys and to pull from the user's
231 A dict with string keys and to pull from the user's
229 namespace. They will come back as a dict with these names as keys
232 namespace. They will come back as a dict with these names as keys
230 and their :func:`repr` as values.
233 and their :func:`repr` as values.
231
234
232 Returns
235 Returns
233 -------
236 -------
234 The msg_id of the message sent.
237 The msg_id of the message sent.
235 """
238 """
236 if user_variables is None:
239 if user_variables is None:
237 user_variables = []
240 user_variables = []
238 if user_expressions is None:
241 if user_expressions is None:
239 user_expressions = {}
242 user_expressions = {}
240 if allow_stdin is None:
243 if allow_stdin is None:
241 allow_stdin = self.allow_stdin
244 allow_stdin = self.allow_stdin
242
245
243
246
244 # Don't waste network traffic if inputs are invalid
247 # Don't waste network traffic if inputs are invalid
245 if not isinstance(code, basestring):
248 if not isinstance(code, basestring):
246 raise ValueError('code %r must be a string' % code)
249 raise ValueError('code %r must be a string' % code)
247 validate_string_list(user_variables)
250 validate_string_list(user_variables)
248 validate_string_dict(user_expressions)
251 validate_string_dict(user_expressions)
249
252
250 # Create class for content/msg creation. Related to, but possibly
253 # Create class for content/msg creation. Related to, but possibly
251 # not in Session.
254 # not in Session.
252 content = dict(code=code, silent=silent,
255 content = dict(code=code, silent=silent,
253 user_variables=user_variables,
256 user_variables=user_variables,
254 user_expressions=user_expressions,
257 user_expressions=user_expressions,
255 allow_stdin=allow_stdin,
258 allow_stdin=allow_stdin,
256 )
259 )
257 msg = self.session.msg('execute_request', content)
260 msg = self.session.msg('execute_request', content)
258 self._queue_send(msg)
261 self._queue_send(msg)
259 return msg['header']['msg_id']
262 return msg['header']['msg_id']
260
263
261 def complete(self, text, line, cursor_pos, block=None):
264 def complete(self, text, line, cursor_pos, block=None):
262 """Tab complete text in the kernel's namespace.
265 """Tab complete text in the kernel's namespace.
263
266
264 Parameters
267 Parameters
265 ----------
268 ----------
266 text : str
269 text : str
267 The text to complete.
270 The text to complete.
268 line : str
271 line : str
269 The full line of text that is the surrounding context for the
272 The full line of text that is the surrounding context for the
270 text to complete.
273 text to complete.
271 cursor_pos : int
274 cursor_pos : int
272 The position of the cursor in the line where the completion was
275 The position of the cursor in the line where the completion was
273 requested.
276 requested.
274 block : str, optional
277 block : str, optional
275 The full block of code in which the completion is being requested.
278 The full block of code in which the completion is being requested.
276
279
277 Returns
280 Returns
278 -------
281 -------
279 The msg_id of the message sent.
282 The msg_id of the message sent.
280 """
283 """
281 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
284 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
282 msg = self.session.msg('complete_request', content)
285 msg = self.session.msg('complete_request', content)
283 self._queue_send(msg)
286 self._queue_send(msg)
284 return msg['header']['msg_id']
287 return msg['header']['msg_id']
285
288
286 def object_info(self, oname):
289 def object_info(self, oname):
287 """Get metadata information about an object.
290 """Get metadata information about an object.
288
291
289 Parameters
292 Parameters
290 ----------
293 ----------
291 oname : str
294 oname : str
292 A string specifying the object name.
295 A string specifying the object name.
293
296
294 Returns
297 Returns
295 -------
298 -------
296 The msg_id of the message sent.
299 The msg_id of the message sent.
297 """
300 """
298 content = dict(oname=oname)
301 content = dict(oname=oname)
299 msg = self.session.msg('object_info_request', content)
302 msg = self.session.msg('object_info_request', content)
300 self._queue_send(msg)
303 self._queue_send(msg)
301 return msg['header']['msg_id']
304 return msg['header']['msg_id']
302
305
303 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
306 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
304 """Get entries from the history list.
307 """Get entries from the history list.
305
308
306 Parameters
309 Parameters
307 ----------
310 ----------
308 raw : bool
311 raw : bool
309 If True, return the raw input.
312 If True, return the raw input.
310 output : bool
313 output : bool
311 If True, then return the output as well.
314 If True, then return the output as well.
312 hist_access_type : str
315 hist_access_type : str
313 'range' (fill in session, start and stop params), 'tail' (fill in n)
316 'range' (fill in session, start and stop params), 'tail' (fill in n)
314 or 'search' (fill in pattern param).
317 or 'search' (fill in pattern param).
315
318
316 session : int
319 session : int
317 For a range request, the session from which to get lines. Session
320 For a range request, the session from which to get lines. Session
318 numbers are positive integers; negative ones count back from the
321 numbers are positive integers; negative ones count back from the
319 current session.
322 current session.
320 start : int
323 start : int
321 The first line number of a history range.
324 The first line number of a history range.
322 stop : int
325 stop : int
323 The final (excluded) line number of a history range.
326 The final (excluded) line number of a history range.
324
327
325 n : int
328 n : int
326 The number of lines of history to get for a tail request.
329 The number of lines of history to get for a tail request.
327
330
328 pattern : str
331 pattern : str
329 The glob-syntax pattern for a search request.
332 The glob-syntax pattern for a search request.
330
333
331 Returns
334 Returns
332 -------
335 -------
333 The msg_id of the message sent.
336 The msg_id of the message sent.
334 """
337 """
335 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
338 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
336 **kwargs)
339 **kwargs)
337 msg = self.session.msg('history_request', content)
340 msg = self.session.msg('history_request', content)
338 self._queue_send(msg)
341 self._queue_send(msg)
339 return msg['header']['msg_id']
342 return msg['header']['msg_id']
340
343
341 def shutdown(self, restart=False):
344 def shutdown(self, restart=False):
342 """Request an immediate kernel shutdown.
345 """Request an immediate kernel shutdown.
343
346
344 Upon receipt of the (empty) reply, client code can safely assume that
347 Upon receipt of the (empty) reply, client code can safely assume that
345 the kernel has shut down and it's safe to forcefully terminate it if
348 the kernel has shut down and it's safe to forcefully terminate it if
346 it's still alive.
349 it's still alive.
347
350
348 The kernel will send the reply via a function registered with Python's
351 The kernel will send the reply via a function registered with Python's
349 atexit module, ensuring it's truly done as the kernel is done with all
352 atexit module, ensuring it's truly done as the kernel is done with all
350 normal operation.
353 normal operation.
351 """
354 """
352 # Send quit message to kernel. Once we implement kernel-side setattr,
355 # Send quit message to kernel. Once we implement kernel-side setattr,
353 # this should probably be done that way, but for now this will do.
356 # this should probably be done that way, but for now this will do.
354 msg = self.session.msg('shutdown_request', {'restart':restart})
357 msg = self.session.msg('shutdown_request', {'restart':restart})
355 self._queue_send(msg)
358 self._queue_send(msg)
356 return msg['header']['msg_id']
359 return msg['header']['msg_id']
357
360
358
361
359
362
360 class SubSocketChannel(ZMQSocketChannel):
363 class SubSocketChannel(ZMQSocketChannel):
361 """The SUB channel which listens for messages that the kernel publishes.
364 """The SUB channel which listens for messages that the kernel publishes.
362 """
365 """
363
366
364 def __init__(self, context, session, address):
367 def __init__(self, context, session, address):
365 super(SubSocketChannel, self).__init__(context, session, address)
368 super(SubSocketChannel, self).__init__(context, session, address)
366 self.ioloop = ioloop.IOLoop()
369 self.ioloop = ioloop.IOLoop()
367
370
368 def run(self):
371 def run(self):
369 """The thread's main activity. Call start() instead."""
372 """The thread's main activity. Call start() instead."""
370 self.socket = self.context.socket(zmq.SUB)
373 self.socket = self.context.socket(zmq.SUB)
371 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
374 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
375 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self.socket.connect('tcp://%s:%i' % self.address)
376 self.socket.connect('tcp://%s:%i' % self.address)
374 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
377 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
375 self.stream.on_recv(self._handle_recv)
378 self.stream.on_recv(self._handle_recv)
376 self._run_loop()
379 self._run_loop()
377
380
378 def stop(self):
381 def stop(self):
379 self.ioloop.stop()
382 self.ioloop.stop()
380 super(SubSocketChannel, self).stop()
383 super(SubSocketChannel, self).stop()
381
384
382 def call_handlers(self, msg):
385 def call_handlers(self, msg):
383 """This method is called in the ioloop thread when a message arrives.
386 """This method is called in the ioloop thread when a message arrives.
384
387
385 Subclasses should override this method to handle incoming messages.
388 Subclasses should override this method to handle incoming messages.
386 It is important to remember that this method is called in the thread
389 It is important to remember that this method is called in the thread
387 so that some logic must be done to ensure that the application leve
390 so that some logic must be done to ensure that the application leve
388 handlers are called in the application thread.
391 handlers are called in the application thread.
389 """
392 """
390 raise NotImplementedError('call_handlers must be defined in a subclass.')
393 raise NotImplementedError('call_handlers must be defined in a subclass.')
391
394
392 def flush(self, timeout=1.0):
395 def flush(self, timeout=1.0):
393 """Immediately processes all pending messages on the SUB channel.
396 """Immediately processes all pending messages on the SUB channel.
394
397
395 Callers should use this method to ensure that :method:`call_handlers`
398 Callers should use this method to ensure that :method:`call_handlers`
396 has been called for all messages that have been received on the
399 has been called for all messages that have been received on the
397 0MQ SUB socket of this channel.
400 0MQ SUB socket of this channel.
398
401
399 This method is thread safe.
402 This method is thread safe.
400
403
401 Parameters
404 Parameters
402 ----------
405 ----------
403 timeout : float, optional
406 timeout : float, optional
404 The maximum amount of time to spend flushing, in seconds. The
407 The maximum amount of time to spend flushing, in seconds. The
405 default is one second.
408 default is one second.
406 """
409 """
407 # We do the IOLoop callback process twice to ensure that the IOLoop
410 # We do the IOLoop callback process twice to ensure that the IOLoop
408 # gets to perform at least one full poll.
411 # gets to perform at least one full poll.
409 stop_time = time.time() + timeout
412 stop_time = time.time() + timeout
410 for i in xrange(2):
413 for i in xrange(2):
411 self._flushed = False
414 self._flushed = False
412 self.ioloop.add_callback(self._flush)
415 self.ioloop.add_callback(self._flush)
413 while not self._flushed and time.time() < stop_time:
416 while not self._flushed and time.time() < stop_time:
414 time.sleep(0.01)
417 time.sleep(0.01)
415
418
416 def _flush(self):
419 def _flush(self):
417 """Callback for :method:`self.flush`."""
420 """Callback for :method:`self.flush`."""
418 self.stream.flush()
421 self.stream.flush()
419 self._flushed = True
422 self._flushed = True
420
423
421
424
422 class StdInSocketChannel(ZMQSocketChannel):
425 class StdInSocketChannel(ZMQSocketChannel):
423 """A reply channel to handle raw_input requests that the kernel makes."""
426 """A reply channel to handle raw_input requests that the kernel makes."""
424
427
425 msg_queue = None
428 msg_queue = None
426
429
427 def __init__(self, context, session, address):
430 def __init__(self, context, session, address):
428 super(StdInSocketChannel, self).__init__(context, session, address)
431 super(StdInSocketChannel, self).__init__(context, session, address)
429 self.ioloop = ioloop.IOLoop()
432 self.ioloop = ioloop.IOLoop()
430
433
431 def run(self):
434 def run(self):
432 """The thread's main activity. Call start() instead."""
435 """The thread's main activity. Call start() instead."""
433 self.socket = self.context.socket(zmq.DEALER)
436 self.socket = self.context.socket(zmq.DEALER)
434 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
437 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
435 self.socket.connect('tcp://%s:%i' % self.address)
438 self.socket.connect('tcp://%s:%i' % self.address)
436 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
439 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
437 self.stream.on_recv(self._handle_recv)
440 self.stream.on_recv(self._handle_recv)
438 self._run_loop()
441 self._run_loop()
439
442
440 def stop(self):
443 def stop(self):
441 self.ioloop.stop()
444 self.ioloop.stop()
442 super(StdInSocketChannel, self).stop()
445 super(StdInSocketChannel, self).stop()
443
446
444 def call_handlers(self, msg):
447 def call_handlers(self, msg):
445 """This method is called in the ioloop thread when a message arrives.
448 """This method is called in the ioloop thread when a message arrives.
446
449
447 Subclasses should override this method to handle incoming messages.
450 Subclasses should override this method to handle incoming messages.
448 It is important to remember that this method is called in the thread
451 It is important to remember that this method is called in the thread
449 so that some logic must be done to ensure that the application leve
452 so that some logic must be done to ensure that the application leve
450 handlers are called in the application thread.
453 handlers are called in the application thread.
451 """
454 """
452 raise NotImplementedError('call_handlers must be defined in a subclass.')
455 raise NotImplementedError('call_handlers must be defined in a subclass.')
453
456
454 def input(self, string):
457 def input(self, string):
455 """Send a string of raw input to the kernel."""
458 """Send a string of raw input to the kernel."""
456 content = dict(value=string)
459 content = dict(value=string)
457 msg = self.session.msg('input_reply', content)
460 msg = self.session.msg('input_reply', content)
458 self._queue_send(msg)
461 self._queue_send(msg)
459
462
460
463
461 class HBSocketChannel(ZMQSocketChannel):
464 class HBSocketChannel(ZMQSocketChannel):
462 """The heartbeat channel which monitors the kernel heartbeat.
465 """The heartbeat channel which monitors the kernel heartbeat.
463
466
464 Note that the heartbeat channel is paused by default. As long as you start
467 Note that the heartbeat channel is paused by default. As long as you start
465 this channel, the kernel manager will ensure that it is paused and un-paused
468 this channel, the kernel manager will ensure that it is paused and un-paused
466 as appropriate.
469 as appropriate.
467 """
470 """
468
471
469 time_to_dead = 3.0
472 time_to_dead = 3.0
470 socket = None
473 socket = None
471 poller = None
474 poller = None
472 _running = None
475 _running = None
473 _pause = None
476 _pause = None
474 _beating = None
477 _beating = None
475
478
476 def __init__(self, context, session, address):
479 def __init__(self, context, session, address):
477 super(HBSocketChannel, self).__init__(context, session, address)
480 super(HBSocketChannel, self).__init__(context, session, address)
478 self._running = False
481 self._running = False
479 self._pause =True
482 self._pause =True
480 self.poller = zmq.Poller()
483 self.poller = zmq.Poller()
481
484
482 def _create_socket(self):
485 def _create_socket(self):
483 if self.socket is not None:
486 if self.socket is not None:
484 # close previous socket, before opening a new one
487 # close previous socket, before opening a new one
485 self.poller.unregister(self.socket)
488 self.poller.unregister(self.socket)
486 self.socket.close()
489 self.socket.close()
487 self.socket = self.context.socket(zmq.REQ)
490 self.socket = self.context.socket(zmq.REQ)
488 self.socket.setsockopt(zmq.LINGER, 0)
491 self.socket.setsockopt(zmq.LINGER, 0)
489 self.socket.connect('tcp://%s:%i' % self.address)
492 self.socket.connect('tcp://%s:%i' % self.address)
490
493
491 self.poller.register(self.socket, zmq.POLLIN)
494 self.poller.register(self.socket, zmq.POLLIN)
492
495
493 def _poll(self, start_time):
496 def _poll(self, start_time):
494 """poll for heartbeat replies until we reach self.time_to_dead
497 """poll for heartbeat replies until we reach self.time_to_dead
495
498
496 Ignores interrupts, and returns the result of poll(), which
499 Ignores interrupts, and returns the result of poll(), which
497 will be an empty list if no messages arrived before the timeout,
500 will be an empty list if no messages arrived before the timeout,
498 or the event tuple if there is a message to receive.
501 or the event tuple if there is a message to receive.
499 """
502 """
500
503
501 until_dead = self.time_to_dead - (time.time() - start_time)
504 until_dead = self.time_to_dead - (time.time() - start_time)
502 # ensure poll at least once
505 # ensure poll at least once
503 until_dead = max(until_dead, 1e-3)
506 until_dead = max(until_dead, 1e-3)
504 events = []
507 events = []
505 while True:
508 while True:
506 try:
509 try:
507 events = self.poller.poll(1000 * until_dead)
510 events = self.poller.poll(1000 * until_dead)
508 except zmq.ZMQError as e:
511 except ZMQError as e:
509 if e.errno == errno.EINTR:
512 if e.errno == errno.EINTR:
510 # ignore interrupts during heartbeat
513 # ignore interrupts during heartbeat
511 # this may never actually happen
514 # this may never actually happen
512 until_dead = self.time_to_dead - (time.time() - start_time)
515 until_dead = self.time_to_dead - (time.time() - start_time)
513 until_dead = max(until_dead, 1e-3)
516 until_dead = max(until_dead, 1e-3)
514 pass
517 pass
515 else:
518 else:
516 raise
519 raise
517 else:
520 else:
518 break
521 break
519 return events
522 return events
520
523
521 def run(self):
524 def run(self):
522 """The thread's main activity. Call start() instead."""
525 """The thread's main activity. Call start() instead."""
523 self._create_socket()
526 self._create_socket()
524 self._running = True
527 self._running = True
525 self._beating = True
528 self._beating = True
526
529
527 while self._running:
530 while self._running:
528 if self._pause:
531 if self._pause:
529 # just sleep, and skip the rest of the loop
532 # just sleep, and skip the rest of the loop
530 time.sleep(self.time_to_dead)
533 time.sleep(self.time_to_dead)
531 continue
534 continue
532
535
533 since_last_heartbeat = 0.0
536 since_last_heartbeat = 0.0
534 # io.rprint('Ping from HB channel') # dbg
537 # io.rprint('Ping from HB channel') # dbg
535 # no need to catch EFSM here, because the previous event was
538 # no need to catch EFSM here, because the previous event was
536 # either a recv or connect, which cannot be followed by EFSM
539 # either a recv or connect, which cannot be followed by EFSM
537 self.socket.send(b'ping')
540 self.socket.send(b'ping')
538 request_time = time.time()
541 request_time = time.time()
539 ready = self._poll(request_time)
542 ready = self._poll(request_time)
540 if ready:
543 if ready:
541 self._beating = True
544 self._beating = True
542 # the poll above guarantees we have something to recv
545 # the poll above guarantees we have something to recv
543 self.socket.recv()
546 self.socket.recv()
544 # sleep the remainder of the cycle
547 # sleep the remainder of the cycle
545 remainder = self.time_to_dead - (time.time() - request_time)
548 remainder = self.time_to_dead - (time.time() - request_time)
546 if remainder > 0:
549 if remainder > 0:
547 time.sleep(remainder)
550 time.sleep(remainder)
548 continue
551 continue
549 else:
552 else:
550 # nothing was received within the time limit, signal heart failure
553 # nothing was received within the time limit, signal heart failure
551 self._beating = False
554 self._beating = False
552 since_last_heartbeat = time.time() - request_time
555 since_last_heartbeat = time.time() - request_time
553 self.call_handlers(since_last_heartbeat)
556 self.call_handlers(since_last_heartbeat)
554 # and close/reopen the socket, because the REQ/REP cycle has been broken
557 # and close/reopen the socket, because the REQ/REP cycle has been broken
555 self._create_socket()
558 self._create_socket()
556 continue
559 continue
557
560
558 def pause(self):
561 def pause(self):
559 """Pause the heartbeat."""
562 """Pause the heartbeat."""
560 self._pause = True
563 self._pause = True
561
564
562 def unpause(self):
565 def unpause(self):
563 """Unpause the heartbeat."""
566 """Unpause the heartbeat."""
564 self._pause = False
567 self._pause = False
565
568
566 def is_beating(self):
569 def is_beating(self):
567 """Is the heartbeat running and responsive (and not paused)."""
570 """Is the heartbeat running and responsive (and not paused)."""
568 if self.is_alive() and not self._pause and self._beating:
571 if self.is_alive() and not self._pause and self._beating:
569 return True
572 return True
570 else:
573 else:
571 return False
574 return False
572
575
573 def stop(self):
576 def stop(self):
574 self._running = False
577 self._running = False
575 super(HBSocketChannel, self).stop()
578 super(HBSocketChannel, self).stop()
576
579
577 def call_handlers(self, since_last_heartbeat):
580 def call_handlers(self, since_last_heartbeat):
578 """This method is called in the ioloop thread when a message arrives.
581 """This method is called in the ioloop thread when a message arrives.
579
582
580 Subclasses should override this method to handle incoming messages.
583 Subclasses should override this method to handle incoming messages.
581 It is important to remember that this method is called in the thread
584 It is important to remember that this method is called in the thread
582 so that some logic must be done to ensure that the application level
585 so that some logic must be done to ensure that the application level
583 handlers are called in the application thread.
586 handlers are called in the application thread.
584 """
587 """
585 raise NotImplementedError('call_handlers must be defined in a subclass.')
588 raise NotImplementedError('call_handlers must be defined in a subclass.')
586
589
587
590
588 #-----------------------------------------------------------------------------
591 #-----------------------------------------------------------------------------
589 # Main kernel manager class
592 # Main kernel manager class
590 #-----------------------------------------------------------------------------
593 #-----------------------------------------------------------------------------
591
594
592 class KernelManager(HasTraits):
595 class KernelManager(HasTraits):
593 """ Manages a kernel for a frontend.
596 """ Manages a kernel for a frontend.
594
597
595 The SUB channel is for the frontend to receive messages published by the
598 The SUB channel is for the frontend to receive messages published by the
596 kernel.
599 kernel.
597
600
598 The REQ channel is for the frontend to make requests of the kernel.
601 The REQ channel is for the frontend to make requests of the kernel.
599
602
600 The REP channel is for the kernel to request stdin (raw_input) from the
603 The REP channel is for the kernel to request stdin (raw_input) from the
601 frontend.
604 frontend.
602 """
605 """
603 # config object for passing to child configurables
606 # config object for passing to child configurables
604 config = Instance(Config)
607 config = Instance(Config)
605
608
606 # The PyZMQ Context to use for communication with the kernel.
609 # The PyZMQ Context to use for communication with the kernel.
607 context = Instance(zmq.Context)
610 context = Instance(zmq.Context)
608 def _context_default(self):
611 def _context_default(self):
609 return zmq.Context.instance()
612 return zmq.Context.instance()
610
613
611 # The Session to use for communication with the kernel.
614 # The Session to use for communication with the kernel.
612 session = Instance(Session)
615 session = Instance(Session)
613
616
614 # The kernel process with which the KernelManager is communicating.
617 # The kernel process with which the KernelManager is communicating.
615 kernel = Instance(Popen)
618 kernel = Instance(Popen)
616
619
617 # The addresses for the communication channels.
620 # The addresses for the communication channels.
618 connection_file = Unicode('')
621 connection_file = Unicode('')
619 ip = Unicode(LOCALHOST)
622 ip = Unicode(LOCALHOST)
620 def _ip_changed(self, name, old, new):
623 def _ip_changed(self, name, old, new):
621 if new == '*':
624 if new == '*':
622 self.ip = '0.0.0.0'
625 self.ip = '0.0.0.0'
623 shell_port = Integer(0)
626 shell_port = Integer(0)
624 iopub_port = Integer(0)
627 iopub_port = Integer(0)
625 stdin_port = Integer(0)
628 stdin_port = Integer(0)
626 hb_port = Integer(0)
629 hb_port = Integer(0)
627
630
628 # The classes to use for the various channels.
631 # The classes to use for the various channels.
629 shell_channel_class = Type(ShellSocketChannel)
632 shell_channel_class = Type(ShellSocketChannel)
630 sub_channel_class = Type(SubSocketChannel)
633 sub_channel_class = Type(SubSocketChannel)
631 stdin_channel_class = Type(StdInSocketChannel)
634 stdin_channel_class = Type(StdInSocketChannel)
632 hb_channel_class = Type(HBSocketChannel)
635 hb_channel_class = Type(HBSocketChannel)
633
636
634 # Protected traits.
637 # Protected traits.
635 _launch_args = Any
638 _launch_args = Any
636 _shell_channel = Any
639 _shell_channel = Any
637 _sub_channel = Any
640 _sub_channel = Any
638 _stdin_channel = Any
641 _stdin_channel = Any
639 _hb_channel = Any
642 _hb_channel = Any
640 _connection_file_written=Bool(False)
643 _connection_file_written=Bool(False)
641
644
642 def __init__(self, **kwargs):
645 def __init__(self, **kwargs):
643 super(KernelManager, self).__init__(**kwargs)
646 super(KernelManager, self).__init__(**kwargs)
644 if self.session is None:
647 if self.session is None:
645 self.session = Session(config=self.config)
648 self.session = Session(config=self.config)
646
649
647 def __del__(self):
650 def __del__(self):
648 self.cleanup_connection_file()
651 self.cleanup_connection_file()
649
652
650
653
651 #--------------------------------------------------------------------------
654 #--------------------------------------------------------------------------
652 # Channel management methods:
655 # Channel management methods:
653 #--------------------------------------------------------------------------
656 #--------------------------------------------------------------------------
654
657
655 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
658 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
656 """Starts the channels for this kernel.
659 """Starts the channels for this kernel.
657
660
658 This will create the channels if they do not exist and then start
661 This will create the channels if they do not exist and then start
659 them. If port numbers of 0 are being used (random ports) then you
662 them. If port numbers of 0 are being used (random ports) then you
660 must first call :method:`start_kernel`. If the channels have been
663 must first call :method:`start_kernel`. If the channels have been
661 stopped and you call this, :class:`RuntimeError` will be raised.
664 stopped and you call this, :class:`RuntimeError` will be raised.
662 """
665 """
663 if shell:
666 if shell:
664 self.shell_channel.start()
667 self.shell_channel.start()
665 if sub:
668 if sub:
666 self.sub_channel.start()
669 self.sub_channel.start()
667 if stdin:
670 if stdin:
668 self.stdin_channel.start()
671 self.stdin_channel.start()
669 self.shell_channel.allow_stdin = True
672 self.shell_channel.allow_stdin = True
670 else:
673 else:
671 self.shell_channel.allow_stdin = False
674 self.shell_channel.allow_stdin = False
672 if hb:
675 if hb:
673 self.hb_channel.start()
676 self.hb_channel.start()
674
677
675 def stop_channels(self):
678 def stop_channels(self):
676 """Stops all the running channels for this kernel.
679 """Stops all the running channels for this kernel.
677 """
680 """
678 if self.shell_channel.is_alive():
681 if self.shell_channel.is_alive():
679 self.shell_channel.stop()
682 self.shell_channel.stop()
680 if self.sub_channel.is_alive():
683 if self.sub_channel.is_alive():
681 self.sub_channel.stop()
684 self.sub_channel.stop()
682 if self.stdin_channel.is_alive():
685 if self.stdin_channel.is_alive():
683 self.stdin_channel.stop()
686 self.stdin_channel.stop()
684 if self.hb_channel.is_alive():
687 if self.hb_channel.is_alive():
685 self.hb_channel.stop()
688 self.hb_channel.stop()
686
689
687 @property
690 @property
688 def channels_running(self):
691 def channels_running(self):
689 """Are any of the channels created and running?"""
692 """Are any of the channels created and running?"""
690 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
693 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
691 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
694 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
692
695
693 #--------------------------------------------------------------------------
696 #--------------------------------------------------------------------------
694 # Kernel process management methods:
697 # Kernel process management methods:
695 #--------------------------------------------------------------------------
698 #--------------------------------------------------------------------------
696
699
697 def cleanup_connection_file(self):
700 def cleanup_connection_file(self):
698 """cleanup connection file *if we wrote it*
701 """cleanup connection file *if we wrote it*
699
702
700 Will not raise if the connection file was already removed somehow.
703 Will not raise if the connection file was already removed somehow.
701 """
704 """
702 if self._connection_file_written:
705 if self._connection_file_written:
703 # cleanup connection files on full shutdown of kernel we started
706 # cleanup connection files on full shutdown of kernel we started
704 self._connection_file_written = False
707 self._connection_file_written = False
705 try:
708 try:
706 os.remove(self.connection_file)
709 os.remove(self.connection_file)
707 except OSError:
710 except OSError:
708 pass
711 pass
709
712
710 def load_connection_file(self):
713 def load_connection_file(self):
711 """load connection info from JSON dict in self.connection_file"""
714 """load connection info from JSON dict in self.connection_file"""
712 with open(self.connection_file) as f:
715 with open(self.connection_file) as f:
713 cfg = json.loads(f.read())
716 cfg = json.loads(f.read())
714
717
715 self.ip = cfg['ip']
718 self.ip = cfg['ip']
716 self.shell_port = cfg['shell_port']
719 self.shell_port = cfg['shell_port']
717 self.stdin_port = cfg['stdin_port']
720 self.stdin_port = cfg['stdin_port']
718 self.iopub_port = cfg['iopub_port']
721 self.iopub_port = cfg['iopub_port']
719 self.hb_port = cfg['hb_port']
722 self.hb_port = cfg['hb_port']
720 self.session.key = str_to_bytes(cfg['key'])
723 self.session.key = str_to_bytes(cfg['key'])
721
724
722 def write_connection_file(self):
725 def write_connection_file(self):
723 """write connection info to JSON dict in self.connection_file"""
726 """write connection info to JSON dict in self.connection_file"""
724 if self._connection_file_written:
727 if self._connection_file_written:
725 return
728 return
726 self.connection_file,cfg = write_connection_file(self.connection_file,
729 self.connection_file,cfg = write_connection_file(self.connection_file,
727 ip=self.ip, key=self.session.key,
730 ip=self.ip, key=self.session.key,
728 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
731 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
729 shell_port=self.shell_port, hb_port=self.hb_port)
732 shell_port=self.shell_port, hb_port=self.hb_port)
730 # write_connection_file also sets default ports:
733 # write_connection_file also sets default ports:
731 self.shell_port = cfg['shell_port']
734 self.shell_port = cfg['shell_port']
732 self.stdin_port = cfg['stdin_port']
735 self.stdin_port = cfg['stdin_port']
733 self.iopub_port = cfg['iopub_port']
736 self.iopub_port = cfg['iopub_port']
734 self.hb_port = cfg['hb_port']
737 self.hb_port = cfg['hb_port']
735
738
736 self._connection_file_written = True
739 self._connection_file_written = True
737
740
738 def start_kernel(self, **kw):
741 def start_kernel(self, **kw):
739 """Starts a kernel process and configures the manager to use it.
742 """Starts a kernel process and configures the manager to use it.
740
743
741 If random ports (port=0) are being used, this method must be called
744 If random ports (port=0) are being used, this method must be called
742 before the channels are created.
745 before the channels are created.
743
746
744 Parameters:
747 Parameters:
745 -----------
748 -----------
746 ipython : bool, optional (default True)
749 ipython : bool, optional (default True)
747 Whether to use an IPython kernel instead of a plain Python kernel.
750 Whether to use an IPython kernel instead of a plain Python kernel.
748
751
749 launcher : callable, optional (default None)
752 launcher : callable, optional (default None)
750 A custom function for launching the kernel process (generally a
753 A custom function for launching the kernel process (generally a
751 wrapper around ``entry_point.base_launch_kernel``). In most cases,
754 wrapper around ``entry_point.base_launch_kernel``). In most cases,
752 it should not be necessary to use this parameter.
755 it should not be necessary to use this parameter.
753
756
754 **kw : optional
757 **kw : optional
755 See respective options for IPython and Python kernels.
758 See respective options for IPython and Python kernels.
756 """
759 """
757 if self.ip not in LOCAL_IPS:
760 if self.ip not in LOCAL_IPS:
758 raise RuntimeError("Can only launch a kernel on a local interface. "
761 raise RuntimeError("Can only launch a kernel on a local interface. "
759 "Make sure that the '*_address' attributes are "
762 "Make sure that the '*_address' attributes are "
760 "configured properly. "
763 "configured properly. "
761 "Currently valid addresses are: %s"%LOCAL_IPS
764 "Currently valid addresses are: %s"%LOCAL_IPS
762 )
765 )
763
766
764 # write connection file / get default ports
767 # write connection file / get default ports
765 self.write_connection_file()
768 self.write_connection_file()
766
769
767 self._launch_args = kw.copy()
770 self._launch_args = kw.copy()
768 launch_kernel = kw.pop('launcher', None)
771 launch_kernel = kw.pop('launcher', None)
769 if launch_kernel is None:
772 if launch_kernel is None:
770 if kw.pop('ipython', True):
773 if kw.pop('ipython', True):
771 from ipkernel import launch_kernel
774 from ipkernel import launch_kernel
772 else:
775 else:
773 from pykernel import launch_kernel
776 from pykernel import launch_kernel
774 self.kernel = launch_kernel(fname=self.connection_file, **kw)
777 self.kernel = launch_kernel(fname=self.connection_file, **kw)
775
778
776 def shutdown_kernel(self, restart=False):
779 def shutdown_kernel(self, restart=False):
777 """ Attempts to the stop the kernel process cleanly. If the kernel
780 """ Attempts to the stop the kernel process cleanly. If the kernel
778 cannot be stopped, it is killed, if possible.
781 cannot be stopped, it is killed, if possible.
779 """
782 """
780 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
783 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
781 if sys.platform == 'win32':
784 if sys.platform == 'win32':
782 self.kill_kernel()
785 self.kill_kernel()
783 return
786 return
784
787
785 # Pause the heart beat channel if it exists.
788 # Pause the heart beat channel if it exists.
786 if self._hb_channel is not None:
789 if self._hb_channel is not None:
787 self._hb_channel.pause()
790 self._hb_channel.pause()
788
791
789 # Don't send any additional kernel kill messages immediately, to give
792 # Don't send any additional kernel kill messages immediately, to give
790 # the kernel a chance to properly execute shutdown actions. Wait for at
793 # the kernel a chance to properly execute shutdown actions. Wait for at
791 # most 1s, checking every 0.1s.
794 # most 1s, checking every 0.1s.
792 self.shell_channel.shutdown(restart=restart)
795 self.shell_channel.shutdown(restart=restart)
793 for i in range(10):
796 for i in range(10):
794 if self.is_alive:
797 if self.is_alive:
795 time.sleep(0.1)
798 time.sleep(0.1)
796 else:
799 else:
797 break
800 break
798 else:
801 else:
799 # OK, we've waited long enough.
802 # OK, we've waited long enough.
800 if self.has_kernel:
803 if self.has_kernel:
801 self.kill_kernel()
804 self.kill_kernel()
802
805
803 if not restart and self._connection_file_written:
806 if not restart and self._connection_file_written:
804 # cleanup connection files on full shutdown of kernel we started
807 # cleanup connection files on full shutdown of kernel we started
805 self._connection_file_written = False
808 self._connection_file_written = False
806 try:
809 try:
807 os.remove(self.connection_file)
810 os.remove(self.connection_file)
808 except IOError:
811 except IOError:
809 pass
812 pass
810
813
811 def restart_kernel(self, now=False, **kw):
814 def restart_kernel(self, now=False, **kw):
812 """Restarts a kernel with the arguments that were used to launch it.
815 """Restarts a kernel with the arguments that were used to launch it.
813
816
814 If the old kernel was launched with random ports, the same ports will be
817 If the old kernel was launched with random ports, the same ports will be
815 used for the new kernel.
818 used for the new kernel.
816
819
817 Parameters
820 Parameters
818 ----------
821 ----------
819 now : bool, optional
822 now : bool, optional
820 If True, the kernel is forcefully restarted *immediately*, without
823 If True, the kernel is forcefully restarted *immediately*, without
821 having a chance to do any cleanup action. Otherwise the kernel is
824 having a chance to do any cleanup action. Otherwise the kernel is
822 given 1s to clean up before a forceful restart is issued.
825 given 1s to clean up before a forceful restart is issued.
823
826
824 In all cases the kernel is restarted, the only difference is whether
827 In all cases the kernel is restarted, the only difference is whether
825 it is given a chance to perform a clean shutdown or not.
828 it is given a chance to perform a clean shutdown or not.
826
829
827 **kw : optional
830 **kw : optional
828 Any options specified here will replace those used to launch the
831 Any options specified here will replace those used to launch the
829 kernel.
832 kernel.
830 """
833 """
831 if self._launch_args is None:
834 if self._launch_args is None:
832 raise RuntimeError("Cannot restart the kernel. "
835 raise RuntimeError("Cannot restart the kernel. "
833 "No previous call to 'start_kernel'.")
836 "No previous call to 'start_kernel'.")
834 else:
837 else:
835 # Stop currently running kernel.
838 # Stop currently running kernel.
836 if self.has_kernel:
839 if self.has_kernel:
837 if now:
840 if now:
838 self.kill_kernel()
841 self.kill_kernel()
839 else:
842 else:
840 self.shutdown_kernel(restart=True)
843 self.shutdown_kernel(restart=True)
841
844
842 # Start new kernel.
845 # Start new kernel.
843 self._launch_args.update(kw)
846 self._launch_args.update(kw)
844 self.start_kernel(**self._launch_args)
847 self.start_kernel(**self._launch_args)
845
848
846 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
849 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
847 # unless there is some delay here.
850 # unless there is some delay here.
848 if sys.platform == 'win32':
851 if sys.platform == 'win32':
849 time.sleep(0.2)
852 time.sleep(0.2)
850
853
851 @property
854 @property
852 def has_kernel(self):
855 def has_kernel(self):
853 """Returns whether a kernel process has been specified for the kernel
856 """Returns whether a kernel process has been specified for the kernel
854 manager.
857 manager.
855 """
858 """
856 return self.kernel is not None
859 return self.kernel is not None
857
860
858 def kill_kernel(self):
861 def kill_kernel(self):
859 """ Kill the running kernel. """
862 """ Kill the running kernel. """
860 if self.has_kernel:
863 if self.has_kernel:
861 # Pause the heart beat channel if it exists.
864 # Pause the heart beat channel if it exists.
862 if self._hb_channel is not None:
865 if self._hb_channel is not None:
863 self._hb_channel.pause()
866 self._hb_channel.pause()
864
867
865 # Attempt to kill the kernel.
868 # Attempt to kill the kernel.
866 try:
869 try:
867 self.kernel.kill()
870 self.kernel.kill()
868 except OSError, e:
871 except OSError, e:
869 # In Windows, we will get an Access Denied error if the process
872 # In Windows, we will get an Access Denied error if the process
870 # has already terminated. Ignore it.
873 # has already terminated. Ignore it.
871 if sys.platform == 'win32':
874 if sys.platform == 'win32':
872 if e.winerror != 5:
875 if e.winerror != 5:
873 raise
876 raise
874 # On Unix, we may get an ESRCH error if the process has already
877 # On Unix, we may get an ESRCH error if the process has already
875 # terminated. Ignore it.
878 # terminated. Ignore it.
876 else:
879 else:
877 from errno import ESRCH
880 from errno import ESRCH
878 if e.errno != ESRCH:
881 if e.errno != ESRCH:
879 raise
882 raise
880 self.kernel = None
883 self.kernel = None
881 else:
884 else:
882 raise RuntimeError("Cannot kill kernel. No kernel is running!")
885 raise RuntimeError("Cannot kill kernel. No kernel is running!")
883
886
884 def interrupt_kernel(self):
887 def interrupt_kernel(self):
885 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
888 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
886 well supported on all platforms.
889 well supported on all platforms.
887 """
890 """
888 if self.has_kernel:
891 if self.has_kernel:
889 if sys.platform == 'win32':
892 if sys.platform == 'win32':
890 from parentpoller import ParentPollerWindows as Poller
893 from parentpoller import ParentPollerWindows as Poller
891 Poller.send_interrupt(self.kernel.win32_interrupt_event)
894 Poller.send_interrupt(self.kernel.win32_interrupt_event)
892 else:
895 else:
893 self.kernel.send_signal(signal.SIGINT)
896 self.kernel.send_signal(signal.SIGINT)
894 else:
897 else:
895 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
898 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
896
899
897 def signal_kernel(self, signum):
900 def signal_kernel(self, signum):
898 """ Sends a signal to the kernel. Note that since only SIGTERM is
901 """ Sends a signal to the kernel. Note that since only SIGTERM is
899 supported on Windows, this function is only useful on Unix systems.
902 supported on Windows, this function is only useful on Unix systems.
900 """
903 """
901 if self.has_kernel:
904 if self.has_kernel:
902 self.kernel.send_signal(signum)
905 self.kernel.send_signal(signum)
903 else:
906 else:
904 raise RuntimeError("Cannot signal kernel. No kernel is running!")
907 raise RuntimeError("Cannot signal kernel. No kernel is running!")
905
908
906 @property
909 @property
907 def is_alive(self):
910 def is_alive(self):
908 """Is the kernel process still running?"""
911 """Is the kernel process still running?"""
909 if self.has_kernel:
912 if self.has_kernel:
910 if self.kernel.poll() is None:
913 if self.kernel.poll() is None:
911 return True
914 return True
912 else:
915 else:
913 return False
916 return False
914 elif self._hb_channel is not None:
917 elif self._hb_channel is not None:
915 # We didn't start the kernel with this KernelManager so we
918 # We didn't start the kernel with this KernelManager so we
916 # use the heartbeat.
919 # use the heartbeat.
917 return self._hb_channel.is_beating()
920 return self._hb_channel.is_beating()
918 else:
921 else:
919 # no heartbeat and not local, we can't tell if it's running,
922 # no heartbeat and not local, we can't tell if it's running,
920 # so naively return True
923 # so naively return True
921 return True
924 return True
922
925
923 #--------------------------------------------------------------------------
926 #--------------------------------------------------------------------------
924 # Channels used for communication with the kernel:
927 # Channels used for communication with the kernel:
925 #--------------------------------------------------------------------------
928 #--------------------------------------------------------------------------
926
929
927 @property
930 @property
928 def shell_channel(self):
931 def shell_channel(self):
929 """Get the REQ socket channel object to make requests of the kernel."""
932 """Get the REQ socket channel object to make requests of the kernel."""
930 if self._shell_channel is None:
933 if self._shell_channel is None:
931 self._shell_channel = self.shell_channel_class(self.context,
934 self._shell_channel = self.shell_channel_class(self.context,
932 self.session,
935 self.session,
933 (self.ip, self.shell_port))
936 (self.ip, self.shell_port))
934 return self._shell_channel
937 return self._shell_channel
935
938
936 @property
939 @property
937 def sub_channel(self):
940 def sub_channel(self):
938 """Get the SUB socket channel object."""
941 """Get the SUB socket channel object."""
939 if self._sub_channel is None:
942 if self._sub_channel is None:
940 self._sub_channel = self.sub_channel_class(self.context,
943 self._sub_channel = self.sub_channel_class(self.context,
941 self.session,
944 self.session,
942 (self.ip, self.iopub_port))
945 (self.ip, self.iopub_port))
943 return self._sub_channel
946 return self._sub_channel
944
947
945 @property
948 @property
946 def stdin_channel(self):
949 def stdin_channel(self):
947 """Get the REP socket channel object to handle stdin (raw_input)."""
950 """Get the REP socket channel object to handle stdin (raw_input)."""
948 if self._stdin_channel is None:
951 if self._stdin_channel is None:
949 self._stdin_channel = self.stdin_channel_class(self.context,
952 self._stdin_channel = self.stdin_channel_class(self.context,
950 self.session,
953 self.session,
951 (self.ip, self.stdin_port))
954 (self.ip, self.stdin_port))
952 return self._stdin_channel
955 return self._stdin_channel
953
956
954 @property
957 @property
955 def hb_channel(self):
958 def hb_channel(self):
956 """Get the heartbeat socket channel object to check that the
959 """Get the heartbeat socket channel object to check that the
957 kernel is alive."""
960 kernel is alive."""
958 if self._hb_channel is None:
961 if self._hb_channel is None:
959 self._hb_channel = self.hb_channel_class(self.context,
962 self._hb_channel = self.hb_channel_class(self.context,
960 self.session,
963 self.session,
961 (self.ip, self.hb_port))
964 (self.ip, self.hb_port))
962 return self._hb_channel
965 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now