##// END OF EJS Templates
fix typo in docs
Jason Grout -
Show More
@@ -1,999 +1,999 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 import json
21 import json
22 from subprocess import Popen
22 from subprocess import Popen
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 from threading import Thread
26 from threading import Thread
27 import time
27 import time
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.traitlets import (
39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 )
41 )
42 from IPython.utils.py3compat import str_to_bytes
42 from IPython.utils.py3compat import str_to_bytes
43 from IPython.zmq.entry_point import write_connection_file
43 from IPython.zmq.entry_point import write_connection_file
44 from session import Session
44 from session import Session
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Constants and exceptions
47 # Constants and exceptions
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 class InvalidPortNumber(Exception):
50 class InvalidPortNumber(Exception):
51 pass
51 pass
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Utility functions
54 # Utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 # some utilities to validate message structure, these might get moved elsewhere
57 # some utilities to validate message structure, these might get moved elsewhere
58 # if they prove to have more generic utility
58 # if they prove to have more generic utility
59
59
60 def validate_string_list(lst):
60 def validate_string_list(lst):
61 """Validate that the input is a list of strings.
61 """Validate that the input is a list of strings.
62
62
63 Raises ValueError if not."""
63 Raises ValueError if not."""
64 if not isinstance(lst, list):
64 if not isinstance(lst, list):
65 raise ValueError('input %r must be a list' % lst)
65 raise ValueError('input %r must be a list' % lst)
66 for x in lst:
66 for x in lst:
67 if not isinstance(x, basestring):
67 if not isinstance(x, basestring):
68 raise ValueError('element %r in list must be a string' % x)
68 raise ValueError('element %r in list must be a string' % x)
69
69
70
70
71 def validate_string_dict(dct):
71 def validate_string_dict(dct):
72 """Validate that the input is a dict with string keys and values.
72 """Validate that the input is a dict with string keys and values.
73
73
74 Raises ValueError if not."""
74 Raises ValueError if not."""
75 for k,v in dct.iteritems():
75 for k,v in dct.iteritems():
76 if not isinstance(k, basestring):
76 if not isinstance(k, basestring):
77 raise ValueError('key %r in dict must be a string' % k)
77 raise ValueError('key %r in dict must be a string' % k)
78 if not isinstance(v, basestring):
78 if not isinstance(v, basestring):
79 raise ValueError('value %r in dict must be a string' % v)
79 raise ValueError('value %r in dict must be a string' % v)
80
80
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # ZMQ Socket Channel classes
83 # ZMQ Socket Channel classes
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86 class ZMQSocketChannel(Thread):
86 class ZMQSocketChannel(Thread):
87 """The base class for the channels that use ZMQ sockets.
87 """The base class for the channels that use ZMQ sockets.
88 """
88 """
89 context = None
89 context = None
90 session = None
90 session = None
91 socket = None
91 socket = None
92 ioloop = None
92 ioloop = None
93 stream = None
93 stream = None
94 _address = None
94 _address = None
95 _exiting = False
95 _exiting = False
96
96
97 def __init__(self, context, session, address):
97 def __init__(self, context, session, address):
98 """Create a channel
98 """Create a channel
99
99
100 Parameters
100 Parameters
101 ----------
101 ----------
102 context : :class:`zmq.Context`
102 context : :class:`zmq.Context`
103 The ZMQ context to use.
103 The ZMQ context to use.
104 session : :class:`session.Session`
104 session : :class:`session.Session`
105 The session to use.
105 The session to use.
106 address : tuple
106 address : tuple
107 Standard (ip, port) tuple that the kernel is listening on.
107 Standard (ip, port) tuple that the kernel is listening on.
108 """
108 """
109 super(ZMQSocketChannel, self).__init__()
109 super(ZMQSocketChannel, self).__init__()
110 self.daemon = True
110 self.daemon = True
111
111
112 self.context = context
112 self.context = context
113 self.session = session
113 self.session = session
114 if address[1] == 0:
114 if address[1] == 0:
115 message = 'The port number for a channel cannot be 0.'
115 message = 'The port number for a channel cannot be 0.'
116 raise InvalidPortNumber(message)
116 raise InvalidPortNumber(message)
117 self._address = address
117 self._address = address
118 atexit.register(self._notice_exit)
118 atexit.register(self._notice_exit)
119
119
120 def _notice_exit(self):
120 def _notice_exit(self):
121 self._exiting = True
121 self._exiting = True
122
122
123 def _run_loop(self):
123 def _run_loop(self):
124 """Run my loop, ignoring EINTR events in the poller"""
124 """Run my loop, ignoring EINTR events in the poller"""
125 while True:
125 while True:
126 try:
126 try:
127 self.ioloop.start()
127 self.ioloop.start()
128 except ZMQError as e:
128 except ZMQError as e:
129 if e.errno == errno.EINTR:
129 if e.errno == errno.EINTR:
130 continue
130 continue
131 else:
131 else:
132 raise
132 raise
133 except Exception:
133 except Exception:
134 if self._exiting:
134 if self._exiting:
135 break
135 break
136 else:
136 else:
137 raise
137 raise
138 else:
138 else:
139 break
139 break
140
140
141 def stop(self):
141 def stop(self):
142 """Stop the channel's activity.
142 """Stop the channel's activity.
143
143
144 This calls :method:`Thread.join` and returns when the thread
144 This calls :method:`Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
145 terminates. :class:`RuntimeError` will be raised if
146 :method:`self.start` is called again.
146 :method:`self.start` is called again.
147 """
147 """
148 self.join()
148 self.join()
149
149
150 @property
150 @property
151 def address(self):
151 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
152 """Get the channel's address as an (ip, port) tuple.
153
153
154 By the default, the address is (localhost, 0), where 0 means a random
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
155 port.
156 """
156 """
157 return self._address
157 return self._address
158
158
159 def _queue_send(self, msg):
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 msg : message to send
164 msg : message to send
165
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
167 thread control of the action.
168 """
168 """
169 def thread_send():
169 def thread_send():
170 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
172
172
173 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
174 """callback for stream.on_recv
174 """callback for stream.on_recv
175
175
176 unpacks message, and calls handlers with it.
176 unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 self.call_handlers(self.session.unserialize(smsg))
179 self.call_handlers(self.session.unserialize(smsg))
180
180
181
181
182
182
183 class ShellSocketChannel(ZMQSocketChannel):
183 class ShellSocketChannel(ZMQSocketChannel):
184 """The DEALER channel for issues request/replies to the kernel.
184 """The DEALER channel for issues request/replies to the kernel.
185 """
185 """
186
186
187 command_queue = None
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
189 allow_stdin = True
190
190
191 def __init__(self, context, session, address):
191 def __init__(self, context, session, address):
192 super(ShellSocketChannel, self).__init__(context, session, address)
192 super(ShellSocketChannel, self).__init__(context, session, address)
193 self.ioloop = ioloop.IOLoop()
193 self.ioloop = ioloop.IOLoop()
194
194
195 def run(self):
195 def run(self):
196 """The thread's main activity. Call start() instead."""
196 """The thread's main activity. Call start() instead."""
197 self.socket = self.context.socket(zmq.DEALER)
197 self.socket = self.context.socket(zmq.DEALER)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
199 self.socket.connect('tcp://%s:%i' % self.address)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 self.stream.on_recv(self._handle_recv)
201 self.stream.on_recv(self._handle_recv)
202 self._run_loop()
202 self._run_loop()
203 try:
203 try:
204 self.socket.close()
204 self.socket.close()
205 except:
205 except:
206 pass
206 pass
207
207
208 def stop(self):
208 def stop(self):
209 self.ioloop.stop()
209 self.ioloop.stop()
210 super(ShellSocketChannel, self).stop()
210 super(ShellSocketChannel, self).stop()
211
211
212 def call_handlers(self, msg):
212 def call_handlers(self, msg):
213 """This method is called in the ioloop thread when a message arrives.
213 """This method is called in the ioloop thread when a message arrives.
214
214
215 Subclasses should override this method to handle incoming messages.
215 Subclasses should override this method to handle incoming messages.
216 It is important to remember that this method is called in the thread
216 It is important to remember that this method is called in the thread
217 so that some logic must be done to ensure that the application leve
217 so that some logic must be done to ensure that the application leve
218 handlers are called in the application thread.
218 handlers are called in the application thread.
219 """
219 """
220 raise NotImplementedError('call_handlers must be defined in a subclass.')
220 raise NotImplementedError('call_handlers must be defined in a subclass.')
221
221
222 def execute(self, code, silent=False, store_history=True,
222 def execute(self, code, silent=False, store_history=True,
223 user_variables=None, user_expressions=None, allow_stdin=None):
223 user_variables=None, user_expressions=None, allow_stdin=None):
224 """Execute code in the kernel.
224 """Execute code in the kernel.
225
225
226 Parameters
226 Parameters
227 ----------
227 ----------
228 code : str
228 code : str
229 A string of Python code.
229 A string of Python code.
230
230
231 silent : bool, optional (default False)
231 silent : bool, optional (default False)
232 If set, the kernel will execute the code as quietly possible, and
232 If set, the kernel will execute the code as quietly possible, and
233 will force store_history to be False.
233 will force store_history to be False.
234
234
235 store_history : bool, optional (default True)
235 store_history : bool, optional (default True)
236 If set, the kernel will execute store command history. This is forced
236 If set, the kernel will store command history. This is forced
237 to be False if silent is True.
237 to be False if silent is True.
238
238
239 user_variables : list, optional
239 user_variables : list, optional
240 A list of variable names to pull from the user's namespace. They
240 A list of variable names to pull from the user's namespace. They
241 will come back as a dict with these names as keys and their
241 will come back as a dict with these names as keys and their
242 :func:`repr` as values.
242 :func:`repr` as values.
243
243
244 user_expressions : dict, optional
244 user_expressions : dict, optional
245 A dict with string keys and to pull from the user's
245 A dict with string keys and to pull from the user's
246 namespace. They will come back as a dict with these names as keys
246 namespace. They will come back as a dict with these names as keys
247 and their :func:`repr` as values.
247 and their :func:`repr` as values.
248
248
249 allow_stdin : bool, optional
249 allow_stdin : bool, optional
250 Flag for
250 Flag for
251 A dict with string keys and to pull from the user's
251 A dict with string keys and to pull from the user's
252 namespace. They will come back as a dict with these names as keys
252 namespace. They will come back as a dict with these names as keys
253 and their :func:`repr` as values.
253 and their :func:`repr` as values.
254
254
255 Returns
255 Returns
256 -------
256 -------
257 The msg_id of the message sent.
257 The msg_id of the message sent.
258 """
258 """
259 if user_variables is None:
259 if user_variables is None:
260 user_variables = []
260 user_variables = []
261 if user_expressions is None:
261 if user_expressions is None:
262 user_expressions = {}
262 user_expressions = {}
263 if allow_stdin is None:
263 if allow_stdin is None:
264 allow_stdin = self.allow_stdin
264 allow_stdin = self.allow_stdin
265
265
266
266
267 # Don't waste network traffic if inputs are invalid
267 # Don't waste network traffic if inputs are invalid
268 if not isinstance(code, basestring):
268 if not isinstance(code, basestring):
269 raise ValueError('code %r must be a string' % code)
269 raise ValueError('code %r must be a string' % code)
270 validate_string_list(user_variables)
270 validate_string_list(user_variables)
271 validate_string_dict(user_expressions)
271 validate_string_dict(user_expressions)
272
272
273 # Create class for content/msg creation. Related to, but possibly
273 # Create class for content/msg creation. Related to, but possibly
274 # not in Session.
274 # not in Session.
275 content = dict(code=code, silent=silent, store_history=store_history,
275 content = dict(code=code, silent=silent, store_history=store_history,
276 user_variables=user_variables,
276 user_variables=user_variables,
277 user_expressions=user_expressions,
277 user_expressions=user_expressions,
278 allow_stdin=allow_stdin,
278 allow_stdin=allow_stdin,
279 )
279 )
280 msg = self.session.msg('execute_request', content)
280 msg = self.session.msg('execute_request', content)
281 self._queue_send(msg)
281 self._queue_send(msg)
282 return msg['header']['msg_id']
282 return msg['header']['msg_id']
283
283
284 def complete(self, text, line, cursor_pos, block=None):
284 def complete(self, text, line, cursor_pos, block=None):
285 """Tab complete text in the kernel's namespace.
285 """Tab complete text in the kernel's namespace.
286
286
287 Parameters
287 Parameters
288 ----------
288 ----------
289 text : str
289 text : str
290 The text to complete.
290 The text to complete.
291 line : str
291 line : str
292 The full line of text that is the surrounding context for the
292 The full line of text that is the surrounding context for the
293 text to complete.
293 text to complete.
294 cursor_pos : int
294 cursor_pos : int
295 The position of the cursor in the line where the completion was
295 The position of the cursor in the line where the completion was
296 requested.
296 requested.
297 block : str, optional
297 block : str, optional
298 The full block of code in which the completion is being requested.
298 The full block of code in which the completion is being requested.
299
299
300 Returns
300 Returns
301 -------
301 -------
302 The msg_id of the message sent.
302 The msg_id of the message sent.
303 """
303 """
304 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
304 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
305 msg = self.session.msg('complete_request', content)
305 msg = self.session.msg('complete_request', content)
306 self._queue_send(msg)
306 self._queue_send(msg)
307 return msg['header']['msg_id']
307 return msg['header']['msg_id']
308
308
309 def object_info(self, oname, detail_level=0):
309 def object_info(self, oname, detail_level=0):
310 """Get metadata information about an object.
310 """Get metadata information about an object.
311
311
312 Parameters
312 Parameters
313 ----------
313 ----------
314 oname : str
314 oname : str
315 A string specifying the object name.
315 A string specifying the object name.
316 detail_level : int, optional
316 detail_level : int, optional
317 The level of detail for the introspection (0-2)
317 The level of detail for the introspection (0-2)
318
318
319 Returns
319 Returns
320 -------
320 -------
321 The msg_id of the message sent.
321 The msg_id of the message sent.
322 """
322 """
323 content = dict(oname=oname, detail_level=detail_level)
323 content = dict(oname=oname, detail_level=detail_level)
324 msg = self.session.msg('object_info_request', content)
324 msg = self.session.msg('object_info_request', content)
325 self._queue_send(msg)
325 self._queue_send(msg)
326 return msg['header']['msg_id']
326 return msg['header']['msg_id']
327
327
328 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
328 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
329 """Get entries from the history list.
329 """Get entries from the history list.
330
330
331 Parameters
331 Parameters
332 ----------
332 ----------
333 raw : bool
333 raw : bool
334 If True, return the raw input.
334 If True, return the raw input.
335 output : bool
335 output : bool
336 If True, then return the output as well.
336 If True, then return the output as well.
337 hist_access_type : str
337 hist_access_type : str
338 'range' (fill in session, start and stop params), 'tail' (fill in n)
338 'range' (fill in session, start and stop params), 'tail' (fill in n)
339 or 'search' (fill in pattern param).
339 or 'search' (fill in pattern param).
340
340
341 session : int
341 session : int
342 For a range request, the session from which to get lines. Session
342 For a range request, the session from which to get lines. Session
343 numbers are positive integers; negative ones count back from the
343 numbers are positive integers; negative ones count back from the
344 current session.
344 current session.
345 start : int
345 start : int
346 The first line number of a history range.
346 The first line number of a history range.
347 stop : int
347 stop : int
348 The final (excluded) line number of a history range.
348 The final (excluded) line number of a history range.
349
349
350 n : int
350 n : int
351 The number of lines of history to get for a tail request.
351 The number of lines of history to get for a tail request.
352
352
353 pattern : str
353 pattern : str
354 The glob-syntax pattern for a search request.
354 The glob-syntax pattern for a search request.
355
355
356 Returns
356 Returns
357 -------
357 -------
358 The msg_id of the message sent.
358 The msg_id of the message sent.
359 """
359 """
360 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
360 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
361 **kwargs)
361 **kwargs)
362 msg = self.session.msg('history_request', content)
362 msg = self.session.msg('history_request', content)
363 self._queue_send(msg)
363 self._queue_send(msg)
364 return msg['header']['msg_id']
364 return msg['header']['msg_id']
365
365
366 def shutdown(self, restart=False):
366 def shutdown(self, restart=False):
367 """Request an immediate kernel shutdown.
367 """Request an immediate kernel shutdown.
368
368
369 Upon receipt of the (empty) reply, client code can safely assume that
369 Upon receipt of the (empty) reply, client code can safely assume that
370 the kernel has shut down and it's safe to forcefully terminate it if
370 the kernel has shut down and it's safe to forcefully terminate it if
371 it's still alive.
371 it's still alive.
372
372
373 The kernel will send the reply via a function registered with Python's
373 The kernel will send the reply via a function registered with Python's
374 atexit module, ensuring it's truly done as the kernel is done with all
374 atexit module, ensuring it's truly done as the kernel is done with all
375 normal operation.
375 normal operation.
376 """
376 """
377 # Send quit message to kernel. Once we implement kernel-side setattr,
377 # Send quit message to kernel. Once we implement kernel-side setattr,
378 # this should probably be done that way, but for now this will do.
378 # this should probably be done that way, but for now this will do.
379 msg = self.session.msg('shutdown_request', {'restart':restart})
379 msg = self.session.msg('shutdown_request', {'restart':restart})
380 self._queue_send(msg)
380 self._queue_send(msg)
381 return msg['header']['msg_id']
381 return msg['header']['msg_id']
382
382
383
383
384
384
385 class SubSocketChannel(ZMQSocketChannel):
385 class SubSocketChannel(ZMQSocketChannel):
386 """The SUB channel which listens for messages that the kernel publishes.
386 """The SUB channel which listens for messages that the kernel publishes.
387 """
387 """
388
388
389 def __init__(self, context, session, address):
389 def __init__(self, context, session, address):
390 super(SubSocketChannel, self).__init__(context, session, address)
390 super(SubSocketChannel, self).__init__(context, session, address)
391 self.ioloop = ioloop.IOLoop()
391 self.ioloop = ioloop.IOLoop()
392
392
393 def run(self):
393 def run(self):
394 """The thread's main activity. Call start() instead."""
394 """The thread's main activity. Call start() instead."""
395 self.socket = self.context.socket(zmq.SUB)
395 self.socket = self.context.socket(zmq.SUB)
396 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
396 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
397 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
397 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
398 self.socket.connect('tcp://%s:%i' % self.address)
398 self.socket.connect('tcp://%s:%i' % self.address)
399 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
399 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
400 self.stream.on_recv(self._handle_recv)
400 self.stream.on_recv(self._handle_recv)
401 self._run_loop()
401 self._run_loop()
402 try:
402 try:
403 self.socket.close()
403 self.socket.close()
404 except:
404 except:
405 pass
405 pass
406
406
407 def stop(self):
407 def stop(self):
408 self.ioloop.stop()
408 self.ioloop.stop()
409 super(SubSocketChannel, self).stop()
409 super(SubSocketChannel, self).stop()
410
410
411 def call_handlers(self, msg):
411 def call_handlers(self, msg):
412 """This method is called in the ioloop thread when a message arrives.
412 """This method is called in the ioloop thread when a message arrives.
413
413
414 Subclasses should override this method to handle incoming messages.
414 Subclasses should override this method to handle incoming messages.
415 It is important to remember that this method is called in the thread
415 It is important to remember that this method is called in the thread
416 so that some logic must be done to ensure that the application leve
416 so that some logic must be done to ensure that the application leve
417 handlers are called in the application thread.
417 handlers are called in the application thread.
418 """
418 """
419 raise NotImplementedError('call_handlers must be defined in a subclass.')
419 raise NotImplementedError('call_handlers must be defined in a subclass.')
420
420
421 def flush(self, timeout=1.0):
421 def flush(self, timeout=1.0):
422 """Immediately processes all pending messages on the SUB channel.
422 """Immediately processes all pending messages on the SUB channel.
423
423
424 Callers should use this method to ensure that :method:`call_handlers`
424 Callers should use this method to ensure that :method:`call_handlers`
425 has been called for all messages that have been received on the
425 has been called for all messages that have been received on the
426 0MQ SUB socket of this channel.
426 0MQ SUB socket of this channel.
427
427
428 This method is thread safe.
428 This method is thread safe.
429
429
430 Parameters
430 Parameters
431 ----------
431 ----------
432 timeout : float, optional
432 timeout : float, optional
433 The maximum amount of time to spend flushing, in seconds. The
433 The maximum amount of time to spend flushing, in seconds. The
434 default is one second.
434 default is one second.
435 """
435 """
436 # We do the IOLoop callback process twice to ensure that the IOLoop
436 # We do the IOLoop callback process twice to ensure that the IOLoop
437 # gets to perform at least one full poll.
437 # gets to perform at least one full poll.
438 stop_time = time.time() + timeout
438 stop_time = time.time() + timeout
439 for i in xrange(2):
439 for i in xrange(2):
440 self._flushed = False
440 self._flushed = False
441 self.ioloop.add_callback(self._flush)
441 self.ioloop.add_callback(self._flush)
442 while not self._flushed and time.time() < stop_time:
442 while not self._flushed and time.time() < stop_time:
443 time.sleep(0.01)
443 time.sleep(0.01)
444
444
445 def _flush(self):
445 def _flush(self):
446 """Callback for :method:`self.flush`."""
446 """Callback for :method:`self.flush`."""
447 self.stream.flush()
447 self.stream.flush()
448 self._flushed = True
448 self._flushed = True
449
449
450
450
451 class StdInSocketChannel(ZMQSocketChannel):
451 class StdInSocketChannel(ZMQSocketChannel):
452 """A reply channel to handle raw_input requests that the kernel makes."""
452 """A reply channel to handle raw_input requests that the kernel makes."""
453
453
454 msg_queue = None
454 msg_queue = None
455
455
456 def __init__(self, context, session, address):
456 def __init__(self, context, session, address):
457 super(StdInSocketChannel, self).__init__(context, session, address)
457 super(StdInSocketChannel, self).__init__(context, session, address)
458 self.ioloop = ioloop.IOLoop()
458 self.ioloop = ioloop.IOLoop()
459
459
460 def run(self):
460 def run(self):
461 """The thread's main activity. Call start() instead."""
461 """The thread's main activity. Call start() instead."""
462 self.socket = self.context.socket(zmq.DEALER)
462 self.socket = self.context.socket(zmq.DEALER)
463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
464 self.socket.connect('tcp://%s:%i' % self.address)
464 self.socket.connect('tcp://%s:%i' % self.address)
465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
466 self.stream.on_recv(self._handle_recv)
466 self.stream.on_recv(self._handle_recv)
467 self._run_loop()
467 self._run_loop()
468 try:
468 try:
469 self.socket.close()
469 self.socket.close()
470 except:
470 except:
471 pass
471 pass
472
472
473
473
474 def stop(self):
474 def stop(self):
475 self.ioloop.stop()
475 self.ioloop.stop()
476 super(StdInSocketChannel, self).stop()
476 super(StdInSocketChannel, self).stop()
477
477
478 def call_handlers(self, msg):
478 def call_handlers(self, msg):
479 """This method is called in the ioloop thread when a message arrives.
479 """This method is called in the ioloop thread when a message arrives.
480
480
481 Subclasses should override this method to handle incoming messages.
481 Subclasses should override this method to handle incoming messages.
482 It is important to remember that this method is called in the thread
482 It is important to remember that this method is called in the thread
483 so that some logic must be done to ensure that the application leve
483 so that some logic must be done to ensure that the application leve
484 handlers are called in the application thread.
484 handlers are called in the application thread.
485 """
485 """
486 raise NotImplementedError('call_handlers must be defined in a subclass.')
486 raise NotImplementedError('call_handlers must be defined in a subclass.')
487
487
488 def input(self, string):
488 def input(self, string):
489 """Send a string of raw input to the kernel."""
489 """Send a string of raw input to the kernel."""
490 content = dict(value=string)
490 content = dict(value=string)
491 msg = self.session.msg('input_reply', content)
491 msg = self.session.msg('input_reply', content)
492 self._queue_send(msg)
492 self._queue_send(msg)
493
493
494
494
495 class HBSocketChannel(ZMQSocketChannel):
495 class HBSocketChannel(ZMQSocketChannel):
496 """The heartbeat channel which monitors the kernel heartbeat.
496 """The heartbeat channel which monitors the kernel heartbeat.
497
497
498 Note that the heartbeat channel is paused by default. As long as you start
498 Note that the heartbeat channel is paused by default. As long as you start
499 this channel, the kernel manager will ensure that it is paused and un-paused
499 this channel, the kernel manager will ensure that it is paused and un-paused
500 as appropriate.
500 as appropriate.
501 """
501 """
502
502
503 time_to_dead = 3.0
503 time_to_dead = 3.0
504 socket = None
504 socket = None
505 poller = None
505 poller = None
506 _running = None
506 _running = None
507 _pause = None
507 _pause = None
508 _beating = None
508 _beating = None
509
509
510 def __init__(self, context, session, address):
510 def __init__(self, context, session, address):
511 super(HBSocketChannel, self).__init__(context, session, address)
511 super(HBSocketChannel, self).__init__(context, session, address)
512 self._running = False
512 self._running = False
513 self._pause =True
513 self._pause =True
514 self.poller = zmq.Poller()
514 self.poller = zmq.Poller()
515
515
516 def _create_socket(self):
516 def _create_socket(self):
517 if self.socket is not None:
517 if self.socket is not None:
518 # close previous socket, before opening a new one
518 # close previous socket, before opening a new one
519 self.poller.unregister(self.socket)
519 self.poller.unregister(self.socket)
520 self.socket.close()
520 self.socket.close()
521 self.socket = self.context.socket(zmq.REQ)
521 self.socket = self.context.socket(zmq.REQ)
522 self.socket.setsockopt(zmq.LINGER, 0)
522 self.socket.setsockopt(zmq.LINGER, 0)
523 self.socket.connect('tcp://%s:%i' % self.address)
523 self.socket.connect('tcp://%s:%i' % self.address)
524
524
525 self.poller.register(self.socket, zmq.POLLIN)
525 self.poller.register(self.socket, zmq.POLLIN)
526
526
527 def _poll(self, start_time):
527 def _poll(self, start_time):
528 """poll for heartbeat replies until we reach self.time_to_dead
528 """poll for heartbeat replies until we reach self.time_to_dead
529
529
530 Ignores interrupts, and returns the result of poll(), which
530 Ignores interrupts, and returns the result of poll(), which
531 will be an empty list if no messages arrived before the timeout,
531 will be an empty list if no messages arrived before the timeout,
532 or the event tuple if there is a message to receive.
532 or the event tuple if there is a message to receive.
533 """
533 """
534
534
535 until_dead = self.time_to_dead - (time.time() - start_time)
535 until_dead = self.time_to_dead - (time.time() - start_time)
536 # ensure poll at least once
536 # ensure poll at least once
537 until_dead = max(until_dead, 1e-3)
537 until_dead = max(until_dead, 1e-3)
538 events = []
538 events = []
539 while True:
539 while True:
540 try:
540 try:
541 events = self.poller.poll(1000 * until_dead)
541 events = self.poller.poll(1000 * until_dead)
542 except ZMQError as e:
542 except ZMQError as e:
543 if e.errno == errno.EINTR:
543 if e.errno == errno.EINTR:
544 # ignore interrupts during heartbeat
544 # ignore interrupts during heartbeat
545 # this may never actually happen
545 # this may never actually happen
546 until_dead = self.time_to_dead - (time.time() - start_time)
546 until_dead = self.time_to_dead - (time.time() - start_time)
547 until_dead = max(until_dead, 1e-3)
547 until_dead = max(until_dead, 1e-3)
548 pass
548 pass
549 else:
549 else:
550 raise
550 raise
551 except Exception:
551 except Exception:
552 if self._exiting:
552 if self._exiting:
553 break
553 break
554 else:
554 else:
555 raise
555 raise
556 else:
556 else:
557 break
557 break
558 return events
558 return events
559
559
560 def run(self):
560 def run(self):
561 """The thread's main activity. Call start() instead."""
561 """The thread's main activity. Call start() instead."""
562 self._create_socket()
562 self._create_socket()
563 self._running = True
563 self._running = True
564 self._beating = True
564 self._beating = True
565
565
566 while self._running:
566 while self._running:
567 if self._pause:
567 if self._pause:
568 # just sleep, and skip the rest of the loop
568 # just sleep, and skip the rest of the loop
569 time.sleep(self.time_to_dead)
569 time.sleep(self.time_to_dead)
570 continue
570 continue
571
571
572 since_last_heartbeat = 0.0
572 since_last_heartbeat = 0.0
573 # io.rprint('Ping from HB channel') # dbg
573 # io.rprint('Ping from HB channel') # dbg
574 # no need to catch EFSM here, because the previous event was
574 # no need to catch EFSM here, because the previous event was
575 # either a recv or connect, which cannot be followed by EFSM
575 # either a recv or connect, which cannot be followed by EFSM
576 self.socket.send(b'ping')
576 self.socket.send(b'ping')
577 request_time = time.time()
577 request_time = time.time()
578 ready = self._poll(request_time)
578 ready = self._poll(request_time)
579 if ready:
579 if ready:
580 self._beating = True
580 self._beating = True
581 # the poll above guarantees we have something to recv
581 # the poll above guarantees we have something to recv
582 self.socket.recv()
582 self.socket.recv()
583 # sleep the remainder of the cycle
583 # sleep the remainder of the cycle
584 remainder = self.time_to_dead - (time.time() - request_time)
584 remainder = self.time_to_dead - (time.time() - request_time)
585 if remainder > 0:
585 if remainder > 0:
586 time.sleep(remainder)
586 time.sleep(remainder)
587 continue
587 continue
588 else:
588 else:
589 # nothing was received within the time limit, signal heart failure
589 # nothing was received within the time limit, signal heart failure
590 self._beating = False
590 self._beating = False
591 since_last_heartbeat = time.time() - request_time
591 since_last_heartbeat = time.time() - request_time
592 self.call_handlers(since_last_heartbeat)
592 self.call_handlers(since_last_heartbeat)
593 # and close/reopen the socket, because the REQ/REP cycle has been broken
593 # and close/reopen the socket, because the REQ/REP cycle has been broken
594 self._create_socket()
594 self._create_socket()
595 continue
595 continue
596 try:
596 try:
597 self.socket.close()
597 self.socket.close()
598 except:
598 except:
599 pass
599 pass
600
600
601 def pause(self):
601 def pause(self):
602 """Pause the heartbeat."""
602 """Pause the heartbeat."""
603 self._pause = True
603 self._pause = True
604
604
605 def unpause(self):
605 def unpause(self):
606 """Unpause the heartbeat."""
606 """Unpause the heartbeat."""
607 self._pause = False
607 self._pause = False
608
608
609 def is_beating(self):
609 def is_beating(self):
610 """Is the heartbeat running and responsive (and not paused)."""
610 """Is the heartbeat running and responsive (and not paused)."""
611 if self.is_alive() and not self._pause and self._beating:
611 if self.is_alive() and not self._pause and self._beating:
612 return True
612 return True
613 else:
613 else:
614 return False
614 return False
615
615
616 def stop(self):
616 def stop(self):
617 self._running = False
617 self._running = False
618 super(HBSocketChannel, self).stop()
618 super(HBSocketChannel, self).stop()
619
619
620 def call_handlers(self, since_last_heartbeat):
620 def call_handlers(self, since_last_heartbeat):
621 """This method is called in the ioloop thread when a message arrives.
621 """This method is called in the ioloop thread when a message arrives.
622
622
623 Subclasses should override this method to handle incoming messages.
623 Subclasses should override this method to handle incoming messages.
624 It is important to remember that this method is called in the thread
624 It is important to remember that this method is called in the thread
625 so that some logic must be done to ensure that the application level
625 so that some logic must be done to ensure that the application level
626 handlers are called in the application thread.
626 handlers are called in the application thread.
627 """
627 """
628 raise NotImplementedError('call_handlers must be defined in a subclass.')
628 raise NotImplementedError('call_handlers must be defined in a subclass.')
629
629
630
630
631 #-----------------------------------------------------------------------------
631 #-----------------------------------------------------------------------------
632 # Main kernel manager class
632 # Main kernel manager class
633 #-----------------------------------------------------------------------------
633 #-----------------------------------------------------------------------------
634
634
635 class KernelManager(HasTraits):
635 class KernelManager(HasTraits):
636 """ Manages a kernel for a frontend.
636 """ Manages a kernel for a frontend.
637
637
638 The SUB channel is for the frontend to receive messages published by the
638 The SUB channel is for the frontend to receive messages published by the
639 kernel.
639 kernel.
640
640
641 The REQ channel is for the frontend to make requests of the kernel.
641 The REQ channel is for the frontend to make requests of the kernel.
642
642
643 The REP channel is for the kernel to request stdin (raw_input) from the
643 The REP channel is for the kernel to request stdin (raw_input) from the
644 frontend.
644 frontend.
645 """
645 """
646 # config object for passing to child configurables
646 # config object for passing to child configurables
647 config = Instance(Config)
647 config = Instance(Config)
648
648
649 # The PyZMQ Context to use for communication with the kernel.
649 # The PyZMQ Context to use for communication with the kernel.
650 context = Instance(zmq.Context)
650 context = Instance(zmq.Context)
651 def _context_default(self):
651 def _context_default(self):
652 return zmq.Context.instance()
652 return zmq.Context.instance()
653
653
654 # The Session to use for communication with the kernel.
654 # The Session to use for communication with the kernel.
655 session = Instance(Session)
655 session = Instance(Session)
656
656
657 # The kernel process with which the KernelManager is communicating.
657 # The kernel process with which the KernelManager is communicating.
658 kernel = Instance(Popen)
658 kernel = Instance(Popen)
659
659
660 # The addresses for the communication channels.
660 # The addresses for the communication channels.
661 connection_file = Unicode('')
661 connection_file = Unicode('')
662 ip = Unicode(LOCALHOST)
662 ip = Unicode(LOCALHOST)
663 def _ip_changed(self, name, old, new):
663 def _ip_changed(self, name, old, new):
664 if new == '*':
664 if new == '*':
665 self.ip = '0.0.0.0'
665 self.ip = '0.0.0.0'
666 shell_port = Integer(0)
666 shell_port = Integer(0)
667 iopub_port = Integer(0)
667 iopub_port = Integer(0)
668 stdin_port = Integer(0)
668 stdin_port = Integer(0)
669 hb_port = Integer(0)
669 hb_port = Integer(0)
670
670
671 # The classes to use for the various channels.
671 # The classes to use for the various channels.
672 shell_channel_class = Type(ShellSocketChannel)
672 shell_channel_class = Type(ShellSocketChannel)
673 sub_channel_class = Type(SubSocketChannel)
673 sub_channel_class = Type(SubSocketChannel)
674 stdin_channel_class = Type(StdInSocketChannel)
674 stdin_channel_class = Type(StdInSocketChannel)
675 hb_channel_class = Type(HBSocketChannel)
675 hb_channel_class = Type(HBSocketChannel)
676
676
677 # Protected traits.
677 # Protected traits.
678 _launch_args = Any
678 _launch_args = Any
679 _shell_channel = Any
679 _shell_channel = Any
680 _sub_channel = Any
680 _sub_channel = Any
681 _stdin_channel = Any
681 _stdin_channel = Any
682 _hb_channel = Any
682 _hb_channel = Any
683 _connection_file_written=Bool(False)
683 _connection_file_written=Bool(False)
684
684
685 def __init__(self, **kwargs):
685 def __init__(self, **kwargs):
686 super(KernelManager, self).__init__(**kwargs)
686 super(KernelManager, self).__init__(**kwargs)
687 if self.session is None:
687 if self.session is None:
688 self.session = Session(config=self.config)
688 self.session = Session(config=self.config)
689
689
690 def __del__(self):
690 def __del__(self):
691 self.cleanup_connection_file()
691 self.cleanup_connection_file()
692
692
693
693
694 #--------------------------------------------------------------------------
694 #--------------------------------------------------------------------------
695 # Channel management methods:
695 # Channel management methods:
696 #--------------------------------------------------------------------------
696 #--------------------------------------------------------------------------
697
697
698 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
698 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
699 """Starts the channels for this kernel.
699 """Starts the channels for this kernel.
700
700
701 This will create the channels if they do not exist and then start
701 This will create the channels if they do not exist and then start
702 them. If port numbers of 0 are being used (random ports) then you
702 them. If port numbers of 0 are being used (random ports) then you
703 must first call :method:`start_kernel`. If the channels have been
703 must first call :method:`start_kernel`. If the channels have been
704 stopped and you call this, :class:`RuntimeError` will be raised.
704 stopped and you call this, :class:`RuntimeError` will be raised.
705 """
705 """
706 if shell:
706 if shell:
707 self.shell_channel.start()
707 self.shell_channel.start()
708 if sub:
708 if sub:
709 self.sub_channel.start()
709 self.sub_channel.start()
710 if stdin:
710 if stdin:
711 self.stdin_channel.start()
711 self.stdin_channel.start()
712 self.shell_channel.allow_stdin = True
712 self.shell_channel.allow_stdin = True
713 else:
713 else:
714 self.shell_channel.allow_stdin = False
714 self.shell_channel.allow_stdin = False
715 if hb:
715 if hb:
716 self.hb_channel.start()
716 self.hb_channel.start()
717
717
718 def stop_channels(self):
718 def stop_channels(self):
719 """Stops all the running channels for this kernel.
719 """Stops all the running channels for this kernel.
720 """
720 """
721 if self.shell_channel.is_alive():
721 if self.shell_channel.is_alive():
722 self.shell_channel.stop()
722 self.shell_channel.stop()
723 if self.sub_channel.is_alive():
723 if self.sub_channel.is_alive():
724 self.sub_channel.stop()
724 self.sub_channel.stop()
725 if self.stdin_channel.is_alive():
725 if self.stdin_channel.is_alive():
726 self.stdin_channel.stop()
726 self.stdin_channel.stop()
727 if self.hb_channel.is_alive():
727 if self.hb_channel.is_alive():
728 self.hb_channel.stop()
728 self.hb_channel.stop()
729
729
730 @property
730 @property
731 def channels_running(self):
731 def channels_running(self):
732 """Are any of the channels created and running?"""
732 """Are any of the channels created and running?"""
733 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
733 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
734 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
734 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
735
735
736 #--------------------------------------------------------------------------
736 #--------------------------------------------------------------------------
737 # Kernel process management methods:
737 # Kernel process management methods:
738 #--------------------------------------------------------------------------
738 #--------------------------------------------------------------------------
739
739
740 def cleanup_connection_file(self):
740 def cleanup_connection_file(self):
741 """cleanup connection file *if we wrote it*
741 """cleanup connection file *if we wrote it*
742
742
743 Will not raise if the connection file was already removed somehow.
743 Will not raise if the connection file was already removed somehow.
744 """
744 """
745 if self._connection_file_written:
745 if self._connection_file_written:
746 # cleanup connection files on full shutdown of kernel we started
746 # cleanup connection files on full shutdown of kernel we started
747 self._connection_file_written = False
747 self._connection_file_written = False
748 try:
748 try:
749 os.remove(self.connection_file)
749 os.remove(self.connection_file)
750 except OSError:
750 except OSError:
751 pass
751 pass
752
752
753 def load_connection_file(self):
753 def load_connection_file(self):
754 """load connection info from JSON dict in self.connection_file"""
754 """load connection info from JSON dict in self.connection_file"""
755 with open(self.connection_file) as f:
755 with open(self.connection_file) as f:
756 cfg = json.loads(f.read())
756 cfg = json.loads(f.read())
757
757
758 self.ip = cfg['ip']
758 self.ip = cfg['ip']
759 self.shell_port = cfg['shell_port']
759 self.shell_port = cfg['shell_port']
760 self.stdin_port = cfg['stdin_port']
760 self.stdin_port = cfg['stdin_port']
761 self.iopub_port = cfg['iopub_port']
761 self.iopub_port = cfg['iopub_port']
762 self.hb_port = cfg['hb_port']
762 self.hb_port = cfg['hb_port']
763 self.session.key = str_to_bytes(cfg['key'])
763 self.session.key = str_to_bytes(cfg['key'])
764
764
765 def write_connection_file(self):
765 def write_connection_file(self):
766 """write connection info to JSON dict in self.connection_file"""
766 """write connection info to JSON dict in self.connection_file"""
767 if self._connection_file_written:
767 if self._connection_file_written:
768 return
768 return
769 self.connection_file,cfg = write_connection_file(self.connection_file,
769 self.connection_file,cfg = write_connection_file(self.connection_file,
770 ip=self.ip, key=self.session.key,
770 ip=self.ip, key=self.session.key,
771 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
771 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
772 shell_port=self.shell_port, hb_port=self.hb_port)
772 shell_port=self.shell_port, hb_port=self.hb_port)
773 # write_connection_file also sets default ports:
773 # write_connection_file also sets default ports:
774 self.shell_port = cfg['shell_port']
774 self.shell_port = cfg['shell_port']
775 self.stdin_port = cfg['stdin_port']
775 self.stdin_port = cfg['stdin_port']
776 self.iopub_port = cfg['iopub_port']
776 self.iopub_port = cfg['iopub_port']
777 self.hb_port = cfg['hb_port']
777 self.hb_port = cfg['hb_port']
778
778
779 self._connection_file_written = True
779 self._connection_file_written = True
780
780
781 def start_kernel(self, **kw):
781 def start_kernel(self, **kw):
782 """Starts a kernel process and configures the manager to use it.
782 """Starts a kernel process and configures the manager to use it.
783
783
784 If random ports (port=0) are being used, this method must be called
784 If random ports (port=0) are being used, this method must be called
785 before the channels are created.
785 before the channels are created.
786
786
787 Parameters:
787 Parameters:
788 -----------
788 -----------
789 launcher : callable, optional (default None)
789 launcher : callable, optional (default None)
790 A custom function for launching the kernel process (generally a
790 A custom function for launching the kernel process (generally a
791 wrapper around ``entry_point.base_launch_kernel``). In most cases,
791 wrapper around ``entry_point.base_launch_kernel``). In most cases,
792 it should not be necessary to use this parameter.
792 it should not be necessary to use this parameter.
793
793
794 **kw : optional
794 **kw : optional
795 See respective options for IPython and Python kernels.
795 See respective options for IPython and Python kernels.
796 """
796 """
797 if self.ip not in LOCAL_IPS:
797 if self.ip not in LOCAL_IPS:
798 raise RuntimeError("Can only launch a kernel on a local interface. "
798 raise RuntimeError("Can only launch a kernel on a local interface. "
799 "Make sure that the '*_address' attributes are "
799 "Make sure that the '*_address' attributes are "
800 "configured properly. "
800 "configured properly. "
801 "Currently valid addresses are: %s"%LOCAL_IPS
801 "Currently valid addresses are: %s"%LOCAL_IPS
802 )
802 )
803
803
804 # write connection file / get default ports
804 # write connection file / get default ports
805 self.write_connection_file()
805 self.write_connection_file()
806
806
807 self._launch_args = kw.copy()
807 self._launch_args = kw.copy()
808 launch_kernel = kw.pop('launcher', None)
808 launch_kernel = kw.pop('launcher', None)
809 if launch_kernel is None:
809 if launch_kernel is None:
810 from ipkernel import launch_kernel
810 from ipkernel import launch_kernel
811 self.kernel = launch_kernel(fname=self.connection_file, **kw)
811 self.kernel = launch_kernel(fname=self.connection_file, **kw)
812
812
813 def shutdown_kernel(self, restart=False):
813 def shutdown_kernel(self, restart=False):
814 """ Attempts to the stop the kernel process cleanly. If the kernel
814 """ Attempts to the stop the kernel process cleanly. If the kernel
815 cannot be stopped, it is killed, if possible.
815 cannot be stopped, it is killed, if possible.
816 """
816 """
817 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
817 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
818 if sys.platform == 'win32':
818 if sys.platform == 'win32':
819 self.kill_kernel()
819 self.kill_kernel()
820 return
820 return
821
821
822 # Pause the heart beat channel if it exists.
822 # Pause the heart beat channel if it exists.
823 if self._hb_channel is not None:
823 if self._hb_channel is not None:
824 self._hb_channel.pause()
824 self._hb_channel.pause()
825
825
826 # Don't send any additional kernel kill messages immediately, to give
826 # Don't send any additional kernel kill messages immediately, to give
827 # the kernel a chance to properly execute shutdown actions. Wait for at
827 # the kernel a chance to properly execute shutdown actions. Wait for at
828 # most 1s, checking every 0.1s.
828 # most 1s, checking every 0.1s.
829 self.shell_channel.shutdown(restart=restart)
829 self.shell_channel.shutdown(restart=restart)
830 for i in range(10):
830 for i in range(10):
831 if self.is_alive:
831 if self.is_alive:
832 time.sleep(0.1)
832 time.sleep(0.1)
833 else:
833 else:
834 break
834 break
835 else:
835 else:
836 # OK, we've waited long enough.
836 # OK, we've waited long enough.
837 if self.has_kernel:
837 if self.has_kernel:
838 self.kill_kernel()
838 self.kill_kernel()
839
839
840 if not restart and self._connection_file_written:
840 if not restart and self._connection_file_written:
841 # cleanup connection files on full shutdown of kernel we started
841 # cleanup connection files on full shutdown of kernel we started
842 self._connection_file_written = False
842 self._connection_file_written = False
843 try:
843 try:
844 os.remove(self.connection_file)
844 os.remove(self.connection_file)
845 except IOError:
845 except IOError:
846 pass
846 pass
847
847
848 def restart_kernel(self, now=False, **kw):
848 def restart_kernel(self, now=False, **kw):
849 """Restarts a kernel with the arguments that were used to launch it.
849 """Restarts a kernel with the arguments that were used to launch it.
850
850
851 If the old kernel was launched with random ports, the same ports will be
851 If the old kernel was launched with random ports, the same ports will be
852 used for the new kernel.
852 used for the new kernel.
853
853
854 Parameters
854 Parameters
855 ----------
855 ----------
856 now : bool, optional
856 now : bool, optional
857 If True, the kernel is forcefully restarted *immediately*, without
857 If True, the kernel is forcefully restarted *immediately*, without
858 having a chance to do any cleanup action. Otherwise the kernel is
858 having a chance to do any cleanup action. Otherwise the kernel is
859 given 1s to clean up before a forceful restart is issued.
859 given 1s to clean up before a forceful restart is issued.
860
860
861 In all cases the kernel is restarted, the only difference is whether
861 In all cases the kernel is restarted, the only difference is whether
862 it is given a chance to perform a clean shutdown or not.
862 it is given a chance to perform a clean shutdown or not.
863
863
864 **kw : optional
864 **kw : optional
865 Any options specified here will replace those used to launch the
865 Any options specified here will replace those used to launch the
866 kernel.
866 kernel.
867 """
867 """
868 if self._launch_args is None:
868 if self._launch_args is None:
869 raise RuntimeError("Cannot restart the kernel. "
869 raise RuntimeError("Cannot restart the kernel. "
870 "No previous call to 'start_kernel'.")
870 "No previous call to 'start_kernel'.")
871 else:
871 else:
872 # Stop currently running kernel.
872 # Stop currently running kernel.
873 if self.has_kernel:
873 if self.has_kernel:
874 if now:
874 if now:
875 self.kill_kernel()
875 self.kill_kernel()
876 else:
876 else:
877 self.shutdown_kernel(restart=True)
877 self.shutdown_kernel(restart=True)
878
878
879 # Start new kernel.
879 # Start new kernel.
880 self._launch_args.update(kw)
880 self._launch_args.update(kw)
881 self.start_kernel(**self._launch_args)
881 self.start_kernel(**self._launch_args)
882
882
883 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
883 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
884 # unless there is some delay here.
884 # unless there is some delay here.
885 if sys.platform == 'win32':
885 if sys.platform == 'win32':
886 time.sleep(0.2)
886 time.sleep(0.2)
887
887
888 @property
888 @property
889 def has_kernel(self):
889 def has_kernel(self):
890 """Returns whether a kernel process has been specified for the kernel
890 """Returns whether a kernel process has been specified for the kernel
891 manager.
891 manager.
892 """
892 """
893 return self.kernel is not None
893 return self.kernel is not None
894
894
895 def kill_kernel(self):
895 def kill_kernel(self):
896 """ Kill the running kernel. """
896 """ Kill the running kernel. """
897 if self.has_kernel:
897 if self.has_kernel:
898 # Pause the heart beat channel if it exists.
898 # Pause the heart beat channel if it exists.
899 if self._hb_channel is not None:
899 if self._hb_channel is not None:
900 self._hb_channel.pause()
900 self._hb_channel.pause()
901
901
902 # Attempt to kill the kernel.
902 # Attempt to kill the kernel.
903 try:
903 try:
904 self.kernel.kill()
904 self.kernel.kill()
905 except OSError as e:
905 except OSError as e:
906 # In Windows, we will get an Access Denied error if the process
906 # In Windows, we will get an Access Denied error if the process
907 # has already terminated. Ignore it.
907 # has already terminated. Ignore it.
908 if sys.platform == 'win32':
908 if sys.platform == 'win32':
909 if e.winerror != 5:
909 if e.winerror != 5:
910 raise
910 raise
911 # On Unix, we may get an ESRCH error if the process has already
911 # On Unix, we may get an ESRCH error if the process has already
912 # terminated. Ignore it.
912 # terminated. Ignore it.
913 else:
913 else:
914 from errno import ESRCH
914 from errno import ESRCH
915 if e.errno != ESRCH:
915 if e.errno != ESRCH:
916 raise
916 raise
917 self.kernel = None
917 self.kernel = None
918 else:
918 else:
919 raise RuntimeError("Cannot kill kernel. No kernel is running!")
919 raise RuntimeError("Cannot kill kernel. No kernel is running!")
920
920
921 def interrupt_kernel(self):
921 def interrupt_kernel(self):
922 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
922 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
923 well supported on all platforms.
923 well supported on all platforms.
924 """
924 """
925 if self.has_kernel:
925 if self.has_kernel:
926 if sys.platform == 'win32':
926 if sys.platform == 'win32':
927 from parentpoller import ParentPollerWindows as Poller
927 from parentpoller import ParentPollerWindows as Poller
928 Poller.send_interrupt(self.kernel.win32_interrupt_event)
928 Poller.send_interrupt(self.kernel.win32_interrupt_event)
929 else:
929 else:
930 self.kernel.send_signal(signal.SIGINT)
930 self.kernel.send_signal(signal.SIGINT)
931 else:
931 else:
932 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
932 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
933
933
934 def signal_kernel(self, signum):
934 def signal_kernel(self, signum):
935 """ Sends a signal to the kernel. Note that since only SIGTERM is
935 """ Sends a signal to the kernel. Note that since only SIGTERM is
936 supported on Windows, this function is only useful on Unix systems.
936 supported on Windows, this function is only useful on Unix systems.
937 """
937 """
938 if self.has_kernel:
938 if self.has_kernel:
939 self.kernel.send_signal(signum)
939 self.kernel.send_signal(signum)
940 else:
940 else:
941 raise RuntimeError("Cannot signal kernel. No kernel is running!")
941 raise RuntimeError("Cannot signal kernel. No kernel is running!")
942
942
943 @property
943 @property
944 def is_alive(self):
944 def is_alive(self):
945 """Is the kernel process still running?"""
945 """Is the kernel process still running?"""
946 if self.has_kernel:
946 if self.has_kernel:
947 if self.kernel.poll() is None:
947 if self.kernel.poll() is None:
948 return True
948 return True
949 else:
949 else:
950 return False
950 return False
951 elif self._hb_channel is not None:
951 elif self._hb_channel is not None:
952 # We didn't start the kernel with this KernelManager so we
952 # We didn't start the kernel with this KernelManager so we
953 # use the heartbeat.
953 # use the heartbeat.
954 return self._hb_channel.is_beating()
954 return self._hb_channel.is_beating()
955 else:
955 else:
956 # no heartbeat and not local, we can't tell if it's running,
956 # no heartbeat and not local, we can't tell if it's running,
957 # so naively return True
957 # so naively return True
958 return True
958 return True
959
959
960 #--------------------------------------------------------------------------
960 #--------------------------------------------------------------------------
961 # Channels used for communication with the kernel:
961 # Channels used for communication with the kernel:
962 #--------------------------------------------------------------------------
962 #--------------------------------------------------------------------------
963
963
964 @property
964 @property
965 def shell_channel(self):
965 def shell_channel(self):
966 """Get the REQ socket channel object to make requests of the kernel."""
966 """Get the REQ socket channel object to make requests of the kernel."""
967 if self._shell_channel is None:
967 if self._shell_channel is None:
968 self._shell_channel = self.shell_channel_class(self.context,
968 self._shell_channel = self.shell_channel_class(self.context,
969 self.session,
969 self.session,
970 (self.ip, self.shell_port))
970 (self.ip, self.shell_port))
971 return self._shell_channel
971 return self._shell_channel
972
972
973 @property
973 @property
974 def sub_channel(self):
974 def sub_channel(self):
975 """Get the SUB socket channel object."""
975 """Get the SUB socket channel object."""
976 if self._sub_channel is None:
976 if self._sub_channel is None:
977 self._sub_channel = self.sub_channel_class(self.context,
977 self._sub_channel = self.sub_channel_class(self.context,
978 self.session,
978 self.session,
979 (self.ip, self.iopub_port))
979 (self.ip, self.iopub_port))
980 return self._sub_channel
980 return self._sub_channel
981
981
982 @property
982 @property
983 def stdin_channel(self):
983 def stdin_channel(self):
984 """Get the REP socket channel object to handle stdin (raw_input)."""
984 """Get the REP socket channel object to handle stdin (raw_input)."""
985 if self._stdin_channel is None:
985 if self._stdin_channel is None:
986 self._stdin_channel = self.stdin_channel_class(self.context,
986 self._stdin_channel = self.stdin_channel_class(self.context,
987 self.session,
987 self.session,
988 (self.ip, self.stdin_port))
988 (self.ip, self.stdin_port))
989 return self._stdin_channel
989 return self._stdin_channel
990
990
991 @property
991 @property
992 def hb_channel(self):
992 def hb_channel(self):
993 """Get the heartbeat socket channel object to check that the
993 """Get the heartbeat socket channel object to check that the
994 kernel is alive."""
994 kernel is alive."""
995 if self._hb_channel is None:
995 if self._hb_channel is None:
996 self._hb_channel = self.hb_channel_class(self.context,
996 self._hb_channel = self.hb_channel_class(self.context,
997 self.session,
997 self.session,
998 (self.ip, self.hb_port))
998 (self.ip, self.hb_port))
999 return self._hb_channel
999 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now