##// END OF EJS Templates
Push store_history parameter up to kernelmanager.
Jason Grout -
Show More
@@ -1,994 +1,999 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 import json
21 import json
22 from subprocess import Popen
22 from subprocess import Popen
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 from threading import Thread
26 from threading import Thread
27 import time
27 import time
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.traitlets import (
39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 )
41 )
42 from IPython.utils.py3compat import str_to_bytes
42 from IPython.utils.py3compat import str_to_bytes
43 from IPython.zmq.entry_point import write_connection_file
43 from IPython.zmq.entry_point import write_connection_file
44 from session import Session
44 from session import Session
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Constants and exceptions
47 # Constants and exceptions
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 class InvalidPortNumber(Exception):
50 class InvalidPortNumber(Exception):
51 pass
51 pass
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Utility functions
54 # Utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 # some utilities to validate message structure, these might get moved elsewhere
57 # some utilities to validate message structure, these might get moved elsewhere
58 # if they prove to have more generic utility
58 # if they prove to have more generic utility
59
59
60 def validate_string_list(lst):
60 def validate_string_list(lst):
61 """Validate that the input is a list of strings.
61 """Validate that the input is a list of strings.
62
62
63 Raises ValueError if not."""
63 Raises ValueError if not."""
64 if not isinstance(lst, list):
64 if not isinstance(lst, list):
65 raise ValueError('input %r must be a list' % lst)
65 raise ValueError('input %r must be a list' % lst)
66 for x in lst:
66 for x in lst:
67 if not isinstance(x, basestring):
67 if not isinstance(x, basestring):
68 raise ValueError('element %r in list must be a string' % x)
68 raise ValueError('element %r in list must be a string' % x)
69
69
70
70
71 def validate_string_dict(dct):
71 def validate_string_dict(dct):
72 """Validate that the input is a dict with string keys and values.
72 """Validate that the input is a dict with string keys and values.
73
73
74 Raises ValueError if not."""
74 Raises ValueError if not."""
75 for k,v in dct.iteritems():
75 for k,v in dct.iteritems():
76 if not isinstance(k, basestring):
76 if not isinstance(k, basestring):
77 raise ValueError('key %r in dict must be a string' % k)
77 raise ValueError('key %r in dict must be a string' % k)
78 if not isinstance(v, basestring):
78 if not isinstance(v, basestring):
79 raise ValueError('value %r in dict must be a string' % v)
79 raise ValueError('value %r in dict must be a string' % v)
80
80
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # ZMQ Socket Channel classes
83 # ZMQ Socket Channel classes
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86 class ZMQSocketChannel(Thread):
86 class ZMQSocketChannel(Thread):
87 """The base class for the channels that use ZMQ sockets.
87 """The base class for the channels that use ZMQ sockets.
88 """
88 """
89 context = None
89 context = None
90 session = None
90 session = None
91 socket = None
91 socket = None
92 ioloop = None
92 ioloop = None
93 stream = None
93 stream = None
94 _address = None
94 _address = None
95 _exiting = False
95 _exiting = False
96
96
97 def __init__(self, context, session, address):
97 def __init__(self, context, session, address):
98 """Create a channel
98 """Create a channel
99
99
100 Parameters
100 Parameters
101 ----------
101 ----------
102 context : :class:`zmq.Context`
102 context : :class:`zmq.Context`
103 The ZMQ context to use.
103 The ZMQ context to use.
104 session : :class:`session.Session`
104 session : :class:`session.Session`
105 The session to use.
105 The session to use.
106 address : tuple
106 address : tuple
107 Standard (ip, port) tuple that the kernel is listening on.
107 Standard (ip, port) tuple that the kernel is listening on.
108 """
108 """
109 super(ZMQSocketChannel, self).__init__()
109 super(ZMQSocketChannel, self).__init__()
110 self.daemon = True
110 self.daemon = True
111
111
112 self.context = context
112 self.context = context
113 self.session = session
113 self.session = session
114 if address[1] == 0:
114 if address[1] == 0:
115 message = 'The port number for a channel cannot be 0.'
115 message = 'The port number for a channel cannot be 0.'
116 raise InvalidPortNumber(message)
116 raise InvalidPortNumber(message)
117 self._address = address
117 self._address = address
118 atexit.register(self._notice_exit)
118 atexit.register(self._notice_exit)
119
119
120 def _notice_exit(self):
120 def _notice_exit(self):
121 self._exiting = True
121 self._exiting = True
122
122
123 def _run_loop(self):
123 def _run_loop(self):
124 """Run my loop, ignoring EINTR events in the poller"""
124 """Run my loop, ignoring EINTR events in the poller"""
125 while True:
125 while True:
126 try:
126 try:
127 self.ioloop.start()
127 self.ioloop.start()
128 except ZMQError as e:
128 except ZMQError as e:
129 if e.errno == errno.EINTR:
129 if e.errno == errno.EINTR:
130 continue
130 continue
131 else:
131 else:
132 raise
132 raise
133 except Exception:
133 except Exception:
134 if self._exiting:
134 if self._exiting:
135 break
135 break
136 else:
136 else:
137 raise
137 raise
138 else:
138 else:
139 break
139 break
140
140
141 def stop(self):
141 def stop(self):
142 """Stop the channel's activity.
142 """Stop the channel's activity.
143
143
144 This calls :method:`Thread.join` and returns when the thread
144 This calls :method:`Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
145 terminates. :class:`RuntimeError` will be raised if
146 :method:`self.start` is called again.
146 :method:`self.start` is called again.
147 """
147 """
148 self.join()
148 self.join()
149
149
150 @property
150 @property
151 def address(self):
151 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
152 """Get the channel's address as an (ip, port) tuple.
153
153
154 By the default, the address is (localhost, 0), where 0 means a random
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
155 port.
156 """
156 """
157 return self._address
157 return self._address
158
158
159 def _queue_send(self, msg):
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 msg : message to send
164 msg : message to send
165
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
167 thread control of the action.
168 """
168 """
169 def thread_send():
169 def thread_send():
170 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
172
172
173 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
174 """callback for stream.on_recv
174 """callback for stream.on_recv
175
175
176 unpacks message, and calls handlers with it.
176 unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 self.call_handlers(self.session.unserialize(smsg))
179 self.call_handlers(self.session.unserialize(smsg))
180
180
181
181
182
182
183 class ShellSocketChannel(ZMQSocketChannel):
183 class ShellSocketChannel(ZMQSocketChannel):
184 """The DEALER channel for issues request/replies to the kernel.
184 """The DEALER channel for issues request/replies to the kernel.
185 """
185 """
186
186
187 command_queue = None
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
189 allow_stdin = True
190
190
191 def __init__(self, context, session, address):
191 def __init__(self, context, session, address):
192 super(ShellSocketChannel, self).__init__(context, session, address)
192 super(ShellSocketChannel, self).__init__(context, session, address)
193 self.ioloop = ioloop.IOLoop()
193 self.ioloop = ioloop.IOLoop()
194
194
195 def run(self):
195 def run(self):
196 """The thread's main activity. Call start() instead."""
196 """The thread's main activity. Call start() instead."""
197 self.socket = self.context.socket(zmq.DEALER)
197 self.socket = self.context.socket(zmq.DEALER)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
199 self.socket.connect('tcp://%s:%i' % self.address)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 self.stream.on_recv(self._handle_recv)
201 self.stream.on_recv(self._handle_recv)
202 self._run_loop()
202 self._run_loop()
203 try:
203 try:
204 self.socket.close()
204 self.socket.close()
205 except:
205 except:
206 pass
206 pass
207
207
208 def stop(self):
208 def stop(self):
209 self.ioloop.stop()
209 self.ioloop.stop()
210 super(ShellSocketChannel, self).stop()
210 super(ShellSocketChannel, self).stop()
211
211
212 def call_handlers(self, msg):
212 def call_handlers(self, msg):
213 """This method is called in the ioloop thread when a message arrives.
213 """This method is called in the ioloop thread when a message arrives.
214
214
215 Subclasses should override this method to handle incoming messages.
215 Subclasses should override this method to handle incoming messages.
216 It is important to remember that this method is called in the thread
216 It is important to remember that this method is called in the thread
217 so that some logic must be done to ensure that the application leve
217 so that some logic must be done to ensure that the application leve
218 handlers are called in the application thread.
218 handlers are called in the application thread.
219 """
219 """
220 raise NotImplementedError('call_handlers must be defined in a subclass.')
220 raise NotImplementedError('call_handlers must be defined in a subclass.')
221
221
222 def execute(self, code, silent=False,
222 def execute(self, code, silent=False, store_history=True,
223 user_variables=None, user_expressions=None, allow_stdin=None):
223 user_variables=None, user_expressions=None, allow_stdin=None):
224 """Execute code in the kernel.
224 """Execute code in the kernel.
225
225
226 Parameters
226 Parameters
227 ----------
227 ----------
228 code : str
228 code : str
229 A string of Python code.
229 A string of Python code.
230
230
231 silent : bool, optional (default False)
231 silent : bool, optional (default False)
232 If set, the kernel will execute the code as quietly possible.
232 If set, the kernel will execute the code as quietly possible, and
233 will force store_history to be False.
234
235 store_history : bool, optional (default True)
236 If set, the kernel will execute store command history. This is forced
237 to be False if silent is True.
233
238
234 user_variables : list, optional
239 user_variables : list, optional
235 A list of variable names to pull from the user's namespace. They
240 A list of variable names to pull from the user's namespace. They
236 will come back as a dict with these names as keys and their
241 will come back as a dict with these names as keys and their
237 :func:`repr` as values.
242 :func:`repr` as values.
238
243
239 user_expressions : dict, optional
244 user_expressions : dict, optional
240 A dict with string keys and to pull from the user's
245 A dict with string keys and to pull from the user's
241 namespace. They will come back as a dict with these names as keys
246 namespace. They will come back as a dict with these names as keys
242 and their :func:`repr` as values.
247 and their :func:`repr` as values.
243
248
244 allow_stdin : bool, optional
249 allow_stdin : bool, optional
245 Flag for
250 Flag for
246 A dict with string keys and to pull from the user's
251 A dict with string keys and to pull from the user's
247 namespace. They will come back as a dict with these names as keys
252 namespace. They will come back as a dict with these names as keys
248 and their :func:`repr` as values.
253 and their :func:`repr` as values.
249
254
250 Returns
255 Returns
251 -------
256 -------
252 The msg_id of the message sent.
257 The msg_id of the message sent.
253 """
258 """
254 if user_variables is None:
259 if user_variables is None:
255 user_variables = []
260 user_variables = []
256 if user_expressions is None:
261 if user_expressions is None:
257 user_expressions = {}
262 user_expressions = {}
258 if allow_stdin is None:
263 if allow_stdin is None:
259 allow_stdin = self.allow_stdin
264 allow_stdin = self.allow_stdin
260
265
261
266
262 # Don't waste network traffic if inputs are invalid
267 # Don't waste network traffic if inputs are invalid
263 if not isinstance(code, basestring):
268 if not isinstance(code, basestring):
264 raise ValueError('code %r must be a string' % code)
269 raise ValueError('code %r must be a string' % code)
265 validate_string_list(user_variables)
270 validate_string_list(user_variables)
266 validate_string_dict(user_expressions)
271 validate_string_dict(user_expressions)
267
272
268 # Create class for content/msg creation. Related to, but possibly
273 # Create class for content/msg creation. Related to, but possibly
269 # not in Session.
274 # not in Session.
270 content = dict(code=code, silent=silent,
275 content = dict(code=code, silent=silent, store_history=store_history,
271 user_variables=user_variables,
276 user_variables=user_variables,
272 user_expressions=user_expressions,
277 user_expressions=user_expressions,
273 allow_stdin=allow_stdin,
278 allow_stdin=allow_stdin,
274 )
279 )
275 msg = self.session.msg('execute_request', content)
280 msg = self.session.msg('execute_request', content)
276 self._queue_send(msg)
281 self._queue_send(msg)
277 return msg['header']['msg_id']
282 return msg['header']['msg_id']
278
283
279 def complete(self, text, line, cursor_pos, block=None):
284 def complete(self, text, line, cursor_pos, block=None):
280 """Tab complete text in the kernel's namespace.
285 """Tab complete text in the kernel's namespace.
281
286
282 Parameters
287 Parameters
283 ----------
288 ----------
284 text : str
289 text : str
285 The text to complete.
290 The text to complete.
286 line : str
291 line : str
287 The full line of text that is the surrounding context for the
292 The full line of text that is the surrounding context for the
288 text to complete.
293 text to complete.
289 cursor_pos : int
294 cursor_pos : int
290 The position of the cursor in the line where the completion was
295 The position of the cursor in the line where the completion was
291 requested.
296 requested.
292 block : str, optional
297 block : str, optional
293 The full block of code in which the completion is being requested.
298 The full block of code in which the completion is being requested.
294
299
295 Returns
300 Returns
296 -------
301 -------
297 The msg_id of the message sent.
302 The msg_id of the message sent.
298 """
303 """
299 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
304 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
300 msg = self.session.msg('complete_request', content)
305 msg = self.session.msg('complete_request', content)
301 self._queue_send(msg)
306 self._queue_send(msg)
302 return msg['header']['msg_id']
307 return msg['header']['msg_id']
303
308
304 def object_info(self, oname, detail_level=0):
309 def object_info(self, oname, detail_level=0):
305 """Get metadata information about an object.
310 """Get metadata information about an object.
306
311
307 Parameters
312 Parameters
308 ----------
313 ----------
309 oname : str
314 oname : str
310 A string specifying the object name.
315 A string specifying the object name.
311 detail_level : int, optional
316 detail_level : int, optional
312 The level of detail for the introspection (0-2)
317 The level of detail for the introspection (0-2)
313
318
314 Returns
319 Returns
315 -------
320 -------
316 The msg_id of the message sent.
321 The msg_id of the message sent.
317 """
322 """
318 content = dict(oname=oname, detail_level=detail_level)
323 content = dict(oname=oname, detail_level=detail_level)
319 msg = self.session.msg('object_info_request', content)
324 msg = self.session.msg('object_info_request', content)
320 self._queue_send(msg)
325 self._queue_send(msg)
321 return msg['header']['msg_id']
326 return msg['header']['msg_id']
322
327
323 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
328 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
324 """Get entries from the history list.
329 """Get entries from the history list.
325
330
326 Parameters
331 Parameters
327 ----------
332 ----------
328 raw : bool
333 raw : bool
329 If True, return the raw input.
334 If True, return the raw input.
330 output : bool
335 output : bool
331 If True, then return the output as well.
336 If True, then return the output as well.
332 hist_access_type : str
337 hist_access_type : str
333 'range' (fill in session, start and stop params), 'tail' (fill in n)
338 'range' (fill in session, start and stop params), 'tail' (fill in n)
334 or 'search' (fill in pattern param).
339 or 'search' (fill in pattern param).
335
340
336 session : int
341 session : int
337 For a range request, the session from which to get lines. Session
342 For a range request, the session from which to get lines. Session
338 numbers are positive integers; negative ones count back from the
343 numbers are positive integers; negative ones count back from the
339 current session.
344 current session.
340 start : int
345 start : int
341 The first line number of a history range.
346 The first line number of a history range.
342 stop : int
347 stop : int
343 The final (excluded) line number of a history range.
348 The final (excluded) line number of a history range.
344
349
345 n : int
350 n : int
346 The number of lines of history to get for a tail request.
351 The number of lines of history to get for a tail request.
347
352
348 pattern : str
353 pattern : str
349 The glob-syntax pattern for a search request.
354 The glob-syntax pattern for a search request.
350
355
351 Returns
356 Returns
352 -------
357 -------
353 The msg_id of the message sent.
358 The msg_id of the message sent.
354 """
359 """
355 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
360 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
356 **kwargs)
361 **kwargs)
357 msg = self.session.msg('history_request', content)
362 msg = self.session.msg('history_request', content)
358 self._queue_send(msg)
363 self._queue_send(msg)
359 return msg['header']['msg_id']
364 return msg['header']['msg_id']
360
365
361 def shutdown(self, restart=False):
366 def shutdown(self, restart=False):
362 """Request an immediate kernel shutdown.
367 """Request an immediate kernel shutdown.
363
368
364 Upon receipt of the (empty) reply, client code can safely assume that
369 Upon receipt of the (empty) reply, client code can safely assume that
365 the kernel has shut down and it's safe to forcefully terminate it if
370 the kernel has shut down and it's safe to forcefully terminate it if
366 it's still alive.
371 it's still alive.
367
372
368 The kernel will send the reply via a function registered with Python's
373 The kernel will send the reply via a function registered with Python's
369 atexit module, ensuring it's truly done as the kernel is done with all
374 atexit module, ensuring it's truly done as the kernel is done with all
370 normal operation.
375 normal operation.
371 """
376 """
372 # Send quit message to kernel. Once we implement kernel-side setattr,
377 # Send quit message to kernel. Once we implement kernel-side setattr,
373 # this should probably be done that way, but for now this will do.
378 # this should probably be done that way, but for now this will do.
374 msg = self.session.msg('shutdown_request', {'restart':restart})
379 msg = self.session.msg('shutdown_request', {'restart':restart})
375 self._queue_send(msg)
380 self._queue_send(msg)
376 return msg['header']['msg_id']
381 return msg['header']['msg_id']
377
382
378
383
379
384
380 class SubSocketChannel(ZMQSocketChannel):
385 class SubSocketChannel(ZMQSocketChannel):
381 """The SUB channel which listens for messages that the kernel publishes.
386 """The SUB channel which listens for messages that the kernel publishes.
382 """
387 """
383
388
384 def __init__(self, context, session, address):
389 def __init__(self, context, session, address):
385 super(SubSocketChannel, self).__init__(context, session, address)
390 super(SubSocketChannel, self).__init__(context, session, address)
386 self.ioloop = ioloop.IOLoop()
391 self.ioloop = ioloop.IOLoop()
387
392
388 def run(self):
393 def run(self):
389 """The thread's main activity. Call start() instead."""
394 """The thread's main activity. Call start() instead."""
390 self.socket = self.context.socket(zmq.SUB)
395 self.socket = self.context.socket(zmq.SUB)
391 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
396 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
392 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
397 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
393 self.socket.connect('tcp://%s:%i' % self.address)
398 self.socket.connect('tcp://%s:%i' % self.address)
394 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
399 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
395 self.stream.on_recv(self._handle_recv)
400 self.stream.on_recv(self._handle_recv)
396 self._run_loop()
401 self._run_loop()
397 try:
402 try:
398 self.socket.close()
403 self.socket.close()
399 except:
404 except:
400 pass
405 pass
401
406
402 def stop(self):
407 def stop(self):
403 self.ioloop.stop()
408 self.ioloop.stop()
404 super(SubSocketChannel, self).stop()
409 super(SubSocketChannel, self).stop()
405
410
406 def call_handlers(self, msg):
411 def call_handlers(self, msg):
407 """This method is called in the ioloop thread when a message arrives.
412 """This method is called in the ioloop thread when a message arrives.
408
413
409 Subclasses should override this method to handle incoming messages.
414 Subclasses should override this method to handle incoming messages.
410 It is important to remember that this method is called in the thread
415 It is important to remember that this method is called in the thread
411 so that some logic must be done to ensure that the application leve
416 so that some logic must be done to ensure that the application leve
412 handlers are called in the application thread.
417 handlers are called in the application thread.
413 """
418 """
414 raise NotImplementedError('call_handlers must be defined in a subclass.')
419 raise NotImplementedError('call_handlers must be defined in a subclass.')
415
420
416 def flush(self, timeout=1.0):
421 def flush(self, timeout=1.0):
417 """Immediately processes all pending messages on the SUB channel.
422 """Immediately processes all pending messages on the SUB channel.
418
423
419 Callers should use this method to ensure that :method:`call_handlers`
424 Callers should use this method to ensure that :method:`call_handlers`
420 has been called for all messages that have been received on the
425 has been called for all messages that have been received on the
421 0MQ SUB socket of this channel.
426 0MQ SUB socket of this channel.
422
427
423 This method is thread safe.
428 This method is thread safe.
424
429
425 Parameters
430 Parameters
426 ----------
431 ----------
427 timeout : float, optional
432 timeout : float, optional
428 The maximum amount of time to spend flushing, in seconds. The
433 The maximum amount of time to spend flushing, in seconds. The
429 default is one second.
434 default is one second.
430 """
435 """
431 # We do the IOLoop callback process twice to ensure that the IOLoop
436 # We do the IOLoop callback process twice to ensure that the IOLoop
432 # gets to perform at least one full poll.
437 # gets to perform at least one full poll.
433 stop_time = time.time() + timeout
438 stop_time = time.time() + timeout
434 for i in xrange(2):
439 for i in xrange(2):
435 self._flushed = False
440 self._flushed = False
436 self.ioloop.add_callback(self._flush)
441 self.ioloop.add_callback(self._flush)
437 while not self._flushed and time.time() < stop_time:
442 while not self._flushed and time.time() < stop_time:
438 time.sleep(0.01)
443 time.sleep(0.01)
439
444
440 def _flush(self):
445 def _flush(self):
441 """Callback for :method:`self.flush`."""
446 """Callback for :method:`self.flush`."""
442 self.stream.flush()
447 self.stream.flush()
443 self._flushed = True
448 self._flushed = True
444
449
445
450
446 class StdInSocketChannel(ZMQSocketChannel):
451 class StdInSocketChannel(ZMQSocketChannel):
447 """A reply channel to handle raw_input requests that the kernel makes."""
452 """A reply channel to handle raw_input requests that the kernel makes."""
448
453
449 msg_queue = None
454 msg_queue = None
450
455
451 def __init__(self, context, session, address):
456 def __init__(self, context, session, address):
452 super(StdInSocketChannel, self).__init__(context, session, address)
457 super(StdInSocketChannel, self).__init__(context, session, address)
453 self.ioloop = ioloop.IOLoop()
458 self.ioloop = ioloop.IOLoop()
454
459
455 def run(self):
460 def run(self):
456 """The thread's main activity. Call start() instead."""
461 """The thread's main activity. Call start() instead."""
457 self.socket = self.context.socket(zmq.DEALER)
462 self.socket = self.context.socket(zmq.DEALER)
458 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
459 self.socket.connect('tcp://%s:%i' % self.address)
464 self.socket.connect('tcp://%s:%i' % self.address)
460 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
461 self.stream.on_recv(self._handle_recv)
466 self.stream.on_recv(self._handle_recv)
462 self._run_loop()
467 self._run_loop()
463 try:
468 try:
464 self.socket.close()
469 self.socket.close()
465 except:
470 except:
466 pass
471 pass
467
472
468
473
469 def stop(self):
474 def stop(self):
470 self.ioloop.stop()
475 self.ioloop.stop()
471 super(StdInSocketChannel, self).stop()
476 super(StdInSocketChannel, self).stop()
472
477
473 def call_handlers(self, msg):
478 def call_handlers(self, msg):
474 """This method is called in the ioloop thread when a message arrives.
479 """This method is called in the ioloop thread when a message arrives.
475
480
476 Subclasses should override this method to handle incoming messages.
481 Subclasses should override this method to handle incoming messages.
477 It is important to remember that this method is called in the thread
482 It is important to remember that this method is called in the thread
478 so that some logic must be done to ensure that the application leve
483 so that some logic must be done to ensure that the application leve
479 handlers are called in the application thread.
484 handlers are called in the application thread.
480 """
485 """
481 raise NotImplementedError('call_handlers must be defined in a subclass.')
486 raise NotImplementedError('call_handlers must be defined in a subclass.')
482
487
483 def input(self, string):
488 def input(self, string):
484 """Send a string of raw input to the kernel."""
489 """Send a string of raw input to the kernel."""
485 content = dict(value=string)
490 content = dict(value=string)
486 msg = self.session.msg('input_reply', content)
491 msg = self.session.msg('input_reply', content)
487 self._queue_send(msg)
492 self._queue_send(msg)
488
493
489
494
490 class HBSocketChannel(ZMQSocketChannel):
495 class HBSocketChannel(ZMQSocketChannel):
491 """The heartbeat channel which monitors the kernel heartbeat.
496 """The heartbeat channel which monitors the kernel heartbeat.
492
497
493 Note that the heartbeat channel is paused by default. As long as you start
498 Note that the heartbeat channel is paused by default. As long as you start
494 this channel, the kernel manager will ensure that it is paused and un-paused
499 this channel, the kernel manager will ensure that it is paused and un-paused
495 as appropriate.
500 as appropriate.
496 """
501 """
497
502
498 time_to_dead = 3.0
503 time_to_dead = 3.0
499 socket = None
504 socket = None
500 poller = None
505 poller = None
501 _running = None
506 _running = None
502 _pause = None
507 _pause = None
503 _beating = None
508 _beating = None
504
509
505 def __init__(self, context, session, address):
510 def __init__(self, context, session, address):
506 super(HBSocketChannel, self).__init__(context, session, address)
511 super(HBSocketChannel, self).__init__(context, session, address)
507 self._running = False
512 self._running = False
508 self._pause =True
513 self._pause =True
509 self.poller = zmq.Poller()
514 self.poller = zmq.Poller()
510
515
511 def _create_socket(self):
516 def _create_socket(self):
512 if self.socket is not None:
517 if self.socket is not None:
513 # close previous socket, before opening a new one
518 # close previous socket, before opening a new one
514 self.poller.unregister(self.socket)
519 self.poller.unregister(self.socket)
515 self.socket.close()
520 self.socket.close()
516 self.socket = self.context.socket(zmq.REQ)
521 self.socket = self.context.socket(zmq.REQ)
517 self.socket.setsockopt(zmq.LINGER, 0)
522 self.socket.setsockopt(zmq.LINGER, 0)
518 self.socket.connect('tcp://%s:%i' % self.address)
523 self.socket.connect('tcp://%s:%i' % self.address)
519
524
520 self.poller.register(self.socket, zmq.POLLIN)
525 self.poller.register(self.socket, zmq.POLLIN)
521
526
522 def _poll(self, start_time):
527 def _poll(self, start_time):
523 """poll for heartbeat replies until we reach self.time_to_dead
528 """poll for heartbeat replies until we reach self.time_to_dead
524
529
525 Ignores interrupts, and returns the result of poll(), which
530 Ignores interrupts, and returns the result of poll(), which
526 will be an empty list if no messages arrived before the timeout,
531 will be an empty list if no messages arrived before the timeout,
527 or the event tuple if there is a message to receive.
532 or the event tuple if there is a message to receive.
528 """
533 """
529
534
530 until_dead = self.time_to_dead - (time.time() - start_time)
535 until_dead = self.time_to_dead - (time.time() - start_time)
531 # ensure poll at least once
536 # ensure poll at least once
532 until_dead = max(until_dead, 1e-3)
537 until_dead = max(until_dead, 1e-3)
533 events = []
538 events = []
534 while True:
539 while True:
535 try:
540 try:
536 events = self.poller.poll(1000 * until_dead)
541 events = self.poller.poll(1000 * until_dead)
537 except ZMQError as e:
542 except ZMQError as e:
538 if e.errno == errno.EINTR:
543 if e.errno == errno.EINTR:
539 # ignore interrupts during heartbeat
544 # ignore interrupts during heartbeat
540 # this may never actually happen
545 # this may never actually happen
541 until_dead = self.time_to_dead - (time.time() - start_time)
546 until_dead = self.time_to_dead - (time.time() - start_time)
542 until_dead = max(until_dead, 1e-3)
547 until_dead = max(until_dead, 1e-3)
543 pass
548 pass
544 else:
549 else:
545 raise
550 raise
546 except Exception:
551 except Exception:
547 if self._exiting:
552 if self._exiting:
548 break
553 break
549 else:
554 else:
550 raise
555 raise
551 else:
556 else:
552 break
557 break
553 return events
558 return events
554
559
555 def run(self):
560 def run(self):
556 """The thread's main activity. Call start() instead."""
561 """The thread's main activity. Call start() instead."""
557 self._create_socket()
562 self._create_socket()
558 self._running = True
563 self._running = True
559 self._beating = True
564 self._beating = True
560
565
561 while self._running:
566 while self._running:
562 if self._pause:
567 if self._pause:
563 # just sleep, and skip the rest of the loop
568 # just sleep, and skip the rest of the loop
564 time.sleep(self.time_to_dead)
569 time.sleep(self.time_to_dead)
565 continue
570 continue
566
571
567 since_last_heartbeat = 0.0
572 since_last_heartbeat = 0.0
568 # io.rprint('Ping from HB channel') # dbg
573 # io.rprint('Ping from HB channel') # dbg
569 # no need to catch EFSM here, because the previous event was
574 # no need to catch EFSM here, because the previous event was
570 # either a recv or connect, which cannot be followed by EFSM
575 # either a recv or connect, which cannot be followed by EFSM
571 self.socket.send(b'ping')
576 self.socket.send(b'ping')
572 request_time = time.time()
577 request_time = time.time()
573 ready = self._poll(request_time)
578 ready = self._poll(request_time)
574 if ready:
579 if ready:
575 self._beating = True
580 self._beating = True
576 # the poll above guarantees we have something to recv
581 # the poll above guarantees we have something to recv
577 self.socket.recv()
582 self.socket.recv()
578 # sleep the remainder of the cycle
583 # sleep the remainder of the cycle
579 remainder = self.time_to_dead - (time.time() - request_time)
584 remainder = self.time_to_dead - (time.time() - request_time)
580 if remainder > 0:
585 if remainder > 0:
581 time.sleep(remainder)
586 time.sleep(remainder)
582 continue
587 continue
583 else:
588 else:
584 # nothing was received within the time limit, signal heart failure
589 # nothing was received within the time limit, signal heart failure
585 self._beating = False
590 self._beating = False
586 since_last_heartbeat = time.time() - request_time
591 since_last_heartbeat = time.time() - request_time
587 self.call_handlers(since_last_heartbeat)
592 self.call_handlers(since_last_heartbeat)
588 # and close/reopen the socket, because the REQ/REP cycle has been broken
593 # and close/reopen the socket, because the REQ/REP cycle has been broken
589 self._create_socket()
594 self._create_socket()
590 continue
595 continue
591 try:
596 try:
592 self.socket.close()
597 self.socket.close()
593 except:
598 except:
594 pass
599 pass
595
600
596 def pause(self):
601 def pause(self):
597 """Pause the heartbeat."""
602 """Pause the heartbeat."""
598 self._pause = True
603 self._pause = True
599
604
600 def unpause(self):
605 def unpause(self):
601 """Unpause the heartbeat."""
606 """Unpause the heartbeat."""
602 self._pause = False
607 self._pause = False
603
608
604 def is_beating(self):
609 def is_beating(self):
605 """Is the heartbeat running and responsive (and not paused)."""
610 """Is the heartbeat running and responsive (and not paused)."""
606 if self.is_alive() and not self._pause and self._beating:
611 if self.is_alive() and not self._pause and self._beating:
607 return True
612 return True
608 else:
613 else:
609 return False
614 return False
610
615
611 def stop(self):
616 def stop(self):
612 self._running = False
617 self._running = False
613 super(HBSocketChannel, self).stop()
618 super(HBSocketChannel, self).stop()
614
619
615 def call_handlers(self, since_last_heartbeat):
620 def call_handlers(self, since_last_heartbeat):
616 """This method is called in the ioloop thread when a message arrives.
621 """This method is called in the ioloop thread when a message arrives.
617
622
618 Subclasses should override this method to handle incoming messages.
623 Subclasses should override this method to handle incoming messages.
619 It is important to remember that this method is called in the thread
624 It is important to remember that this method is called in the thread
620 so that some logic must be done to ensure that the application level
625 so that some logic must be done to ensure that the application level
621 handlers are called in the application thread.
626 handlers are called in the application thread.
622 """
627 """
623 raise NotImplementedError('call_handlers must be defined in a subclass.')
628 raise NotImplementedError('call_handlers must be defined in a subclass.')
624
629
625
630
626 #-----------------------------------------------------------------------------
631 #-----------------------------------------------------------------------------
627 # Main kernel manager class
632 # Main kernel manager class
628 #-----------------------------------------------------------------------------
633 #-----------------------------------------------------------------------------
629
634
630 class KernelManager(HasTraits):
635 class KernelManager(HasTraits):
631 """ Manages a kernel for a frontend.
636 """ Manages a kernel for a frontend.
632
637
633 The SUB channel is for the frontend to receive messages published by the
638 The SUB channel is for the frontend to receive messages published by the
634 kernel.
639 kernel.
635
640
636 The REQ channel is for the frontend to make requests of the kernel.
641 The REQ channel is for the frontend to make requests of the kernel.
637
642
638 The REP channel is for the kernel to request stdin (raw_input) from the
643 The REP channel is for the kernel to request stdin (raw_input) from the
639 frontend.
644 frontend.
640 """
645 """
641 # config object for passing to child configurables
646 # config object for passing to child configurables
642 config = Instance(Config)
647 config = Instance(Config)
643
648
644 # The PyZMQ Context to use for communication with the kernel.
649 # The PyZMQ Context to use for communication with the kernel.
645 context = Instance(zmq.Context)
650 context = Instance(zmq.Context)
646 def _context_default(self):
651 def _context_default(self):
647 return zmq.Context.instance()
652 return zmq.Context.instance()
648
653
649 # The Session to use for communication with the kernel.
654 # The Session to use for communication with the kernel.
650 session = Instance(Session)
655 session = Instance(Session)
651
656
652 # The kernel process with which the KernelManager is communicating.
657 # The kernel process with which the KernelManager is communicating.
653 kernel = Instance(Popen)
658 kernel = Instance(Popen)
654
659
655 # The addresses for the communication channels.
660 # The addresses for the communication channels.
656 connection_file = Unicode('')
661 connection_file = Unicode('')
657 ip = Unicode(LOCALHOST)
662 ip = Unicode(LOCALHOST)
658 def _ip_changed(self, name, old, new):
663 def _ip_changed(self, name, old, new):
659 if new == '*':
664 if new == '*':
660 self.ip = '0.0.0.0'
665 self.ip = '0.0.0.0'
661 shell_port = Integer(0)
666 shell_port = Integer(0)
662 iopub_port = Integer(0)
667 iopub_port = Integer(0)
663 stdin_port = Integer(0)
668 stdin_port = Integer(0)
664 hb_port = Integer(0)
669 hb_port = Integer(0)
665
670
666 # The classes to use for the various channels.
671 # The classes to use for the various channels.
667 shell_channel_class = Type(ShellSocketChannel)
672 shell_channel_class = Type(ShellSocketChannel)
668 sub_channel_class = Type(SubSocketChannel)
673 sub_channel_class = Type(SubSocketChannel)
669 stdin_channel_class = Type(StdInSocketChannel)
674 stdin_channel_class = Type(StdInSocketChannel)
670 hb_channel_class = Type(HBSocketChannel)
675 hb_channel_class = Type(HBSocketChannel)
671
676
672 # Protected traits.
677 # Protected traits.
673 _launch_args = Any
678 _launch_args = Any
674 _shell_channel = Any
679 _shell_channel = Any
675 _sub_channel = Any
680 _sub_channel = Any
676 _stdin_channel = Any
681 _stdin_channel = Any
677 _hb_channel = Any
682 _hb_channel = Any
678 _connection_file_written=Bool(False)
683 _connection_file_written=Bool(False)
679
684
680 def __init__(self, **kwargs):
685 def __init__(self, **kwargs):
681 super(KernelManager, self).__init__(**kwargs)
686 super(KernelManager, self).__init__(**kwargs)
682 if self.session is None:
687 if self.session is None:
683 self.session = Session(config=self.config)
688 self.session = Session(config=self.config)
684
689
685 def __del__(self):
690 def __del__(self):
686 self.cleanup_connection_file()
691 self.cleanup_connection_file()
687
692
688
693
689 #--------------------------------------------------------------------------
694 #--------------------------------------------------------------------------
690 # Channel management methods:
695 # Channel management methods:
691 #--------------------------------------------------------------------------
696 #--------------------------------------------------------------------------
692
697
693 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
698 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
694 """Starts the channels for this kernel.
699 """Starts the channels for this kernel.
695
700
696 This will create the channels if they do not exist and then start
701 This will create the channels if they do not exist and then start
697 them. If port numbers of 0 are being used (random ports) then you
702 them. If port numbers of 0 are being used (random ports) then you
698 must first call :method:`start_kernel`. If the channels have been
703 must first call :method:`start_kernel`. If the channels have been
699 stopped and you call this, :class:`RuntimeError` will be raised.
704 stopped and you call this, :class:`RuntimeError` will be raised.
700 """
705 """
701 if shell:
706 if shell:
702 self.shell_channel.start()
707 self.shell_channel.start()
703 if sub:
708 if sub:
704 self.sub_channel.start()
709 self.sub_channel.start()
705 if stdin:
710 if stdin:
706 self.stdin_channel.start()
711 self.stdin_channel.start()
707 self.shell_channel.allow_stdin = True
712 self.shell_channel.allow_stdin = True
708 else:
713 else:
709 self.shell_channel.allow_stdin = False
714 self.shell_channel.allow_stdin = False
710 if hb:
715 if hb:
711 self.hb_channel.start()
716 self.hb_channel.start()
712
717
713 def stop_channels(self):
718 def stop_channels(self):
714 """Stops all the running channels for this kernel.
719 """Stops all the running channels for this kernel.
715 """
720 """
716 if self.shell_channel.is_alive():
721 if self.shell_channel.is_alive():
717 self.shell_channel.stop()
722 self.shell_channel.stop()
718 if self.sub_channel.is_alive():
723 if self.sub_channel.is_alive():
719 self.sub_channel.stop()
724 self.sub_channel.stop()
720 if self.stdin_channel.is_alive():
725 if self.stdin_channel.is_alive():
721 self.stdin_channel.stop()
726 self.stdin_channel.stop()
722 if self.hb_channel.is_alive():
727 if self.hb_channel.is_alive():
723 self.hb_channel.stop()
728 self.hb_channel.stop()
724
729
725 @property
730 @property
726 def channels_running(self):
731 def channels_running(self):
727 """Are any of the channels created and running?"""
732 """Are any of the channels created and running?"""
728 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
733 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
729 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
734 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
730
735
731 #--------------------------------------------------------------------------
736 #--------------------------------------------------------------------------
732 # Kernel process management methods:
737 # Kernel process management methods:
733 #--------------------------------------------------------------------------
738 #--------------------------------------------------------------------------
734
739
735 def cleanup_connection_file(self):
740 def cleanup_connection_file(self):
736 """cleanup connection file *if we wrote it*
741 """cleanup connection file *if we wrote it*
737
742
738 Will not raise if the connection file was already removed somehow.
743 Will not raise if the connection file was already removed somehow.
739 """
744 """
740 if self._connection_file_written:
745 if self._connection_file_written:
741 # cleanup connection files on full shutdown of kernel we started
746 # cleanup connection files on full shutdown of kernel we started
742 self._connection_file_written = False
747 self._connection_file_written = False
743 try:
748 try:
744 os.remove(self.connection_file)
749 os.remove(self.connection_file)
745 except OSError:
750 except OSError:
746 pass
751 pass
747
752
748 def load_connection_file(self):
753 def load_connection_file(self):
749 """load connection info from JSON dict in self.connection_file"""
754 """load connection info from JSON dict in self.connection_file"""
750 with open(self.connection_file) as f:
755 with open(self.connection_file) as f:
751 cfg = json.loads(f.read())
756 cfg = json.loads(f.read())
752
757
753 self.ip = cfg['ip']
758 self.ip = cfg['ip']
754 self.shell_port = cfg['shell_port']
759 self.shell_port = cfg['shell_port']
755 self.stdin_port = cfg['stdin_port']
760 self.stdin_port = cfg['stdin_port']
756 self.iopub_port = cfg['iopub_port']
761 self.iopub_port = cfg['iopub_port']
757 self.hb_port = cfg['hb_port']
762 self.hb_port = cfg['hb_port']
758 self.session.key = str_to_bytes(cfg['key'])
763 self.session.key = str_to_bytes(cfg['key'])
759
764
760 def write_connection_file(self):
765 def write_connection_file(self):
761 """write connection info to JSON dict in self.connection_file"""
766 """write connection info to JSON dict in self.connection_file"""
762 if self._connection_file_written:
767 if self._connection_file_written:
763 return
768 return
764 self.connection_file,cfg = write_connection_file(self.connection_file,
769 self.connection_file,cfg = write_connection_file(self.connection_file,
765 ip=self.ip, key=self.session.key,
770 ip=self.ip, key=self.session.key,
766 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
771 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
767 shell_port=self.shell_port, hb_port=self.hb_port)
772 shell_port=self.shell_port, hb_port=self.hb_port)
768 # write_connection_file also sets default ports:
773 # write_connection_file also sets default ports:
769 self.shell_port = cfg['shell_port']
774 self.shell_port = cfg['shell_port']
770 self.stdin_port = cfg['stdin_port']
775 self.stdin_port = cfg['stdin_port']
771 self.iopub_port = cfg['iopub_port']
776 self.iopub_port = cfg['iopub_port']
772 self.hb_port = cfg['hb_port']
777 self.hb_port = cfg['hb_port']
773
778
774 self._connection_file_written = True
779 self._connection_file_written = True
775
780
776 def start_kernel(self, **kw):
781 def start_kernel(self, **kw):
777 """Starts a kernel process and configures the manager to use it.
782 """Starts a kernel process and configures the manager to use it.
778
783
779 If random ports (port=0) are being used, this method must be called
784 If random ports (port=0) are being used, this method must be called
780 before the channels are created.
785 before the channels are created.
781
786
782 Parameters:
787 Parameters:
783 -----------
788 -----------
784 launcher : callable, optional (default None)
789 launcher : callable, optional (default None)
785 A custom function for launching the kernel process (generally a
790 A custom function for launching the kernel process (generally a
786 wrapper around ``entry_point.base_launch_kernel``). In most cases,
791 wrapper around ``entry_point.base_launch_kernel``). In most cases,
787 it should not be necessary to use this parameter.
792 it should not be necessary to use this parameter.
788
793
789 **kw : optional
794 **kw : optional
790 See respective options for IPython and Python kernels.
795 See respective options for IPython and Python kernels.
791 """
796 """
792 if self.ip not in LOCAL_IPS:
797 if self.ip not in LOCAL_IPS:
793 raise RuntimeError("Can only launch a kernel on a local interface. "
798 raise RuntimeError("Can only launch a kernel on a local interface. "
794 "Make sure that the '*_address' attributes are "
799 "Make sure that the '*_address' attributes are "
795 "configured properly. "
800 "configured properly. "
796 "Currently valid addresses are: %s"%LOCAL_IPS
801 "Currently valid addresses are: %s"%LOCAL_IPS
797 )
802 )
798
803
799 # write connection file / get default ports
804 # write connection file / get default ports
800 self.write_connection_file()
805 self.write_connection_file()
801
806
802 self._launch_args = kw.copy()
807 self._launch_args = kw.copy()
803 launch_kernel = kw.pop('launcher', None)
808 launch_kernel = kw.pop('launcher', None)
804 if launch_kernel is None:
809 if launch_kernel is None:
805 from ipkernel import launch_kernel
810 from ipkernel import launch_kernel
806 self.kernel = launch_kernel(fname=self.connection_file, **kw)
811 self.kernel = launch_kernel(fname=self.connection_file, **kw)
807
812
808 def shutdown_kernel(self, restart=False):
813 def shutdown_kernel(self, restart=False):
809 """ Attempts to the stop the kernel process cleanly. If the kernel
814 """ Attempts to the stop the kernel process cleanly. If the kernel
810 cannot be stopped, it is killed, if possible.
815 cannot be stopped, it is killed, if possible.
811 """
816 """
812 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
817 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
813 if sys.platform == 'win32':
818 if sys.platform == 'win32':
814 self.kill_kernel()
819 self.kill_kernel()
815 return
820 return
816
821
817 # Pause the heart beat channel if it exists.
822 # Pause the heart beat channel if it exists.
818 if self._hb_channel is not None:
823 if self._hb_channel is not None:
819 self._hb_channel.pause()
824 self._hb_channel.pause()
820
825
821 # Don't send any additional kernel kill messages immediately, to give
826 # Don't send any additional kernel kill messages immediately, to give
822 # the kernel a chance to properly execute shutdown actions. Wait for at
827 # the kernel a chance to properly execute shutdown actions. Wait for at
823 # most 1s, checking every 0.1s.
828 # most 1s, checking every 0.1s.
824 self.shell_channel.shutdown(restart=restart)
829 self.shell_channel.shutdown(restart=restart)
825 for i in range(10):
830 for i in range(10):
826 if self.is_alive:
831 if self.is_alive:
827 time.sleep(0.1)
832 time.sleep(0.1)
828 else:
833 else:
829 break
834 break
830 else:
835 else:
831 # OK, we've waited long enough.
836 # OK, we've waited long enough.
832 if self.has_kernel:
837 if self.has_kernel:
833 self.kill_kernel()
838 self.kill_kernel()
834
839
835 if not restart and self._connection_file_written:
840 if not restart and self._connection_file_written:
836 # cleanup connection files on full shutdown of kernel we started
841 # cleanup connection files on full shutdown of kernel we started
837 self._connection_file_written = False
842 self._connection_file_written = False
838 try:
843 try:
839 os.remove(self.connection_file)
844 os.remove(self.connection_file)
840 except IOError:
845 except IOError:
841 pass
846 pass
842
847
843 def restart_kernel(self, now=False, **kw):
848 def restart_kernel(self, now=False, **kw):
844 """Restarts a kernel with the arguments that were used to launch it.
849 """Restarts a kernel with the arguments that were used to launch it.
845
850
846 If the old kernel was launched with random ports, the same ports will be
851 If the old kernel was launched with random ports, the same ports will be
847 used for the new kernel.
852 used for the new kernel.
848
853
849 Parameters
854 Parameters
850 ----------
855 ----------
851 now : bool, optional
856 now : bool, optional
852 If True, the kernel is forcefully restarted *immediately*, without
857 If True, the kernel is forcefully restarted *immediately*, without
853 having a chance to do any cleanup action. Otherwise the kernel is
858 having a chance to do any cleanup action. Otherwise the kernel is
854 given 1s to clean up before a forceful restart is issued.
859 given 1s to clean up before a forceful restart is issued.
855
860
856 In all cases the kernel is restarted, the only difference is whether
861 In all cases the kernel is restarted, the only difference is whether
857 it is given a chance to perform a clean shutdown or not.
862 it is given a chance to perform a clean shutdown or not.
858
863
859 **kw : optional
864 **kw : optional
860 Any options specified here will replace those used to launch the
865 Any options specified here will replace those used to launch the
861 kernel.
866 kernel.
862 """
867 """
863 if self._launch_args is None:
868 if self._launch_args is None:
864 raise RuntimeError("Cannot restart the kernel. "
869 raise RuntimeError("Cannot restart the kernel. "
865 "No previous call to 'start_kernel'.")
870 "No previous call to 'start_kernel'.")
866 else:
871 else:
867 # Stop currently running kernel.
872 # Stop currently running kernel.
868 if self.has_kernel:
873 if self.has_kernel:
869 if now:
874 if now:
870 self.kill_kernel()
875 self.kill_kernel()
871 else:
876 else:
872 self.shutdown_kernel(restart=True)
877 self.shutdown_kernel(restart=True)
873
878
874 # Start new kernel.
879 # Start new kernel.
875 self._launch_args.update(kw)
880 self._launch_args.update(kw)
876 self.start_kernel(**self._launch_args)
881 self.start_kernel(**self._launch_args)
877
882
878 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
883 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
879 # unless there is some delay here.
884 # unless there is some delay here.
880 if sys.platform == 'win32':
885 if sys.platform == 'win32':
881 time.sleep(0.2)
886 time.sleep(0.2)
882
887
883 @property
888 @property
884 def has_kernel(self):
889 def has_kernel(self):
885 """Returns whether a kernel process has been specified for the kernel
890 """Returns whether a kernel process has been specified for the kernel
886 manager.
891 manager.
887 """
892 """
888 return self.kernel is not None
893 return self.kernel is not None
889
894
890 def kill_kernel(self):
895 def kill_kernel(self):
891 """ Kill the running kernel. """
896 """ Kill the running kernel. """
892 if self.has_kernel:
897 if self.has_kernel:
893 # Pause the heart beat channel if it exists.
898 # Pause the heart beat channel if it exists.
894 if self._hb_channel is not None:
899 if self._hb_channel is not None:
895 self._hb_channel.pause()
900 self._hb_channel.pause()
896
901
897 # Attempt to kill the kernel.
902 # Attempt to kill the kernel.
898 try:
903 try:
899 self.kernel.kill()
904 self.kernel.kill()
900 except OSError as e:
905 except OSError as e:
901 # In Windows, we will get an Access Denied error if the process
906 # In Windows, we will get an Access Denied error if the process
902 # has already terminated. Ignore it.
907 # has already terminated. Ignore it.
903 if sys.platform == 'win32':
908 if sys.platform == 'win32':
904 if e.winerror != 5:
909 if e.winerror != 5:
905 raise
910 raise
906 # On Unix, we may get an ESRCH error if the process has already
911 # On Unix, we may get an ESRCH error if the process has already
907 # terminated. Ignore it.
912 # terminated. Ignore it.
908 else:
913 else:
909 from errno import ESRCH
914 from errno import ESRCH
910 if e.errno != ESRCH:
915 if e.errno != ESRCH:
911 raise
916 raise
912 self.kernel = None
917 self.kernel = None
913 else:
918 else:
914 raise RuntimeError("Cannot kill kernel. No kernel is running!")
919 raise RuntimeError("Cannot kill kernel. No kernel is running!")
915
920
916 def interrupt_kernel(self):
921 def interrupt_kernel(self):
917 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
922 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
918 well supported on all platforms.
923 well supported on all platforms.
919 """
924 """
920 if self.has_kernel:
925 if self.has_kernel:
921 if sys.platform == 'win32':
926 if sys.platform == 'win32':
922 from parentpoller import ParentPollerWindows as Poller
927 from parentpoller import ParentPollerWindows as Poller
923 Poller.send_interrupt(self.kernel.win32_interrupt_event)
928 Poller.send_interrupt(self.kernel.win32_interrupt_event)
924 else:
929 else:
925 self.kernel.send_signal(signal.SIGINT)
930 self.kernel.send_signal(signal.SIGINT)
926 else:
931 else:
927 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
932 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
928
933
929 def signal_kernel(self, signum):
934 def signal_kernel(self, signum):
930 """ Sends a signal to the kernel. Note that since only SIGTERM is
935 """ Sends a signal to the kernel. Note that since only SIGTERM is
931 supported on Windows, this function is only useful on Unix systems.
936 supported on Windows, this function is only useful on Unix systems.
932 """
937 """
933 if self.has_kernel:
938 if self.has_kernel:
934 self.kernel.send_signal(signum)
939 self.kernel.send_signal(signum)
935 else:
940 else:
936 raise RuntimeError("Cannot signal kernel. No kernel is running!")
941 raise RuntimeError("Cannot signal kernel. No kernel is running!")
937
942
938 @property
943 @property
939 def is_alive(self):
944 def is_alive(self):
940 """Is the kernel process still running?"""
945 """Is the kernel process still running?"""
941 if self.has_kernel:
946 if self.has_kernel:
942 if self.kernel.poll() is None:
947 if self.kernel.poll() is None:
943 return True
948 return True
944 else:
949 else:
945 return False
950 return False
946 elif self._hb_channel is not None:
951 elif self._hb_channel is not None:
947 # We didn't start the kernel with this KernelManager so we
952 # We didn't start the kernel with this KernelManager so we
948 # use the heartbeat.
953 # use the heartbeat.
949 return self._hb_channel.is_beating()
954 return self._hb_channel.is_beating()
950 else:
955 else:
951 # no heartbeat and not local, we can't tell if it's running,
956 # no heartbeat and not local, we can't tell if it's running,
952 # so naively return True
957 # so naively return True
953 return True
958 return True
954
959
955 #--------------------------------------------------------------------------
960 #--------------------------------------------------------------------------
956 # Channels used for communication with the kernel:
961 # Channels used for communication with the kernel:
957 #--------------------------------------------------------------------------
962 #--------------------------------------------------------------------------
958
963
959 @property
964 @property
960 def shell_channel(self):
965 def shell_channel(self):
961 """Get the REQ socket channel object to make requests of the kernel."""
966 """Get the REQ socket channel object to make requests of the kernel."""
962 if self._shell_channel is None:
967 if self._shell_channel is None:
963 self._shell_channel = self.shell_channel_class(self.context,
968 self._shell_channel = self.shell_channel_class(self.context,
964 self.session,
969 self.session,
965 (self.ip, self.shell_port))
970 (self.ip, self.shell_port))
966 return self._shell_channel
971 return self._shell_channel
967
972
968 @property
973 @property
969 def sub_channel(self):
974 def sub_channel(self):
970 """Get the SUB socket channel object."""
975 """Get the SUB socket channel object."""
971 if self._sub_channel is None:
976 if self._sub_channel is None:
972 self._sub_channel = self.sub_channel_class(self.context,
977 self._sub_channel = self.sub_channel_class(self.context,
973 self.session,
978 self.session,
974 (self.ip, self.iopub_port))
979 (self.ip, self.iopub_port))
975 return self._sub_channel
980 return self._sub_channel
976
981
977 @property
982 @property
978 def stdin_channel(self):
983 def stdin_channel(self):
979 """Get the REP socket channel object to handle stdin (raw_input)."""
984 """Get the REP socket channel object to handle stdin (raw_input)."""
980 if self._stdin_channel is None:
985 if self._stdin_channel is None:
981 self._stdin_channel = self.stdin_channel_class(self.context,
986 self._stdin_channel = self.stdin_channel_class(self.context,
982 self.session,
987 self.session,
983 (self.ip, self.stdin_port))
988 (self.ip, self.stdin_port))
984 return self._stdin_channel
989 return self._stdin_channel
985
990
986 @property
991 @property
987 def hb_channel(self):
992 def hb_channel(self):
988 """Get the heartbeat socket channel object to check that the
993 """Get the heartbeat socket channel object to check that the
989 kernel is alive."""
994 kernel is alive."""
990 if self._hb_channel is None:
995 if self._hb_channel is None:
991 self._hb_channel = self.hb_channel_class(self.context,
996 self._hb_channel = self.hb_channel_class(self.context,
992 self.session,
997 self.session,
993 (self.ip, self.hb_port))
998 (self.ip, self.hb_port))
994 return self._hb_channel
999 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now