##// END OF EJS Templates
close KernelManager channel sockets when they stop...
MinRK -
Show More
@@ -1,983 +1,1000 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 import json
21 import json
22 from subprocess import Popen
22 from subprocess import Popen
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 from threading import Thread
26 from threading import Thread
27 import time
27 import time
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.traitlets import (
39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 )
41 )
42 from IPython.utils.py3compat import str_to_bytes
42 from IPython.utils.py3compat import str_to_bytes
43 from IPython.zmq.entry_point import write_connection_file
43 from IPython.zmq.entry_point import write_connection_file
44 from session import Session
44 from session import Session
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Constants and exceptions
47 # Constants and exceptions
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 class InvalidPortNumber(Exception):
50 class InvalidPortNumber(Exception):
51 pass
51 pass
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Utility functions
54 # Utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 # some utilities to validate message structure, these might get moved elsewhere
57 # some utilities to validate message structure, these might get moved elsewhere
58 # if they prove to have more generic utility
58 # if they prove to have more generic utility
59
59
60 def validate_string_list(lst):
60 def validate_string_list(lst):
61 """Validate that the input is a list of strings.
61 """Validate that the input is a list of strings.
62
62
63 Raises ValueError if not."""
63 Raises ValueError if not."""
64 if not isinstance(lst, list):
64 if not isinstance(lst, list):
65 raise ValueError('input %r must be a list' % lst)
65 raise ValueError('input %r must be a list' % lst)
66 for x in lst:
66 for x in lst:
67 if not isinstance(x, basestring):
67 if not isinstance(x, basestring):
68 raise ValueError('element %r in list must be a string' % x)
68 raise ValueError('element %r in list must be a string' % x)
69
69
70
70
71 def validate_string_dict(dct):
71 def validate_string_dict(dct):
72 """Validate that the input is a dict with string keys and values.
72 """Validate that the input is a dict with string keys and values.
73
73
74 Raises ValueError if not."""
74 Raises ValueError if not."""
75 for k,v in dct.iteritems():
75 for k,v in dct.iteritems():
76 if not isinstance(k, basestring):
76 if not isinstance(k, basestring):
77 raise ValueError('key %r in dict must be a string' % k)
77 raise ValueError('key %r in dict must be a string' % k)
78 if not isinstance(v, basestring):
78 if not isinstance(v, basestring):
79 raise ValueError('value %r in dict must be a string' % v)
79 raise ValueError('value %r in dict must be a string' % v)
80
80
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # ZMQ Socket Channel classes
83 # ZMQ Socket Channel classes
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86 class ZMQSocketChannel(Thread):
86 class ZMQSocketChannel(Thread):
87 """The base class for the channels that use ZMQ sockets.
87 """The base class for the channels that use ZMQ sockets.
88 """
88 """
89 context = None
89 context = None
90 session = None
90 session = None
91 socket = None
91 socket = None
92 ioloop = None
92 ioloop = None
93 stream = None
93 stream = None
94 _address = None
94 _address = None
95 _exiting = False
95 _exiting = False
96
96
97 def __init__(self, context, session, address):
97 def __init__(self, context, session, address):
98 """Create a channel
98 """Create a channel
99
99
100 Parameters
100 Parameters
101 ----------
101 ----------
102 context : :class:`zmq.Context`
102 context : :class:`zmq.Context`
103 The ZMQ context to use.
103 The ZMQ context to use.
104 session : :class:`session.Session`
104 session : :class:`session.Session`
105 The session to use.
105 The session to use.
106 address : tuple
106 address : tuple
107 Standard (ip, port) tuple that the kernel is listening on.
107 Standard (ip, port) tuple that the kernel is listening on.
108 """
108 """
109 super(ZMQSocketChannel, self).__init__()
109 super(ZMQSocketChannel, self).__init__()
110 self.daemon = True
110 self.daemon = True
111
111
112 self.context = context
112 self.context = context
113 self.session = session
113 self.session = session
114 if address[1] == 0:
114 if address[1] == 0:
115 message = 'The port number for a channel cannot be 0.'
115 message = 'The port number for a channel cannot be 0.'
116 raise InvalidPortNumber(message)
116 raise InvalidPortNumber(message)
117 self._address = address
117 self._address = address
118 atexit.register(self._notice_exit)
118 atexit.register(self._notice_exit)
119
119
120 def _notice_exit(self):
120 def _notice_exit(self):
121 self._exiting = True
121 self._exiting = True
122
122
123 def _run_loop(self):
123 def _run_loop(self):
124 """Run my loop, ignoring EINTR events in the poller"""
124 """Run my loop, ignoring EINTR events in the poller"""
125 while True:
125 while True:
126 try:
126 try:
127 self.ioloop.start()
127 self.ioloop.start()
128 except ZMQError as e:
128 except ZMQError as e:
129 if e.errno == errno.EINTR:
129 if e.errno == errno.EINTR:
130 continue
130 continue
131 else:
131 else:
132 raise
132 raise
133 except Exception:
133 except Exception:
134 if self._exiting:
134 if self._exiting:
135 break
135 break
136 else:
136 else:
137 raise
137 raise
138 else:
138 else:
139 break
139 break
140
140
141 def stop(self):
141 def stop(self):
142 """Stop the channel's activity.
142 """Stop the channel's activity.
143
143
144 This calls :method:`Thread.join` and returns when the thread
144 This calls :method:`Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
145 terminates. :class:`RuntimeError` will be raised if
146 :method:`self.start` is called again.
146 :method:`self.start` is called again.
147 """
147 """
148 self.join()
148 self.join()
149
149
150 @property
150 @property
151 def address(self):
151 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
152 """Get the channel's address as an (ip, port) tuple.
153
153
154 By the default, the address is (localhost, 0), where 0 means a random
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
155 port.
156 """
156 """
157 return self._address
157 return self._address
158
158
159 def _queue_send(self, msg):
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 msg : message to send
164 msg : message to send
165
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
167 thread control of the action.
168 """
168 """
169 def thread_send():
169 def thread_send():
170 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
172
172
173 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
174 """callback for stream.on_recv
174 """callback for stream.on_recv
175
175
176 unpacks message, and calls handlers with it.
176 unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 self.call_handlers(self.session.unserialize(smsg))
179 self.call_handlers(self.session.unserialize(smsg))
180
180
181
181
182
182
183 class ShellSocketChannel(ZMQSocketChannel):
183 class ShellSocketChannel(ZMQSocketChannel):
184 """The XREQ channel for issues request/replies to the kernel.
184 """The XREQ channel for issues request/replies to the kernel.
185 """
185 """
186
186
187 command_queue = None
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
189 allow_stdin = True
190
190
191 def __init__(self, context, session, address):
191 def __init__(self, context, session, address):
192 super(ShellSocketChannel, self).__init__(context, session, address)
192 super(ShellSocketChannel, self).__init__(context, session, address)
193 self.ioloop = ioloop.IOLoop()
193 self.ioloop = ioloop.IOLoop()
194
194
195 def run(self):
195 def run(self):
196 """The thread's main activity. Call start() instead."""
196 """The thread's main activity. Call start() instead."""
197 self.socket = self.context.socket(zmq.DEALER)
197 self.socket = self.context.socket(zmq.DEALER)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
199 self.socket.connect('tcp://%s:%i' % self.address)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 self.stream.on_recv(self._handle_recv)
201 self.stream.on_recv(self._handle_recv)
202 self._run_loop()
202 self._run_loop()
203 try:
204 self.socket.close()
205 except:
206 pass
203
207
204 def stop(self):
208 def stop(self):
205 self.ioloop.stop()
209 self.ioloop.stop()
206 super(ShellSocketChannel, self).stop()
210 super(ShellSocketChannel, self).stop()
207
211
208 def call_handlers(self, msg):
212 def call_handlers(self, msg):
209 """This method is called in the ioloop thread when a message arrives.
213 """This method is called in the ioloop thread when a message arrives.
210
214
211 Subclasses should override this method to handle incoming messages.
215 Subclasses should override this method to handle incoming messages.
212 It is important to remember that this method is called in the thread
216 It is important to remember that this method is called in the thread
213 so that some logic must be done to ensure that the application leve
217 so that some logic must be done to ensure that the application leve
214 handlers are called in the application thread.
218 handlers are called in the application thread.
215 """
219 """
216 raise NotImplementedError('call_handlers must be defined in a subclass.')
220 raise NotImplementedError('call_handlers must be defined in a subclass.')
217
221
218 def execute(self, code, silent=False,
222 def execute(self, code, silent=False,
219 user_variables=None, user_expressions=None, allow_stdin=None):
223 user_variables=None, user_expressions=None, allow_stdin=None):
220 """Execute code in the kernel.
224 """Execute code in the kernel.
221
225
222 Parameters
226 Parameters
223 ----------
227 ----------
224 code : str
228 code : str
225 A string of Python code.
229 A string of Python code.
226
230
227 silent : bool, optional (default False)
231 silent : bool, optional (default False)
228 If set, the kernel will execute the code as quietly possible.
232 If set, the kernel will execute the code as quietly possible.
229
233
230 user_variables : list, optional
234 user_variables : list, optional
231 A list of variable names to pull from the user's namespace. They
235 A list of variable names to pull from the user's namespace. They
232 will come back as a dict with these names as keys and their
236 will come back as a dict with these names as keys and their
233 :func:`repr` as values.
237 :func:`repr` as values.
234
238
235 user_expressions : dict, optional
239 user_expressions : dict, optional
236 A dict with string keys and to pull from the user's
240 A dict with string keys and to pull from the user's
237 namespace. They will come back as a dict with these names as keys
241 namespace. They will come back as a dict with these names as keys
238 and their :func:`repr` as values.
242 and their :func:`repr` as values.
239
243
240 allow_stdin : bool, optional
244 allow_stdin : bool, optional
241 Flag for
245 Flag for
242 A dict with string keys and to pull from the user's
246 A dict with string keys and to pull from the user's
243 namespace. They will come back as a dict with these names as keys
247 namespace. They will come back as a dict with these names as keys
244 and their :func:`repr` as values.
248 and their :func:`repr` as values.
245
249
246 Returns
250 Returns
247 -------
251 -------
248 The msg_id of the message sent.
252 The msg_id of the message sent.
249 """
253 """
250 if user_variables is None:
254 if user_variables is None:
251 user_variables = []
255 user_variables = []
252 if user_expressions is None:
256 if user_expressions is None:
253 user_expressions = {}
257 user_expressions = {}
254 if allow_stdin is None:
258 if allow_stdin is None:
255 allow_stdin = self.allow_stdin
259 allow_stdin = self.allow_stdin
256
260
257
261
258 # Don't waste network traffic if inputs are invalid
262 # Don't waste network traffic if inputs are invalid
259 if not isinstance(code, basestring):
263 if not isinstance(code, basestring):
260 raise ValueError('code %r must be a string' % code)
264 raise ValueError('code %r must be a string' % code)
261 validate_string_list(user_variables)
265 validate_string_list(user_variables)
262 validate_string_dict(user_expressions)
266 validate_string_dict(user_expressions)
263
267
264 # Create class for content/msg creation. Related to, but possibly
268 # Create class for content/msg creation. Related to, but possibly
265 # not in Session.
269 # not in Session.
266 content = dict(code=code, silent=silent,
270 content = dict(code=code, silent=silent,
267 user_variables=user_variables,
271 user_variables=user_variables,
268 user_expressions=user_expressions,
272 user_expressions=user_expressions,
269 allow_stdin=allow_stdin,
273 allow_stdin=allow_stdin,
270 )
274 )
271 msg = self.session.msg('execute_request', content)
275 msg = self.session.msg('execute_request', content)
272 self._queue_send(msg)
276 self._queue_send(msg)
273 return msg['header']['msg_id']
277 return msg['header']['msg_id']
274
278
275 def complete(self, text, line, cursor_pos, block=None):
279 def complete(self, text, line, cursor_pos, block=None):
276 """Tab complete text in the kernel's namespace.
280 """Tab complete text in the kernel's namespace.
277
281
278 Parameters
282 Parameters
279 ----------
283 ----------
280 text : str
284 text : str
281 The text to complete.
285 The text to complete.
282 line : str
286 line : str
283 The full line of text that is the surrounding context for the
287 The full line of text that is the surrounding context for the
284 text to complete.
288 text to complete.
285 cursor_pos : int
289 cursor_pos : int
286 The position of the cursor in the line where the completion was
290 The position of the cursor in the line where the completion was
287 requested.
291 requested.
288 block : str, optional
292 block : str, optional
289 The full block of code in which the completion is being requested.
293 The full block of code in which the completion is being requested.
290
294
291 Returns
295 Returns
292 -------
296 -------
293 The msg_id of the message sent.
297 The msg_id of the message sent.
294 """
298 """
295 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
299 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
296 msg = self.session.msg('complete_request', content)
300 msg = self.session.msg('complete_request', content)
297 self._queue_send(msg)
301 self._queue_send(msg)
298 return msg['header']['msg_id']
302 return msg['header']['msg_id']
299
303
300 def object_info(self, oname, detail_level=0):
304 def object_info(self, oname, detail_level=0):
301 """Get metadata information about an object.
305 """Get metadata information about an object.
302
306
303 Parameters
307 Parameters
304 ----------
308 ----------
305 oname : str
309 oname : str
306 A string specifying the object name.
310 A string specifying the object name.
307 detail_level : int, optional
311 detail_level : int, optional
308 The level of detail for the introspection (0-2)
312 The level of detail for the introspection (0-2)
309
313
310 Returns
314 Returns
311 -------
315 -------
312 The msg_id of the message sent.
316 The msg_id of the message sent.
313 """
317 """
314 content = dict(oname=oname, detail_level=detail_level)
318 content = dict(oname=oname, detail_level=detail_level)
315 msg = self.session.msg('object_info_request', content)
319 msg = self.session.msg('object_info_request', content)
316 self._queue_send(msg)
320 self._queue_send(msg)
317 return msg['header']['msg_id']
321 return msg['header']['msg_id']
318
322
319 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
323 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
320 """Get entries from the history list.
324 """Get entries from the history list.
321
325
322 Parameters
326 Parameters
323 ----------
327 ----------
324 raw : bool
328 raw : bool
325 If True, return the raw input.
329 If True, return the raw input.
326 output : bool
330 output : bool
327 If True, then return the output as well.
331 If True, then return the output as well.
328 hist_access_type : str
332 hist_access_type : str
329 'range' (fill in session, start and stop params), 'tail' (fill in n)
333 'range' (fill in session, start and stop params), 'tail' (fill in n)
330 or 'search' (fill in pattern param).
334 or 'search' (fill in pattern param).
331
335
332 session : int
336 session : int
333 For a range request, the session from which to get lines. Session
337 For a range request, the session from which to get lines. Session
334 numbers are positive integers; negative ones count back from the
338 numbers are positive integers; negative ones count back from the
335 current session.
339 current session.
336 start : int
340 start : int
337 The first line number of a history range.
341 The first line number of a history range.
338 stop : int
342 stop : int
339 The final (excluded) line number of a history range.
343 The final (excluded) line number of a history range.
340
344
341 n : int
345 n : int
342 The number of lines of history to get for a tail request.
346 The number of lines of history to get for a tail request.
343
347
344 pattern : str
348 pattern : str
345 The glob-syntax pattern for a search request.
349 The glob-syntax pattern for a search request.
346
350
347 Returns
351 Returns
348 -------
352 -------
349 The msg_id of the message sent.
353 The msg_id of the message sent.
350 """
354 """
351 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
355 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
352 **kwargs)
356 **kwargs)
353 msg = self.session.msg('history_request', content)
357 msg = self.session.msg('history_request', content)
354 self._queue_send(msg)
358 self._queue_send(msg)
355 return msg['header']['msg_id']
359 return msg['header']['msg_id']
356
360
357 def shutdown(self, restart=False):
361 def shutdown(self, restart=False):
358 """Request an immediate kernel shutdown.
362 """Request an immediate kernel shutdown.
359
363
360 Upon receipt of the (empty) reply, client code can safely assume that
364 Upon receipt of the (empty) reply, client code can safely assume that
361 the kernel has shut down and it's safe to forcefully terminate it if
365 the kernel has shut down and it's safe to forcefully terminate it if
362 it's still alive.
366 it's still alive.
363
367
364 The kernel will send the reply via a function registered with Python's
368 The kernel will send the reply via a function registered with Python's
365 atexit module, ensuring it's truly done as the kernel is done with all
369 atexit module, ensuring it's truly done as the kernel is done with all
366 normal operation.
370 normal operation.
367 """
371 """
368 # Send quit message to kernel. Once we implement kernel-side setattr,
372 # Send quit message to kernel. Once we implement kernel-side setattr,
369 # this should probably be done that way, but for now this will do.
373 # this should probably be done that way, but for now this will do.
370 msg = self.session.msg('shutdown_request', {'restart':restart})
374 msg = self.session.msg('shutdown_request', {'restart':restart})
371 self._queue_send(msg)
375 self._queue_send(msg)
372 return msg['header']['msg_id']
376 return msg['header']['msg_id']
373
377
374
378
375
379
376 class SubSocketChannel(ZMQSocketChannel):
380 class SubSocketChannel(ZMQSocketChannel):
377 """The SUB channel which listens for messages that the kernel publishes.
381 """The SUB channel which listens for messages that the kernel publishes.
378 """
382 """
379
383
380 def __init__(self, context, session, address):
384 def __init__(self, context, session, address):
381 super(SubSocketChannel, self).__init__(context, session, address)
385 super(SubSocketChannel, self).__init__(context, session, address)
382 self.ioloop = ioloop.IOLoop()
386 self.ioloop = ioloop.IOLoop()
383
387
384 def run(self):
388 def run(self):
385 """The thread's main activity. Call start() instead."""
389 """The thread's main activity. Call start() instead."""
386 self.socket = self.context.socket(zmq.SUB)
390 self.socket = self.context.socket(zmq.SUB)
387 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
391 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
388 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
392 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
389 self.socket.connect('tcp://%s:%i' % self.address)
393 self.socket.connect('tcp://%s:%i' % self.address)
390 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
394 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
391 self.stream.on_recv(self._handle_recv)
395 self.stream.on_recv(self._handle_recv)
392 self._run_loop()
396 self._run_loop()
397 try:
398 self.socket.close()
399 except:
400 pass
393
401
394 def stop(self):
402 def stop(self):
395 self.ioloop.stop()
403 self.ioloop.stop()
396 super(SubSocketChannel, self).stop()
404 super(SubSocketChannel, self).stop()
397
405
398 def call_handlers(self, msg):
406 def call_handlers(self, msg):
399 """This method is called in the ioloop thread when a message arrives.
407 """This method is called in the ioloop thread when a message arrives.
400
408
401 Subclasses should override this method to handle incoming messages.
409 Subclasses should override this method to handle incoming messages.
402 It is important to remember that this method is called in the thread
410 It is important to remember that this method is called in the thread
403 so that some logic must be done to ensure that the application leve
411 so that some logic must be done to ensure that the application leve
404 handlers are called in the application thread.
412 handlers are called in the application thread.
405 """
413 """
406 raise NotImplementedError('call_handlers must be defined in a subclass.')
414 raise NotImplementedError('call_handlers must be defined in a subclass.')
407
415
408 def flush(self, timeout=1.0):
416 def flush(self, timeout=1.0):
409 """Immediately processes all pending messages on the SUB channel.
417 """Immediately processes all pending messages on the SUB channel.
410
418
411 Callers should use this method to ensure that :method:`call_handlers`
419 Callers should use this method to ensure that :method:`call_handlers`
412 has been called for all messages that have been received on the
420 has been called for all messages that have been received on the
413 0MQ SUB socket of this channel.
421 0MQ SUB socket of this channel.
414
422
415 This method is thread safe.
423 This method is thread safe.
416
424
417 Parameters
425 Parameters
418 ----------
426 ----------
419 timeout : float, optional
427 timeout : float, optional
420 The maximum amount of time to spend flushing, in seconds. The
428 The maximum amount of time to spend flushing, in seconds. The
421 default is one second.
429 default is one second.
422 """
430 """
423 # We do the IOLoop callback process twice to ensure that the IOLoop
431 # We do the IOLoop callback process twice to ensure that the IOLoop
424 # gets to perform at least one full poll.
432 # gets to perform at least one full poll.
425 stop_time = time.time() + timeout
433 stop_time = time.time() + timeout
426 for i in xrange(2):
434 for i in xrange(2):
427 self._flushed = False
435 self._flushed = False
428 self.ioloop.add_callback(self._flush)
436 self.ioloop.add_callback(self._flush)
429 while not self._flushed and time.time() < stop_time:
437 while not self._flushed and time.time() < stop_time:
430 time.sleep(0.01)
438 time.sleep(0.01)
431
439
432 def _flush(self):
440 def _flush(self):
433 """Callback for :method:`self.flush`."""
441 """Callback for :method:`self.flush`."""
434 self.stream.flush()
442 self.stream.flush()
435 self._flushed = True
443 self._flushed = True
436
444
437
445
438 class StdInSocketChannel(ZMQSocketChannel):
446 class StdInSocketChannel(ZMQSocketChannel):
439 """A reply channel to handle raw_input requests that the kernel makes."""
447 """A reply channel to handle raw_input requests that the kernel makes."""
440
448
441 msg_queue = None
449 msg_queue = None
442
450
443 def __init__(self, context, session, address):
451 def __init__(self, context, session, address):
444 super(StdInSocketChannel, self).__init__(context, session, address)
452 super(StdInSocketChannel, self).__init__(context, session, address)
445 self.ioloop = ioloop.IOLoop()
453 self.ioloop = ioloop.IOLoop()
446
454
447 def run(self):
455 def run(self):
448 """The thread's main activity. Call start() instead."""
456 """The thread's main activity. Call start() instead."""
449 self.socket = self.context.socket(zmq.DEALER)
457 self.socket = self.context.socket(zmq.DEALER)
450 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
458 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
451 self.socket.connect('tcp://%s:%i' % self.address)
459 self.socket.connect('tcp://%s:%i' % self.address)
452 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
460 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
453 self.stream.on_recv(self._handle_recv)
461 self.stream.on_recv(self._handle_recv)
454 self._run_loop()
462 self._run_loop()
463 try:
464 self.socket.close()
465 except:
466 pass
467
455
468
456 def stop(self):
469 def stop(self):
457 self.ioloop.stop()
470 self.ioloop.stop()
458 super(StdInSocketChannel, self).stop()
471 super(StdInSocketChannel, self).stop()
459
472
460 def call_handlers(self, msg):
473 def call_handlers(self, msg):
461 """This method is called in the ioloop thread when a message arrives.
474 """This method is called in the ioloop thread when a message arrives.
462
475
463 Subclasses should override this method to handle incoming messages.
476 Subclasses should override this method to handle incoming messages.
464 It is important to remember that this method is called in the thread
477 It is important to remember that this method is called in the thread
465 so that some logic must be done to ensure that the application leve
478 so that some logic must be done to ensure that the application leve
466 handlers are called in the application thread.
479 handlers are called in the application thread.
467 """
480 """
468 raise NotImplementedError('call_handlers must be defined in a subclass.')
481 raise NotImplementedError('call_handlers must be defined in a subclass.')
469
482
470 def input(self, string):
483 def input(self, string):
471 """Send a string of raw input to the kernel."""
484 """Send a string of raw input to the kernel."""
472 content = dict(value=string)
485 content = dict(value=string)
473 msg = self.session.msg('input_reply', content)
486 msg = self.session.msg('input_reply', content)
474 self._queue_send(msg)
487 self._queue_send(msg)
475
488
476
489
477 class HBSocketChannel(ZMQSocketChannel):
490 class HBSocketChannel(ZMQSocketChannel):
478 """The heartbeat channel which monitors the kernel heartbeat.
491 """The heartbeat channel which monitors the kernel heartbeat.
479
492
480 Note that the heartbeat channel is paused by default. As long as you start
493 Note that the heartbeat channel is paused by default. As long as you start
481 this channel, the kernel manager will ensure that it is paused and un-paused
494 this channel, the kernel manager will ensure that it is paused and un-paused
482 as appropriate.
495 as appropriate.
483 """
496 """
484
497
485 time_to_dead = 3.0
498 time_to_dead = 3.0
486 socket = None
499 socket = None
487 poller = None
500 poller = None
488 _running = None
501 _running = None
489 _pause = None
502 _pause = None
490 _beating = None
503 _beating = None
491
504
492 def __init__(self, context, session, address):
505 def __init__(self, context, session, address):
493 super(HBSocketChannel, self).__init__(context, session, address)
506 super(HBSocketChannel, self).__init__(context, session, address)
494 self._running = False
507 self._running = False
495 self._pause =True
508 self._pause =True
496 self.poller = zmq.Poller()
509 self.poller = zmq.Poller()
497
510
498 def _create_socket(self):
511 def _create_socket(self):
499 if self.socket is not None:
512 if self.socket is not None:
500 # close previous socket, before opening a new one
513 # close previous socket, before opening a new one
501 self.poller.unregister(self.socket)
514 self.poller.unregister(self.socket)
502 self.socket.close()
515 self.socket.close()
503 self.socket = self.context.socket(zmq.REQ)
516 self.socket = self.context.socket(zmq.REQ)
504 self.socket.setsockopt(zmq.LINGER, 0)
517 self.socket.setsockopt(zmq.LINGER, 0)
505 self.socket.connect('tcp://%s:%i' % self.address)
518 self.socket.connect('tcp://%s:%i' % self.address)
506
519
507 self.poller.register(self.socket, zmq.POLLIN)
520 self.poller.register(self.socket, zmq.POLLIN)
508
521
509 def _poll(self, start_time):
522 def _poll(self, start_time):
510 """poll for heartbeat replies until we reach self.time_to_dead
523 """poll for heartbeat replies until we reach self.time_to_dead
511
524
512 Ignores interrupts, and returns the result of poll(), which
525 Ignores interrupts, and returns the result of poll(), which
513 will be an empty list if no messages arrived before the timeout,
526 will be an empty list if no messages arrived before the timeout,
514 or the event tuple if there is a message to receive.
527 or the event tuple if there is a message to receive.
515 """
528 """
516
529
517 until_dead = self.time_to_dead - (time.time() - start_time)
530 until_dead = self.time_to_dead - (time.time() - start_time)
518 # ensure poll at least once
531 # ensure poll at least once
519 until_dead = max(until_dead, 1e-3)
532 until_dead = max(until_dead, 1e-3)
520 events = []
533 events = []
521 while True:
534 while True:
522 try:
535 try:
523 events = self.poller.poll(1000 * until_dead)
536 events = self.poller.poll(1000 * until_dead)
524 except ZMQError as e:
537 except ZMQError as e:
525 if e.errno == errno.EINTR:
538 if e.errno == errno.EINTR:
526 # ignore interrupts during heartbeat
539 # ignore interrupts during heartbeat
527 # this may never actually happen
540 # this may never actually happen
528 until_dead = self.time_to_dead - (time.time() - start_time)
541 until_dead = self.time_to_dead - (time.time() - start_time)
529 until_dead = max(until_dead, 1e-3)
542 until_dead = max(until_dead, 1e-3)
530 pass
543 pass
531 else:
544 else:
532 raise
545 raise
533 except Exception:
546 except Exception:
534 if self._exiting:
547 if self._exiting:
535 break
548 break
536 else:
549 else:
537 raise
550 raise
538 else:
551 else:
539 break
552 break
540 return events
553 return events
541
554
542 def run(self):
555 def run(self):
543 """The thread's main activity. Call start() instead."""
556 """The thread's main activity. Call start() instead."""
544 self._create_socket()
557 self._create_socket()
545 self._running = True
558 self._running = True
546 self._beating = True
559 self._beating = True
547
560
548 while self._running:
561 while self._running:
549 if self._pause:
562 if self._pause:
550 # just sleep, and skip the rest of the loop
563 # just sleep, and skip the rest of the loop
551 time.sleep(self.time_to_dead)
564 time.sleep(self.time_to_dead)
552 continue
565 continue
553
566
554 since_last_heartbeat = 0.0
567 since_last_heartbeat = 0.0
555 # io.rprint('Ping from HB channel') # dbg
568 # io.rprint('Ping from HB channel') # dbg
556 # no need to catch EFSM here, because the previous event was
569 # no need to catch EFSM here, because the previous event was
557 # either a recv or connect, which cannot be followed by EFSM
570 # either a recv or connect, which cannot be followed by EFSM
558 self.socket.send(b'ping')
571 self.socket.send(b'ping')
559 request_time = time.time()
572 request_time = time.time()
560 ready = self._poll(request_time)
573 ready = self._poll(request_time)
561 if ready:
574 if ready:
562 self._beating = True
575 self._beating = True
563 # the poll above guarantees we have something to recv
576 # the poll above guarantees we have something to recv
564 self.socket.recv()
577 self.socket.recv()
565 # sleep the remainder of the cycle
578 # sleep the remainder of the cycle
566 remainder = self.time_to_dead - (time.time() - request_time)
579 remainder = self.time_to_dead - (time.time() - request_time)
567 if remainder > 0:
580 if remainder > 0:
568 time.sleep(remainder)
581 time.sleep(remainder)
569 continue
582 continue
570 else:
583 else:
571 # nothing was received within the time limit, signal heart failure
584 # nothing was received within the time limit, signal heart failure
572 self._beating = False
585 self._beating = False
573 since_last_heartbeat = time.time() - request_time
586 since_last_heartbeat = time.time() - request_time
574 self.call_handlers(since_last_heartbeat)
587 self.call_handlers(since_last_heartbeat)
575 # and close/reopen the socket, because the REQ/REP cycle has been broken
588 # and close/reopen the socket, because the REQ/REP cycle has been broken
576 self._create_socket()
589 self._create_socket()
577 continue
590 continue
591 try:
592 self.socket.close()
593 except:
594 pass
578
595
579 def pause(self):
596 def pause(self):
580 """Pause the heartbeat."""
597 """Pause the heartbeat."""
581 self._pause = True
598 self._pause = True
582
599
583 def unpause(self):
600 def unpause(self):
584 """Unpause the heartbeat."""
601 """Unpause the heartbeat."""
585 self._pause = False
602 self._pause = False
586
603
587 def is_beating(self):
604 def is_beating(self):
588 """Is the heartbeat running and responsive (and not paused)."""
605 """Is the heartbeat running and responsive (and not paused)."""
589 if self.is_alive() and not self._pause and self._beating:
606 if self.is_alive() and not self._pause and self._beating:
590 return True
607 return True
591 else:
608 else:
592 return False
609 return False
593
610
594 def stop(self):
611 def stop(self):
595 self._running = False
612 self._running = False
596 super(HBSocketChannel, self).stop()
613 super(HBSocketChannel, self).stop()
597
614
598 def call_handlers(self, since_last_heartbeat):
615 def call_handlers(self, since_last_heartbeat):
599 """This method is called in the ioloop thread when a message arrives.
616 """This method is called in the ioloop thread when a message arrives.
600
617
601 Subclasses should override this method to handle incoming messages.
618 Subclasses should override this method to handle incoming messages.
602 It is important to remember that this method is called in the thread
619 It is important to remember that this method is called in the thread
603 so that some logic must be done to ensure that the application level
620 so that some logic must be done to ensure that the application level
604 handlers are called in the application thread.
621 handlers are called in the application thread.
605 """
622 """
606 raise NotImplementedError('call_handlers must be defined in a subclass.')
623 raise NotImplementedError('call_handlers must be defined in a subclass.')
607
624
608
625
609 #-----------------------------------------------------------------------------
626 #-----------------------------------------------------------------------------
610 # Main kernel manager class
627 # Main kernel manager class
611 #-----------------------------------------------------------------------------
628 #-----------------------------------------------------------------------------
612
629
613 class KernelManager(HasTraits):
630 class KernelManager(HasTraits):
614 """ Manages a kernel for a frontend.
631 """ Manages a kernel for a frontend.
615
632
616 The SUB channel is for the frontend to receive messages published by the
633 The SUB channel is for the frontend to receive messages published by the
617 kernel.
634 kernel.
618
635
619 The REQ channel is for the frontend to make requests of the kernel.
636 The REQ channel is for the frontend to make requests of the kernel.
620
637
621 The REP channel is for the kernel to request stdin (raw_input) from the
638 The REP channel is for the kernel to request stdin (raw_input) from the
622 frontend.
639 frontend.
623 """
640 """
624 # config object for passing to child configurables
641 # config object for passing to child configurables
625 config = Instance(Config)
642 config = Instance(Config)
626
643
627 # The PyZMQ Context to use for communication with the kernel.
644 # The PyZMQ Context to use for communication with the kernel.
628 context = Instance(zmq.Context)
645 context = Instance(zmq.Context)
629 def _context_default(self):
646 def _context_default(self):
630 return zmq.Context.instance()
647 return zmq.Context.instance()
631
648
632 # The Session to use for communication with the kernel.
649 # The Session to use for communication with the kernel.
633 session = Instance(Session)
650 session = Instance(Session)
634
651
635 # The kernel process with which the KernelManager is communicating.
652 # The kernel process with which the KernelManager is communicating.
636 kernel = Instance(Popen)
653 kernel = Instance(Popen)
637
654
638 # The addresses for the communication channels.
655 # The addresses for the communication channels.
639 connection_file = Unicode('')
656 connection_file = Unicode('')
640 ip = Unicode(LOCALHOST)
657 ip = Unicode(LOCALHOST)
641 def _ip_changed(self, name, old, new):
658 def _ip_changed(self, name, old, new):
642 if new == '*':
659 if new == '*':
643 self.ip = '0.0.0.0'
660 self.ip = '0.0.0.0'
644 shell_port = Integer(0)
661 shell_port = Integer(0)
645 iopub_port = Integer(0)
662 iopub_port = Integer(0)
646 stdin_port = Integer(0)
663 stdin_port = Integer(0)
647 hb_port = Integer(0)
664 hb_port = Integer(0)
648
665
649 # The classes to use for the various channels.
666 # The classes to use for the various channels.
650 shell_channel_class = Type(ShellSocketChannel)
667 shell_channel_class = Type(ShellSocketChannel)
651 sub_channel_class = Type(SubSocketChannel)
668 sub_channel_class = Type(SubSocketChannel)
652 stdin_channel_class = Type(StdInSocketChannel)
669 stdin_channel_class = Type(StdInSocketChannel)
653 hb_channel_class = Type(HBSocketChannel)
670 hb_channel_class = Type(HBSocketChannel)
654
671
655 # Protected traits.
672 # Protected traits.
656 _launch_args = Any
673 _launch_args = Any
657 _shell_channel = Any
674 _shell_channel = Any
658 _sub_channel = Any
675 _sub_channel = Any
659 _stdin_channel = Any
676 _stdin_channel = Any
660 _hb_channel = Any
677 _hb_channel = Any
661 _connection_file_written=Bool(False)
678 _connection_file_written=Bool(False)
662
679
663 def __init__(self, **kwargs):
680 def __init__(self, **kwargs):
664 super(KernelManager, self).__init__(**kwargs)
681 super(KernelManager, self).__init__(**kwargs)
665 if self.session is None:
682 if self.session is None:
666 self.session = Session(config=self.config)
683 self.session = Session(config=self.config)
667
684
668 def __del__(self):
685 def __del__(self):
669 self.cleanup_connection_file()
686 self.cleanup_connection_file()
670
687
671
688
672 #--------------------------------------------------------------------------
689 #--------------------------------------------------------------------------
673 # Channel management methods:
690 # Channel management methods:
674 #--------------------------------------------------------------------------
691 #--------------------------------------------------------------------------
675
692
676 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
693 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
677 """Starts the channels for this kernel.
694 """Starts the channels for this kernel.
678
695
679 This will create the channels if they do not exist and then start
696 This will create the channels if they do not exist and then start
680 them. If port numbers of 0 are being used (random ports) then you
697 them. If port numbers of 0 are being used (random ports) then you
681 must first call :method:`start_kernel`. If the channels have been
698 must first call :method:`start_kernel`. If the channels have been
682 stopped and you call this, :class:`RuntimeError` will be raised.
699 stopped and you call this, :class:`RuntimeError` will be raised.
683 """
700 """
684 if shell:
701 if shell:
685 self.shell_channel.start()
702 self.shell_channel.start()
686 if sub:
703 if sub:
687 self.sub_channel.start()
704 self.sub_channel.start()
688 if stdin:
705 if stdin:
689 self.stdin_channel.start()
706 self.stdin_channel.start()
690 self.shell_channel.allow_stdin = True
707 self.shell_channel.allow_stdin = True
691 else:
708 else:
692 self.shell_channel.allow_stdin = False
709 self.shell_channel.allow_stdin = False
693 if hb:
710 if hb:
694 self.hb_channel.start()
711 self.hb_channel.start()
695
712
696 def stop_channels(self):
713 def stop_channels(self):
697 """Stops all the running channels for this kernel.
714 """Stops all the running channels for this kernel.
698 """
715 """
699 if self.shell_channel.is_alive():
716 if self.shell_channel.is_alive():
700 self.shell_channel.stop()
717 self.shell_channel.stop()
701 if self.sub_channel.is_alive():
718 if self.sub_channel.is_alive():
702 self.sub_channel.stop()
719 self.sub_channel.stop()
703 if self.stdin_channel.is_alive():
720 if self.stdin_channel.is_alive():
704 self.stdin_channel.stop()
721 self.stdin_channel.stop()
705 if self.hb_channel.is_alive():
722 if self.hb_channel.is_alive():
706 self.hb_channel.stop()
723 self.hb_channel.stop()
707
724
708 @property
725 @property
709 def channels_running(self):
726 def channels_running(self):
710 """Are any of the channels created and running?"""
727 """Are any of the channels created and running?"""
711 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
728 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
712 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
729 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
713
730
714 #--------------------------------------------------------------------------
731 #--------------------------------------------------------------------------
715 # Kernel process management methods:
732 # Kernel process management methods:
716 #--------------------------------------------------------------------------
733 #--------------------------------------------------------------------------
717
734
718 def cleanup_connection_file(self):
735 def cleanup_connection_file(self):
719 """cleanup connection file *if we wrote it*
736 """cleanup connection file *if we wrote it*
720
737
721 Will not raise if the connection file was already removed somehow.
738 Will not raise if the connection file was already removed somehow.
722 """
739 """
723 if self._connection_file_written:
740 if self._connection_file_written:
724 # cleanup connection files on full shutdown of kernel we started
741 # cleanup connection files on full shutdown of kernel we started
725 self._connection_file_written = False
742 self._connection_file_written = False
726 try:
743 try:
727 os.remove(self.connection_file)
744 os.remove(self.connection_file)
728 except OSError:
745 except OSError:
729 pass
746 pass
730
747
731 def load_connection_file(self):
748 def load_connection_file(self):
732 """load connection info from JSON dict in self.connection_file"""
749 """load connection info from JSON dict in self.connection_file"""
733 with open(self.connection_file) as f:
750 with open(self.connection_file) as f:
734 cfg = json.loads(f.read())
751 cfg = json.loads(f.read())
735
752
736 self.ip = cfg['ip']
753 self.ip = cfg['ip']
737 self.shell_port = cfg['shell_port']
754 self.shell_port = cfg['shell_port']
738 self.stdin_port = cfg['stdin_port']
755 self.stdin_port = cfg['stdin_port']
739 self.iopub_port = cfg['iopub_port']
756 self.iopub_port = cfg['iopub_port']
740 self.hb_port = cfg['hb_port']
757 self.hb_port = cfg['hb_port']
741 self.session.key = str_to_bytes(cfg['key'])
758 self.session.key = str_to_bytes(cfg['key'])
742
759
743 def write_connection_file(self):
760 def write_connection_file(self):
744 """write connection info to JSON dict in self.connection_file"""
761 """write connection info to JSON dict in self.connection_file"""
745 if self._connection_file_written:
762 if self._connection_file_written:
746 return
763 return
747 self.connection_file,cfg = write_connection_file(self.connection_file,
764 self.connection_file,cfg = write_connection_file(self.connection_file,
748 ip=self.ip, key=self.session.key,
765 ip=self.ip, key=self.session.key,
749 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
766 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
750 shell_port=self.shell_port, hb_port=self.hb_port)
767 shell_port=self.shell_port, hb_port=self.hb_port)
751 # write_connection_file also sets default ports:
768 # write_connection_file also sets default ports:
752 self.shell_port = cfg['shell_port']
769 self.shell_port = cfg['shell_port']
753 self.stdin_port = cfg['stdin_port']
770 self.stdin_port = cfg['stdin_port']
754 self.iopub_port = cfg['iopub_port']
771 self.iopub_port = cfg['iopub_port']
755 self.hb_port = cfg['hb_port']
772 self.hb_port = cfg['hb_port']
756
773
757 self._connection_file_written = True
774 self._connection_file_written = True
758
775
759 def start_kernel(self, **kw):
776 def start_kernel(self, **kw):
760 """Starts a kernel process and configures the manager to use it.
777 """Starts a kernel process and configures the manager to use it.
761
778
762 If random ports (port=0) are being used, this method must be called
779 If random ports (port=0) are being used, this method must be called
763 before the channels are created.
780 before the channels are created.
764
781
765 Parameters:
782 Parameters:
766 -----------
783 -----------
767 ipython : bool, optional (default True)
784 ipython : bool, optional (default True)
768 Whether to use an IPython kernel instead of a plain Python kernel.
785 Whether to use an IPython kernel instead of a plain Python kernel.
769
786
770 launcher : callable, optional (default None)
787 launcher : callable, optional (default None)
771 A custom function for launching the kernel process (generally a
788 A custom function for launching the kernel process (generally a
772 wrapper around ``entry_point.base_launch_kernel``). In most cases,
789 wrapper around ``entry_point.base_launch_kernel``). In most cases,
773 it should not be necessary to use this parameter.
790 it should not be necessary to use this parameter.
774
791
775 **kw : optional
792 **kw : optional
776 See respective options for IPython and Python kernels.
793 See respective options for IPython and Python kernels.
777 """
794 """
778 if self.ip not in LOCAL_IPS:
795 if self.ip not in LOCAL_IPS:
779 raise RuntimeError("Can only launch a kernel on a local interface. "
796 raise RuntimeError("Can only launch a kernel on a local interface. "
780 "Make sure that the '*_address' attributes are "
797 "Make sure that the '*_address' attributes are "
781 "configured properly. "
798 "configured properly. "
782 "Currently valid addresses are: %s"%LOCAL_IPS
799 "Currently valid addresses are: %s"%LOCAL_IPS
783 )
800 )
784
801
785 # write connection file / get default ports
802 # write connection file / get default ports
786 self.write_connection_file()
803 self.write_connection_file()
787
804
788 self._launch_args = kw.copy()
805 self._launch_args = kw.copy()
789 launch_kernel = kw.pop('launcher', None)
806 launch_kernel = kw.pop('launcher', None)
790 if launch_kernel is None:
807 if launch_kernel is None:
791 if kw.pop('ipython', True):
808 if kw.pop('ipython', True):
792 from ipkernel import launch_kernel
809 from ipkernel import launch_kernel
793 else:
810 else:
794 from pykernel import launch_kernel
811 from pykernel import launch_kernel
795 self.kernel = launch_kernel(fname=self.connection_file, **kw)
812 self.kernel = launch_kernel(fname=self.connection_file, **kw)
796
813
797 def shutdown_kernel(self, restart=False):
814 def shutdown_kernel(self, restart=False):
798 """ Attempts to the stop the kernel process cleanly. If the kernel
815 """ Attempts to the stop the kernel process cleanly. If the kernel
799 cannot be stopped, it is killed, if possible.
816 cannot be stopped, it is killed, if possible.
800 """
817 """
801 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
818 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
802 if sys.platform == 'win32':
819 if sys.platform == 'win32':
803 self.kill_kernel()
820 self.kill_kernel()
804 return
821 return
805
822
806 # Pause the heart beat channel if it exists.
823 # Pause the heart beat channel if it exists.
807 if self._hb_channel is not None:
824 if self._hb_channel is not None:
808 self._hb_channel.pause()
825 self._hb_channel.pause()
809
826
810 # Don't send any additional kernel kill messages immediately, to give
827 # Don't send any additional kernel kill messages immediately, to give
811 # the kernel a chance to properly execute shutdown actions. Wait for at
828 # the kernel a chance to properly execute shutdown actions. Wait for at
812 # most 1s, checking every 0.1s.
829 # most 1s, checking every 0.1s.
813 self.shell_channel.shutdown(restart=restart)
830 self.shell_channel.shutdown(restart=restart)
814 for i in range(10):
831 for i in range(10):
815 if self.is_alive:
832 if self.is_alive:
816 time.sleep(0.1)
833 time.sleep(0.1)
817 else:
834 else:
818 break
835 break
819 else:
836 else:
820 # OK, we've waited long enough.
837 # OK, we've waited long enough.
821 if self.has_kernel:
838 if self.has_kernel:
822 self.kill_kernel()
839 self.kill_kernel()
823
840
824 if not restart and self._connection_file_written:
841 if not restart and self._connection_file_written:
825 # cleanup connection files on full shutdown of kernel we started
842 # cleanup connection files on full shutdown of kernel we started
826 self._connection_file_written = False
843 self._connection_file_written = False
827 try:
844 try:
828 os.remove(self.connection_file)
845 os.remove(self.connection_file)
829 except IOError:
846 except IOError:
830 pass
847 pass
831
848
832 def restart_kernel(self, now=False, **kw):
849 def restart_kernel(self, now=False, **kw):
833 """Restarts a kernel with the arguments that were used to launch it.
850 """Restarts a kernel with the arguments that were used to launch it.
834
851
835 If the old kernel was launched with random ports, the same ports will be
852 If the old kernel was launched with random ports, the same ports will be
836 used for the new kernel.
853 used for the new kernel.
837
854
838 Parameters
855 Parameters
839 ----------
856 ----------
840 now : bool, optional
857 now : bool, optional
841 If True, the kernel is forcefully restarted *immediately*, without
858 If True, the kernel is forcefully restarted *immediately*, without
842 having a chance to do any cleanup action. Otherwise the kernel is
859 having a chance to do any cleanup action. Otherwise the kernel is
843 given 1s to clean up before a forceful restart is issued.
860 given 1s to clean up before a forceful restart is issued.
844
861
845 In all cases the kernel is restarted, the only difference is whether
862 In all cases the kernel is restarted, the only difference is whether
846 it is given a chance to perform a clean shutdown or not.
863 it is given a chance to perform a clean shutdown or not.
847
864
848 **kw : optional
865 **kw : optional
849 Any options specified here will replace those used to launch the
866 Any options specified here will replace those used to launch the
850 kernel.
867 kernel.
851 """
868 """
852 if self._launch_args is None:
869 if self._launch_args is None:
853 raise RuntimeError("Cannot restart the kernel. "
870 raise RuntimeError("Cannot restart the kernel. "
854 "No previous call to 'start_kernel'.")
871 "No previous call to 'start_kernel'.")
855 else:
872 else:
856 # Stop currently running kernel.
873 # Stop currently running kernel.
857 if self.has_kernel:
874 if self.has_kernel:
858 if now:
875 if now:
859 self.kill_kernel()
876 self.kill_kernel()
860 else:
877 else:
861 self.shutdown_kernel(restart=True)
878 self.shutdown_kernel(restart=True)
862
879
863 # Start new kernel.
880 # Start new kernel.
864 self._launch_args.update(kw)
881 self._launch_args.update(kw)
865 self.start_kernel(**self._launch_args)
882 self.start_kernel(**self._launch_args)
866
883
867 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
884 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
868 # unless there is some delay here.
885 # unless there is some delay here.
869 if sys.platform == 'win32':
886 if sys.platform == 'win32':
870 time.sleep(0.2)
887 time.sleep(0.2)
871
888
872 @property
889 @property
873 def has_kernel(self):
890 def has_kernel(self):
874 """Returns whether a kernel process has been specified for the kernel
891 """Returns whether a kernel process has been specified for the kernel
875 manager.
892 manager.
876 """
893 """
877 return self.kernel is not None
894 return self.kernel is not None
878
895
879 def kill_kernel(self):
896 def kill_kernel(self):
880 """ Kill the running kernel. """
897 """ Kill the running kernel. """
881 if self.has_kernel:
898 if self.has_kernel:
882 # Pause the heart beat channel if it exists.
899 # Pause the heart beat channel if it exists.
883 if self._hb_channel is not None:
900 if self._hb_channel is not None:
884 self._hb_channel.pause()
901 self._hb_channel.pause()
885
902
886 # Attempt to kill the kernel.
903 # Attempt to kill the kernel.
887 try:
904 try:
888 self.kernel.kill()
905 self.kernel.kill()
889 except OSError, e:
906 except OSError, e:
890 # In Windows, we will get an Access Denied error if the process
907 # In Windows, we will get an Access Denied error if the process
891 # has already terminated. Ignore it.
908 # has already terminated. Ignore it.
892 if sys.platform == 'win32':
909 if sys.platform == 'win32':
893 if e.winerror != 5:
910 if e.winerror != 5:
894 raise
911 raise
895 # On Unix, we may get an ESRCH error if the process has already
912 # On Unix, we may get an ESRCH error if the process has already
896 # terminated. Ignore it.
913 # terminated. Ignore it.
897 else:
914 else:
898 from errno import ESRCH
915 from errno import ESRCH
899 if e.errno != ESRCH:
916 if e.errno != ESRCH:
900 raise
917 raise
901 self.kernel = None
918 self.kernel = None
902 else:
919 else:
903 raise RuntimeError("Cannot kill kernel. No kernel is running!")
920 raise RuntimeError("Cannot kill kernel. No kernel is running!")
904
921
905 def interrupt_kernel(self):
922 def interrupt_kernel(self):
906 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
923 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
907 well supported on all platforms.
924 well supported on all platforms.
908 """
925 """
909 if self.has_kernel:
926 if self.has_kernel:
910 if sys.platform == 'win32':
927 if sys.platform == 'win32':
911 from parentpoller import ParentPollerWindows as Poller
928 from parentpoller import ParentPollerWindows as Poller
912 Poller.send_interrupt(self.kernel.win32_interrupt_event)
929 Poller.send_interrupt(self.kernel.win32_interrupt_event)
913 else:
930 else:
914 self.kernel.send_signal(signal.SIGINT)
931 self.kernel.send_signal(signal.SIGINT)
915 else:
932 else:
916 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
933 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
917
934
918 def signal_kernel(self, signum):
935 def signal_kernel(self, signum):
919 """ Sends a signal to the kernel. Note that since only SIGTERM is
936 """ Sends a signal to the kernel. Note that since only SIGTERM is
920 supported on Windows, this function is only useful on Unix systems.
937 supported on Windows, this function is only useful on Unix systems.
921 """
938 """
922 if self.has_kernel:
939 if self.has_kernel:
923 self.kernel.send_signal(signum)
940 self.kernel.send_signal(signum)
924 else:
941 else:
925 raise RuntimeError("Cannot signal kernel. No kernel is running!")
942 raise RuntimeError("Cannot signal kernel. No kernel is running!")
926
943
927 @property
944 @property
928 def is_alive(self):
945 def is_alive(self):
929 """Is the kernel process still running?"""
946 """Is the kernel process still running?"""
930 if self.has_kernel:
947 if self.has_kernel:
931 if self.kernel.poll() is None:
948 if self.kernel.poll() is None:
932 return True
949 return True
933 else:
950 else:
934 return False
951 return False
935 elif self._hb_channel is not None:
952 elif self._hb_channel is not None:
936 # We didn't start the kernel with this KernelManager so we
953 # We didn't start the kernel with this KernelManager so we
937 # use the heartbeat.
954 # use the heartbeat.
938 return self._hb_channel.is_beating()
955 return self._hb_channel.is_beating()
939 else:
956 else:
940 # no heartbeat and not local, we can't tell if it's running,
957 # no heartbeat and not local, we can't tell if it's running,
941 # so naively return True
958 # so naively return True
942 return True
959 return True
943
960
944 #--------------------------------------------------------------------------
961 #--------------------------------------------------------------------------
945 # Channels used for communication with the kernel:
962 # Channels used for communication with the kernel:
946 #--------------------------------------------------------------------------
963 #--------------------------------------------------------------------------
947
964
948 @property
965 @property
949 def shell_channel(self):
966 def shell_channel(self):
950 """Get the REQ socket channel object to make requests of the kernel."""
967 """Get the REQ socket channel object to make requests of the kernel."""
951 if self._shell_channel is None:
968 if self._shell_channel is None:
952 self._shell_channel = self.shell_channel_class(self.context,
969 self._shell_channel = self.shell_channel_class(self.context,
953 self.session,
970 self.session,
954 (self.ip, self.shell_port))
971 (self.ip, self.shell_port))
955 return self._shell_channel
972 return self._shell_channel
956
973
957 @property
974 @property
958 def sub_channel(self):
975 def sub_channel(self):
959 """Get the SUB socket channel object."""
976 """Get the SUB socket channel object."""
960 if self._sub_channel is None:
977 if self._sub_channel is None:
961 self._sub_channel = self.sub_channel_class(self.context,
978 self._sub_channel = self.sub_channel_class(self.context,
962 self.session,
979 self.session,
963 (self.ip, self.iopub_port))
980 (self.ip, self.iopub_port))
964 return self._sub_channel
981 return self._sub_channel
965
982
966 @property
983 @property
967 def stdin_channel(self):
984 def stdin_channel(self):
968 """Get the REP socket channel object to handle stdin (raw_input)."""
985 """Get the REP socket channel object to handle stdin (raw_input)."""
969 if self._stdin_channel is None:
986 if self._stdin_channel is None:
970 self._stdin_channel = self.stdin_channel_class(self.context,
987 self._stdin_channel = self.stdin_channel_class(self.context,
971 self.session,
988 self.session,
972 (self.ip, self.stdin_port))
989 (self.ip, self.stdin_port))
973 return self._stdin_channel
990 return self._stdin_channel
974
991
975 @property
992 @property
976 def hb_channel(self):
993 def hb_channel(self):
977 """Get the heartbeat socket channel object to check that the
994 """Get the heartbeat socket channel object to check that the
978 kernel is alive."""
995 kernel is alive."""
979 if self._hb_channel is None:
996 if self._hb_channel is None:
980 self._hb_channel = self.hb_channel_class(self.context,
997 self._hb_channel = self.hb_channel_class(self.context,
981 self.session,
998 self.session,
982 (self.ip, self.hb_port))
999 (self.ip, self.hb_port))
983 return self._hb_channel
1000 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now