##// END OF EJS Templates
More kernelmanager docstring cleanup.
Brian E. Granger -
Show More
@@ -1,1055 +1,1083 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 import json
21 import json
22 from subprocess import Popen
22 from subprocess import Popen
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 from threading import Thread
26 from threading import Thread
27 import time
27 import time
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.traitlets import (
39 from IPython.utils.traitlets import (
40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 )
41 )
42 from IPython.utils.py3compat import str_to_bytes
42 from IPython.utils.py3compat import str_to_bytes
43 from IPython.zmq.entry_point import write_connection_file
43 from IPython.zmq.entry_point import write_connection_file
44 from session import Session
44 from session import Session
45 from IPython.zmq.kernelmanagerabc import (
45 from IPython.zmq.kernelmanagerabc import (
46 ShellChannelABC, IOPubChannelABC,
46 ShellChannelABC, IOPubChannelABC,
47 HBChannelABC, StdInChannelABC,
47 HBChannelABC, StdInChannelABC,
48 KernelManagerABC
48 KernelManagerABC
49 )
49 )
50
50
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # Constants and exceptions
53 # Constants and exceptions
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 class InvalidPortNumber(Exception):
56 class InvalidPortNumber(Exception):
57 pass
57 pass
58
58
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60 # Utility functions
60 # Utility functions
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62
62
63 # some utilities to validate message structure, these might get moved elsewhere
63 # some utilities to validate message structure, these might get moved elsewhere
64 # if they prove to have more generic utility
64 # if they prove to have more generic utility
65
65
66 def validate_string_list(lst):
66 def validate_string_list(lst):
67 """Validate that the input is a list of strings.
67 """Validate that the input is a list of strings.
68
68
69 Raises ValueError if not."""
69 Raises ValueError if not."""
70 if not isinstance(lst, list):
70 if not isinstance(lst, list):
71 raise ValueError('input %r must be a list' % lst)
71 raise ValueError('input %r must be a list' % lst)
72 for x in lst:
72 for x in lst:
73 if not isinstance(x, basestring):
73 if not isinstance(x, basestring):
74 raise ValueError('element %r in list must be a string' % x)
74 raise ValueError('element %r in list must be a string' % x)
75
75
76
76
77 def validate_string_dict(dct):
77 def validate_string_dict(dct):
78 """Validate that the input is a dict with string keys and values.
78 """Validate that the input is a dict with string keys and values.
79
79
80 Raises ValueError if not."""
80 Raises ValueError if not."""
81 for k,v in dct.iteritems():
81 for k,v in dct.iteritems():
82 if not isinstance(k, basestring):
82 if not isinstance(k, basestring):
83 raise ValueError('key %r in dict must be a string' % k)
83 raise ValueError('key %r in dict must be a string' % k)
84 if not isinstance(v, basestring):
84 if not isinstance(v, basestring):
85 raise ValueError('value %r in dict must be a string' % v)
85 raise ValueError('value %r in dict must be a string' % v)
86
86
87
87
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89 # ZMQ Socket Channel classes
89 # ZMQ Socket Channel classes
90 #-----------------------------------------------------------------------------
90 #-----------------------------------------------------------------------------
91
91
92 class ZMQSocketChannel(Thread):
92 class ZMQSocketChannel(Thread):
93 """The base class for the channels that use ZMQ sockets.
93 """The base class for the channels that use ZMQ sockets."""
94 """
95 context = None
94 context = None
96 session = None
95 session = None
97 socket = None
96 socket = None
98 ioloop = None
97 ioloop = None
99 stream = None
98 stream = None
100 _address = None
99 _address = None
101 _exiting = False
100 _exiting = False
102
101
103 def __init__(self, context, session, address):
102 def __init__(self, context, session, address):
104 """Create a channel
103 """Create a channel.
105
104
106 Parameters
105 Parameters
107 ----------
106 ----------
108 context : :class:`zmq.Context`
107 context : :class:`zmq.Context`
109 The ZMQ context to use.
108 The ZMQ context to use.
110 session : :class:`session.Session`
109 session : :class:`session.Session`
111 The session to use.
110 The session to use.
112 address : zmq url
111 address : zmq url
113 Standard (ip, port) tuple that the kernel is listening on.
112 Standard (ip, port) tuple that the kernel is listening on.
114 """
113 """
115 super(ZMQSocketChannel, self).__init__()
114 super(ZMQSocketChannel, self).__init__()
116 self.daemon = True
115 self.daemon = True
117
116
118 self.context = context
117 self.context = context
119 self.session = session
118 self.session = session
120 if isinstance(address, tuple):
119 if isinstance(address, tuple):
121 if address[1] == 0:
120 if address[1] == 0:
122 message = 'The port number for a channel cannot be 0.'
121 message = 'The port number for a channel cannot be 0.'
123 raise InvalidPortNumber(message)
122 raise InvalidPortNumber(message)
124 address = "tcp://%s:%i" % address
123 address = "tcp://%s:%i" % address
125 self._address = address
124 self._address = address
126 atexit.register(self._notice_exit)
125 atexit.register(self._notice_exit)
127
126
128 def _notice_exit(self):
127 def _notice_exit(self):
129 self._exiting = True
128 self._exiting = True
130
129
131 def _run_loop(self):
130 def _run_loop(self):
132 """Run my loop, ignoring EINTR events in the poller"""
131 """Run my loop, ignoring EINTR events in the poller"""
133 while True:
132 while True:
134 try:
133 try:
135 self.ioloop.start()
134 self.ioloop.start()
136 except ZMQError as e:
135 except ZMQError as e:
137 if e.errno == errno.EINTR:
136 if e.errno == errno.EINTR:
138 continue
137 continue
139 else:
138 else:
140 raise
139 raise
141 except Exception:
140 except Exception:
142 if self._exiting:
141 if self._exiting:
143 break
142 break
144 else:
143 else:
145 raise
144 raise
146 else:
145 else:
147 break
146 break
148
147
149 def stop(self):
148 def stop(self):
150 """Stop the channel's activity.
149 """Stop the channel's event loop and join its thread.
151
150
152 This calls :method:`Thread.join` and returns when the thread
151 This calls :method:`Thread.join` and returns when the thread
153 terminates. :class:`RuntimeError` will be raised if
152 terminates. :class:`RuntimeError` will be raised if
154 :method:`self.start` is called again.
153 :method:`self.start` is called again.
155 """
154 """
156 self.join()
155 self.join()
157
156
158 @property
157 @property
159 def address(self):
158 def address(self):
160 """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555').
159 """Get the channel's address as a zmq url string.
160
161 These URLS have the form: 'tcp://127.0.0.1:5555'.
161 """
162 """
162 return self._address
163 return self._address
163
164
164 def _queue_send(self, msg):
165 def _queue_send(self, msg):
165 """Queue a message to be sent from the IOLoop's thread.
166 """Queue a message to be sent from the IOLoop's thread.
166
167
167 Parameters
168 Parameters
168 ----------
169 ----------
169 msg : message to send
170 msg : message to send
170
171
171 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
172 thread control of the action.
173 thread control of the action.
173 """
174 """
174 def thread_send():
175 def thread_send():
175 self.session.send(self.stream, msg)
176 self.session.send(self.stream, msg)
176 self.ioloop.add_callback(thread_send)
177 self.ioloop.add_callback(thread_send)
177
178
178 def _handle_recv(self, msg):
179 def _handle_recv(self, msg):
179 """callback for stream.on_recv
180 """Callback for stream.on_recv.
180
181
181 unpacks message, and calls handlers with it.
182 Unpacks message, and calls handlers with it.
182 """
183 """
183 ident,smsg = self.session.feed_identities(msg)
184 ident,smsg = self.session.feed_identities(msg)
184 self.call_handlers(self.session.unserialize(smsg))
185 self.call_handlers(self.session.unserialize(smsg))
185
186
186
187
187
188
188 class ShellChannel(ZMQSocketChannel):
189 class ShellChannel(ZMQSocketChannel):
189 """The DEALER channel for issues request/replies to the kernel.
190 """The shell channel for issuing request/replies to the kernel."""
190 """
191
191
192 command_queue = None
192 command_queue = None
193 # flag for whether execute requests should be allowed to call raw_input:
193 # flag for whether execute requests should be allowed to call raw_input:
194 allow_stdin = True
194 allow_stdin = True
195
195
196 def __init__(self, context, session, address):
196 def __init__(self, context, session, address):
197 super(ShellChannel, self).__init__(context, session, address)
197 super(ShellChannel, self).__init__(context, session, address)
198 self.ioloop = ioloop.IOLoop()
198 self.ioloop = ioloop.IOLoop()
199
199
200 def run(self):
200 def run(self):
201 """The thread's main activity. Call start() instead."""
201 """The thread's main activity. Call start() instead."""
202 self.socket = self.context.socket(zmq.DEALER)
202 self.socket = self.context.socket(zmq.DEALER)
203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 self.socket.linger = 0
204 self.socket.linger = 0
205 self.socket.connect(self.address)
205 self.socket.connect(self.address)
206 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
207 self.stream.on_recv(self._handle_recv)
207 self.stream.on_recv(self._handle_recv)
208 self._run_loop()
208 self._run_loop()
209 try:
209 try:
210 self.socket.close()
210 self.socket.close()
211 except:
211 except:
212 pass
212 pass
213
213
214 def stop(self):
214 def stop(self):
215 """Stop the channel's event loop and join its thread."""
215 self.ioloop.stop()
216 self.ioloop.stop()
216 super(ShellChannel, self).stop()
217 super(ShellChannel, self).stop()
217
218
218 def call_handlers(self, msg):
219 def call_handlers(self, msg):
219 """This method is called in the ioloop thread when a message arrives.
220 """This method is called in the ioloop thread when a message arrives.
220
221
221 Subclasses should override this method to handle incoming messages.
222 Subclasses should override this method to handle incoming messages.
222 It is important to remember that this method is called in the thread
223 It is important to remember that this method is called in the thread
223 so that some logic must be done to ensure that the application leve
224 so that some logic must be done to ensure that the application leve
224 handlers are called in the application thread.
225 handlers are called in the application thread.
225 """
226 """
226 raise NotImplementedError('call_handlers must be defined in a subclass.')
227 raise NotImplementedError('call_handlers must be defined in a subclass.')
227
228
228 def execute(self, code, silent=False, store_history=True,
229 def execute(self, code, silent=False, store_history=True,
229 user_variables=None, user_expressions=None, allow_stdin=None):
230 user_variables=None, user_expressions=None, allow_stdin=None):
230 """Execute code in the kernel.
231 """Execute code in the kernel.
231
232
232 Parameters
233 Parameters
233 ----------
234 ----------
234 code : str
235 code : str
235 A string of Python code.
236 A string of Python code.
236
237
237 silent : bool, optional (default False)
238 silent : bool, optional (default False)
238 If set, the kernel will execute the code as quietly possible, and
239 If set, the kernel will execute the code as quietly possible, and
239 will force store_history to be False.
240 will force store_history to be False.
240
241
241 store_history : bool, optional (default True)
242 store_history : bool, optional (default True)
242 If set, the kernel will store command history. This is forced
243 If set, the kernel will store command history. This is forced
243 to be False if silent is True.
244 to be False if silent is True.
244
245
245 user_variables : list, optional
246 user_variables : list, optional
246 A list of variable names to pull from the user's namespace. They
247 A list of variable names to pull from the user's namespace. They
247 will come back as a dict with these names as keys and their
248 will come back as a dict with these names as keys and their
248 :func:`repr` as values.
249 :func:`repr` as values.
249
250
250 user_expressions : dict, optional
251 user_expressions : dict, optional
251 A dict mapping names to expressions to be evaluated in the user's
252 A dict mapping names to expressions to be evaluated in the user's
252 dict. The expression values are returned as strings formatted using
253 dict. The expression values are returned as strings formatted using
253 :func:`repr`.
254 :func:`repr`.
254
255
255 allow_stdin : bool, optional (default self.allow_stdin)
256 allow_stdin : bool, optional (default self.allow_stdin)
256 Flag for whether the kernel can send stdin requests to frontends.
257 Flag for whether the kernel can send stdin requests to frontends.
257
258
258 Some frontends (e.g. the Notebook) do not support stdin requests.
259 Some frontends (e.g. the Notebook) do not support stdin requests.
259 If raw_input is called from code executed from such a frontend, a
260 If raw_input is called from code executed from such a frontend, a
260 StdinNotImplementedError will be raised.
261 StdinNotImplementedError will be raised.
261
262
262 Returns
263 Returns
263 -------
264 -------
264 The msg_id of the message sent.
265 The msg_id of the message sent.
265 """
266 """
266 if user_variables is None:
267 if user_variables is None:
267 user_variables = []
268 user_variables = []
268 if user_expressions is None:
269 if user_expressions is None:
269 user_expressions = {}
270 user_expressions = {}
270 if allow_stdin is None:
271 if allow_stdin is None:
271 allow_stdin = self.allow_stdin
272 allow_stdin = self.allow_stdin
272
273
273
274
274 # Don't waste network traffic if inputs are invalid
275 # Don't waste network traffic if inputs are invalid
275 if not isinstance(code, basestring):
276 if not isinstance(code, basestring):
276 raise ValueError('code %r must be a string' % code)
277 raise ValueError('code %r must be a string' % code)
277 validate_string_list(user_variables)
278 validate_string_list(user_variables)
278 validate_string_dict(user_expressions)
279 validate_string_dict(user_expressions)
279
280
280 # Create class for content/msg creation. Related to, but possibly
281 # Create class for content/msg creation. Related to, but possibly
281 # not in Session.
282 # not in Session.
282 content = dict(code=code, silent=silent, store_history=store_history,
283 content = dict(code=code, silent=silent, store_history=store_history,
283 user_variables=user_variables,
284 user_variables=user_variables,
284 user_expressions=user_expressions,
285 user_expressions=user_expressions,
285 allow_stdin=allow_stdin,
286 allow_stdin=allow_stdin,
286 )
287 )
287 msg = self.session.msg('execute_request', content)
288 msg = self.session.msg('execute_request', content)
288 self._queue_send(msg)
289 self._queue_send(msg)
289 return msg['header']['msg_id']
290 return msg['header']['msg_id']
290
291
291 def complete(self, text, line, cursor_pos, block=None):
292 def complete(self, text, line, cursor_pos, block=None):
292 """Tab complete text in the kernel's namespace.
293 """Tab complete text in the kernel's namespace.
293
294
294 Parameters
295 Parameters
295 ----------
296 ----------
296 text : str
297 text : str
297 The text to complete.
298 The text to complete.
298 line : str
299 line : str
299 The full line of text that is the surrounding context for the
300 The full line of text that is the surrounding context for the
300 text to complete.
301 text to complete.
301 cursor_pos : int
302 cursor_pos : int
302 The position of the cursor in the line where the completion was
303 The position of the cursor in the line where the completion was
303 requested.
304 requested.
304 block : str, optional
305 block : str, optional
305 The full block of code in which the completion is being requested.
306 The full block of code in which the completion is being requested.
306
307
307 Returns
308 Returns
308 -------
309 -------
309 The msg_id of the message sent.
310 The msg_id of the message sent.
310 """
311 """
311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 msg = self.session.msg('complete_request', content)
313 msg = self.session.msg('complete_request', content)
313 self._queue_send(msg)
314 self._queue_send(msg)
314 return msg['header']['msg_id']
315 return msg['header']['msg_id']
315
316
316 def object_info(self, oname, detail_level=0):
317 def object_info(self, oname, detail_level=0):
317 """Get metadata information about an object.
318 """Get metadata information about an object in the kernel's namespace.
318
319
319 Parameters
320 Parameters
320 ----------
321 ----------
321 oname : str
322 oname : str
322 A string specifying the object name.
323 A string specifying the object name.
323 detail_level : int, optional
324 detail_level : int, optional
324 The level of detail for the introspection (0-2)
325 The level of detail for the introspection (0-2)
325
326
326 Returns
327 Returns
327 -------
328 -------
328 The msg_id of the message sent.
329 The msg_id of the message sent.
329 """
330 """
330 content = dict(oname=oname, detail_level=detail_level)
331 content = dict(oname=oname, detail_level=detail_level)
331 msg = self.session.msg('object_info_request', content)
332 msg = self.session.msg('object_info_request', content)
332 self._queue_send(msg)
333 self._queue_send(msg)
333 return msg['header']['msg_id']
334 return msg['header']['msg_id']
334
335
335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 """Get entries from the history list.
337 """Get entries from the kernel's history list.
337
338
338 Parameters
339 Parameters
339 ----------
340 ----------
340 raw : bool
341 raw : bool
341 If True, return the raw input.
342 If True, return the raw input.
342 output : bool
343 output : bool
343 If True, then return the output as well.
344 If True, then return the output as well.
344 hist_access_type : str
345 hist_access_type : str
345 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 or 'search' (fill in pattern param).
347 or 'search' (fill in pattern param).
347
348
348 session : int
349 session : int
349 For a range request, the session from which to get lines. Session
350 For a range request, the session from which to get lines. Session
350 numbers are positive integers; negative ones count back from the
351 numbers are positive integers; negative ones count back from the
351 current session.
352 current session.
352 start : int
353 start : int
353 The first line number of a history range.
354 The first line number of a history range.
354 stop : int
355 stop : int
355 The final (excluded) line number of a history range.
356 The final (excluded) line number of a history range.
356
357
357 n : int
358 n : int
358 The number of lines of history to get for a tail request.
359 The number of lines of history to get for a tail request.
359
360
360 pattern : str
361 pattern : str
361 The glob-syntax pattern for a search request.
362 The glob-syntax pattern for a search request.
362
363
363 Returns
364 Returns
364 -------
365 -------
365 The msg_id of the message sent.
366 The msg_id of the message sent.
366 """
367 """
367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 **kwargs)
369 **kwargs)
369 msg = self.session.msg('history_request', content)
370 msg = self.session.msg('history_request', content)
370 self._queue_send(msg)
371 self._queue_send(msg)
371 return msg['header']['msg_id']
372 return msg['header']['msg_id']
372
373
373 def kernel_info(self):
374 def kernel_info(self):
374 """Request kernel info."""
375 """Request kernel info."""
375 msg = self.session.msg('kernel_info_request')
376 msg = self.session.msg('kernel_info_request')
376 self._queue_send(msg)
377 self._queue_send(msg)
377 return msg['header']['msg_id']
378 return msg['header']['msg_id']
378
379
379 def shutdown(self, restart=False):
380 def shutdown(self, restart=False):
380 """Request an immediate kernel shutdown.
381 """Request an immediate kernel shutdown.
381
382
382 Upon receipt of the (empty) reply, client code can safely assume that
383 Upon receipt of the (empty) reply, client code can safely assume that
383 the kernel has shut down and it's safe to forcefully terminate it if
384 the kernel has shut down and it's safe to forcefully terminate it if
384 it's still alive.
385 it's still alive.
385
386
386 The kernel will send the reply via a function registered with Python's
387 The kernel will send the reply via a function registered with Python's
387 atexit module, ensuring it's truly done as the kernel is done with all
388 atexit module, ensuring it's truly done as the kernel is done with all
388 normal operation.
389 normal operation.
389 """
390 """
390 # Send quit message to kernel. Once we implement kernel-side setattr,
391 # Send quit message to kernel. Once we implement kernel-side setattr,
391 # this should probably be done that way, but for now this will do.
392 # this should probably be done that way, but for now this will do.
392 msg = self.session.msg('shutdown_request', {'restart':restart})
393 msg = self.session.msg('shutdown_request', {'restart':restart})
393 self._queue_send(msg)
394 self._queue_send(msg)
394 return msg['header']['msg_id']
395 return msg['header']['msg_id']
395
396
396
397
397
398
398 class IOPubChannel(ZMQSocketChannel):
399 class IOPubChannel(ZMQSocketChannel):
399 """The SUB channel which listens for messages that the kernel publishes.
400 """The iopub channel which listens for messages that the kernel publishes.
401
402 This channel is where all output is published to frontends.
400 """
403 """
401
404
402 def __init__(self, context, session, address):
405 def __init__(self, context, session, address):
403 super(IOPubChannel, self).__init__(context, session, address)
406 super(IOPubChannel, self).__init__(context, session, address)
404 self.ioloop = ioloop.IOLoop()
407 self.ioloop = ioloop.IOLoop()
405
408
406 def run(self):
409 def run(self):
407 """The thread's main activity. Call start() instead."""
410 """The thread's main activity. Call start() instead."""
408 self.socket = self.context.socket(zmq.SUB)
411 self.socket = self.context.socket(zmq.SUB)
409 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
410 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
411 self.socket.connect(self.address)
414 self.socket.connect(self.address)
412 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
415 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
413 self.stream.on_recv(self._handle_recv)
416 self.stream.on_recv(self._handle_recv)
414 self._run_loop()
417 self._run_loop()
415 try:
418 try:
416 self.socket.close()
419 self.socket.close()
417 except:
420 except:
418 pass
421 pass
419
422
420 def stop(self):
423 def stop(self):
424 """Stop the channel's event loop and join its thread."""
421 self.ioloop.stop()
425 self.ioloop.stop()
422 super(IOPubChannel, self).stop()
426 super(IOPubChannel, self).stop()
423
427
424 def call_handlers(self, msg):
428 def call_handlers(self, msg):
425 """This method is called in the ioloop thread when a message arrives.
429 """This method is called in the ioloop thread when a message arrives.
426
430
427 Subclasses should override this method to handle incoming messages.
431 Subclasses should override this method to handle incoming messages.
428 It is important to remember that this method is called in the thread
432 It is important to remember that this method is called in the thread
429 so that some logic must be done to ensure that the application leve
433 so that some logic must be done to ensure that the application leve
430 handlers are called in the application thread.
434 handlers are called in the application thread.
431 """
435 """
432 raise NotImplementedError('call_handlers must be defined in a subclass.')
436 raise NotImplementedError('call_handlers must be defined in a subclass.')
433
437
434 def flush(self, timeout=1.0):
438 def flush(self, timeout=1.0):
435 """Immediately processes all pending messages on the SUB channel.
439 """Immediately processes all pending messages on the iopub channel.
436
440
437 Callers should use this method to ensure that :method:`call_handlers`
441 Callers should use this method to ensure that :method:`call_handlers`
438 has been called for all messages that have been received on the
442 has been called for all messages that have been received on the
439 0MQ SUB socket of this channel.
443 0MQ SUB socket of this channel.
440
444
441 This method is thread safe.
445 This method is thread safe.
442
446
443 Parameters
447 Parameters
444 ----------
448 ----------
445 timeout : float, optional
449 timeout : float, optional
446 The maximum amount of time to spend flushing, in seconds. The
450 The maximum amount of time to spend flushing, in seconds. The
447 default is one second.
451 default is one second.
448 """
452 """
449 # We do the IOLoop callback process twice to ensure that the IOLoop
453 # We do the IOLoop callback process twice to ensure that the IOLoop
450 # gets to perform at least one full poll.
454 # gets to perform at least one full poll.
451 stop_time = time.time() + timeout
455 stop_time = time.time() + timeout
452 for i in xrange(2):
456 for i in xrange(2):
453 self._flushed = False
457 self._flushed = False
454 self.ioloop.add_callback(self._flush)
458 self.ioloop.add_callback(self._flush)
455 while not self._flushed and time.time() < stop_time:
459 while not self._flushed and time.time() < stop_time:
456 time.sleep(0.01)
460 time.sleep(0.01)
457
461
458 def _flush(self):
462 def _flush(self):
459 """Callback for :method:`self.flush`."""
463 """Callback for :method:`self.flush`."""
460 self.stream.flush()
464 self.stream.flush()
461 self._flushed = True
465 self._flushed = True
462
466
463
467
464 class StdInChannel(ZMQSocketChannel):
468 class StdInChannel(ZMQSocketChannel):
465 """A reply channel to handle raw_input requests that the kernel makes."""
469 """The stdin channel to handle raw_input requests that the kernel makes."""
466
470
467 msg_queue = None
471 msg_queue = None
468
472
469 def __init__(self, context, session, address):
473 def __init__(self, context, session, address):
470 super(StdInChannel, self).__init__(context, session, address)
474 super(StdInChannel, self).__init__(context, session, address)
471 self.ioloop = ioloop.IOLoop()
475 self.ioloop = ioloop.IOLoop()
472
476
473 def run(self):
477 def run(self):
474 """The thread's main activity. Call start() instead."""
478 """The thread's main activity. Call start() instead."""
475 self.socket = self.context.socket(zmq.DEALER)
479 self.socket = self.context.socket(zmq.DEALER)
476 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
480 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
477 self.socket.connect(self.address)
481 self.socket.connect(self.address)
478 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
482 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
479 self.stream.on_recv(self._handle_recv)
483 self.stream.on_recv(self._handle_recv)
480 self._run_loop()
484 self._run_loop()
481 try:
485 try:
482 self.socket.close()
486 self.socket.close()
483 except:
487 except:
484 pass
488 pass
485
486
489
487 def stop(self):
490 def stop(self):
491 """Stop the channel's event loop and join its thread."""
488 self.ioloop.stop()
492 self.ioloop.stop()
489 super(StdInChannel, self).stop()
493 super(StdInChannel, self).stop()
490
494
491 def call_handlers(self, msg):
495 def call_handlers(self, msg):
492 """This method is called in the ioloop thread when a message arrives.
496 """This method is called in the ioloop thread when a message arrives.
493
497
494 Subclasses should override this method to handle incoming messages.
498 Subclasses should override this method to handle incoming messages.
495 It is important to remember that this method is called in the thread
499 It is important to remember that this method is called in the thread
496 so that some logic must be done to ensure that the application leve
500 so that some logic must be done to ensure that the application leve
497 handlers are called in the application thread.
501 handlers are called in the application thread.
498 """
502 """
499 raise NotImplementedError('call_handlers must be defined in a subclass.')
503 raise NotImplementedError('call_handlers must be defined in a subclass.')
500
504
501 def input(self, string):
505 def input(self, string):
502 """Send a string of raw input to the kernel."""
506 """Send a string of raw input to the kernel."""
503 content = dict(value=string)
507 content = dict(value=string)
504 msg = self.session.msg('input_reply', content)
508 msg = self.session.msg('input_reply', content)
505 self._queue_send(msg)
509 self._queue_send(msg)
506
510
507
511
508 class HBChannel(ZMQSocketChannel):
512 class HBChannel(ZMQSocketChannel):
509 """The heartbeat channel which monitors the kernel heartbeat.
513 """The heartbeat channel which monitors the kernel heartbeat.
510
514
511 Note that the heartbeat channel is paused by default. As long as you start
515 Note that the heartbeat channel is paused by default. As long as you start
512 this channel, the kernel manager will ensure that it is paused and un-paused
516 this channel, the kernel manager will ensure that it is paused and un-paused
513 as appropriate.
517 as appropriate.
514 """
518 """
515
519
516 time_to_dead = 3.0
520 time_to_dead = 3.0
517 socket = None
521 socket = None
518 poller = None
522 poller = None
519 _running = None
523 _running = None
520 _pause = None
524 _pause = None
521 _beating = None
525 _beating = None
522
526
523 def __init__(self, context, session, address):
527 def __init__(self, context, session, address):
524 super(HBChannel, self).__init__(context, session, address)
528 super(HBChannel, self).__init__(context, session, address)
525 self._running = False
529 self._running = False
526 self._pause =True
530 self._pause =True
527 self.poller = zmq.Poller()
531 self.poller = zmq.Poller()
528
532
529 def _create_socket(self):
533 def _create_socket(self):
530 if self.socket is not None:
534 if self.socket is not None:
531 # close previous socket, before opening a new one
535 # close previous socket, before opening a new one
532 self.poller.unregister(self.socket)
536 self.poller.unregister(self.socket)
533 self.socket.close()
537 self.socket.close()
534 self.socket = self.context.socket(zmq.REQ)
538 self.socket = self.context.socket(zmq.REQ)
535 self.socket.setsockopt(zmq.LINGER, 0)
539 self.socket.setsockopt(zmq.LINGER, 0)
536 self.socket.connect(self.address)
540 self.socket.connect(self.address)
537
541
538 self.poller.register(self.socket, zmq.POLLIN)
542 self.poller.register(self.socket, zmq.POLLIN)
539
543
540 def _poll(self, start_time):
544 def _poll(self, start_time):
541 """poll for heartbeat replies until we reach self.time_to_dead
545 """poll for heartbeat replies until we reach self.time_to_dead.
542
546
543 Ignores interrupts, and returns the result of poll(), which
547 Ignores interrupts, and returns the result of poll(), which
544 will be an empty list if no messages arrived before the timeout,
548 will be an empty list if no messages arrived before the timeout,
545 or the event tuple if there is a message to receive.
549 or the event tuple if there is a message to receive.
546 """
550 """
547
551
548 until_dead = self.time_to_dead - (time.time() - start_time)
552 until_dead = self.time_to_dead - (time.time() - start_time)
549 # ensure poll at least once
553 # ensure poll at least once
550 until_dead = max(until_dead, 1e-3)
554 until_dead = max(until_dead, 1e-3)
551 events = []
555 events = []
552 while True:
556 while True:
553 try:
557 try:
554 events = self.poller.poll(1000 * until_dead)
558 events = self.poller.poll(1000 * until_dead)
555 except ZMQError as e:
559 except ZMQError as e:
556 if e.errno == errno.EINTR:
560 if e.errno == errno.EINTR:
557 # ignore interrupts during heartbeat
561 # ignore interrupts during heartbeat
558 # this may never actually happen
562 # this may never actually happen
559 until_dead = self.time_to_dead - (time.time() - start_time)
563 until_dead = self.time_to_dead - (time.time() - start_time)
560 until_dead = max(until_dead, 1e-3)
564 until_dead = max(until_dead, 1e-3)
561 pass
565 pass
562 else:
566 else:
563 raise
567 raise
564 except Exception:
568 except Exception:
565 if self._exiting:
569 if self._exiting:
566 break
570 break
567 else:
571 else:
568 raise
572 raise
569 else:
573 else:
570 break
574 break
571 return events
575 return events
572
576
573 def run(self):
577 def run(self):
574 """The thread's main activity. Call start() instead."""
578 """The thread's main activity. Call start() instead."""
575 self._create_socket()
579 self._create_socket()
576 self._running = True
580 self._running = True
577 self._beating = True
581 self._beating = True
578
582
579 while self._running:
583 while self._running:
580 if self._pause:
584 if self._pause:
581 # just sleep, and skip the rest of the loop
585 # just sleep, and skip the rest of the loop
582 time.sleep(self.time_to_dead)
586 time.sleep(self.time_to_dead)
583 continue
587 continue
584
588
585 since_last_heartbeat = 0.0
589 since_last_heartbeat = 0.0
586 # io.rprint('Ping from HB channel') # dbg
590 # io.rprint('Ping from HB channel') # dbg
587 # no need to catch EFSM here, because the previous event was
591 # no need to catch EFSM here, because the previous event was
588 # either a recv or connect, which cannot be followed by EFSM
592 # either a recv or connect, which cannot be followed by EFSM
589 self.socket.send(b'ping')
593 self.socket.send(b'ping')
590 request_time = time.time()
594 request_time = time.time()
591 ready = self._poll(request_time)
595 ready = self._poll(request_time)
592 if ready:
596 if ready:
593 self._beating = True
597 self._beating = True
594 # the poll above guarantees we have something to recv
598 # the poll above guarantees we have something to recv
595 self.socket.recv()
599 self.socket.recv()
596 # sleep the remainder of the cycle
600 # sleep the remainder of the cycle
597 remainder = self.time_to_dead - (time.time() - request_time)
601 remainder = self.time_to_dead - (time.time() - request_time)
598 if remainder > 0:
602 if remainder > 0:
599 time.sleep(remainder)
603 time.sleep(remainder)
600 continue
604 continue
601 else:
605 else:
602 # nothing was received within the time limit, signal heart failure
606 # nothing was received within the time limit, signal heart failure
603 self._beating = False
607 self._beating = False
604 since_last_heartbeat = time.time() - request_time
608 since_last_heartbeat = time.time() - request_time
605 self.call_handlers(since_last_heartbeat)
609 self.call_handlers(since_last_heartbeat)
606 # and close/reopen the socket, because the REQ/REP cycle has been broken
610 # and close/reopen the socket, because the REQ/REP cycle has been broken
607 self._create_socket()
611 self._create_socket()
608 continue
612 continue
609 try:
613 try:
610 self.socket.close()
614 self.socket.close()
611 except:
615 except:
612 pass
616 pass
613
617
614 def pause(self):
618 def pause(self):
615 """Pause the heartbeat."""
619 """Pause the heartbeat."""
616 self._pause = True
620 self._pause = True
617
621
618 def unpause(self):
622 def unpause(self):
619 """Unpause the heartbeat."""
623 """Unpause the heartbeat."""
620 self._pause = False
624 self._pause = False
621
625
622 def is_beating(self):
626 def is_beating(self):
623 """Is the heartbeat running and responsive (and not paused)."""
627 """Is the heartbeat running and responsive (and not paused)."""
624 if self.is_alive() and not self._pause and self._beating:
628 if self.is_alive() and not self._pause and self._beating:
625 return True
629 return True
626 else:
630 else:
627 return False
631 return False
628
632
629 def stop(self):
633 def stop(self):
634 """Stop the channel's event loop and join its thread."""
630 self._running = False
635 self._running = False
631 super(HBChannel, self).stop()
636 super(HBChannel, self).stop()
632
637
633 def call_handlers(self, since_last_heartbeat):
638 def call_handlers(self, since_last_heartbeat):
634 """This method is called in the ioloop thread when a message arrives.
639 """This method is called in the ioloop thread when a message arrives.
635
640
636 Subclasses should override this method to handle incoming messages.
641 Subclasses should override this method to handle incoming messages.
637 It is important to remember that this method is called in the thread
642 It is important to remember that this method is called in the thread
638 so that some logic must be done to ensure that the application level
643 so that some logic must be done to ensure that the application level
639 handlers are called in the application thread.
644 handlers are called in the application thread.
640 """
645 """
641 raise NotImplementedError('call_handlers must be defined in a subclass.')
646 raise NotImplementedError('call_handlers must be defined in a subclass.')
642
647
643
648
644 #-----------------------------------------------------------------------------
649 #-----------------------------------------------------------------------------
645 # Main kernel manager class
650 # Main kernel manager class
646 #-----------------------------------------------------------------------------
651 #-----------------------------------------------------------------------------
647
652
648 class KernelManager(Configurable):
653 class KernelManager(Configurable):
649 """ Manages a kernel for a frontend.
654 """Manages a single kernel on this host along with its channels.
650
655
651 The SUB channel is for the frontend to receive messages published by the
656 There are four channels associated with each kernel:
652 kernel.
653
657
654 The REQ channel is for the frontend to make requests of the kernel.
658 * shell: for request/reply calls to the kernel.
659 * iopub: for the kernel to publish results to frontends.
660 * hb: for monitoring the kernel's heartbeat.
661 * stdin: for frontends to reply to raw_input calls in the kernel.
655
662
656 The REP channel is for the kernel to request stdin (raw_input) from the
663 The usage of the channels that this class manages is optional. It is
657 frontend.
664 entirely possible to connect to the kernels directly using ZeroMQ
665 sockets. These channels are useful primarily for talking to a kernel
666 whose :class:`KernelManager` is in the same process.
667
668 This version manages kernels started using Popen.
658 """
669 """
659 # The PyZMQ Context to use for communication with the kernel.
670 # The PyZMQ Context to use for communication with the kernel.
660 context = Instance(zmq.Context)
671 context = Instance(zmq.Context)
661 def _context_default(self):
672 def _context_default(self):
662 return zmq.Context.instance()
673 return zmq.Context.instance()
663
674
664 # The Session to use for communication with the kernel.
675 # The Session to use for communication with the kernel.
665 session = Instance(Session)
676 session = Instance(Session)
666 def _session_default(self):
677 def _session_default(self):
667 return Session(config=self.config)
678 return Session(config=self.config)
668
679
669 # The kernel process with which the KernelManager is communicating.
680 # The kernel process with which the KernelManager is communicating.
670 kernel = Instance(Popen)
681 kernel = Instance(Popen)
671
682
672 # The addresses for the communication channels.
683 # The addresses for the communication channels.
673 connection_file = Unicode('')
684 connection_file = Unicode('')
674
685
675 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
686 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
676
687
677 ip = Unicode(LOCALHOST, config=True)
688 ip = Unicode(LOCALHOST, config=True)
678 def _ip_changed(self, name, old, new):
689 def _ip_changed(self, name, old, new):
679 if new == '*':
690 if new == '*':
680 self.ip = '0.0.0.0'
691 self.ip = '0.0.0.0'
681 shell_port = Integer(0)
692 shell_port = Integer(0)
682 iopub_port = Integer(0)
693 iopub_port = Integer(0)
683 stdin_port = Integer(0)
694 stdin_port = Integer(0)
684 hb_port = Integer(0)
695 hb_port = Integer(0)
685
696
686 # The classes to use for the various channels.
697 # The classes to use for the various channels.
687 shell_channel_class = Type(ShellChannel)
698 shell_channel_class = Type(ShellChannel)
688 iopub_channel_class = Type(IOPubChannel)
699 iopub_channel_class = Type(IOPubChannel)
689 stdin_channel_class = Type(StdInChannel)
700 stdin_channel_class = Type(StdInChannel)
690 hb_channel_class = Type(HBChannel)
701 hb_channel_class = Type(HBChannel)
691
702
692 # Protected traits.
703 # Protected traits.
693 _launch_args = Any
704 _launch_args = Any
694 _shell_channel = Any
705 _shell_channel = Any
695 _iopub_channel = Any
706 _iopub_channel = Any
696 _stdin_channel = Any
707 _stdin_channel = Any
697 _hb_channel = Any
708 _hb_channel = Any
698 _connection_file_written=Bool(False)
709 _connection_file_written=Bool(False)
699
710
700 def __del__(self):
711 def __del__(self):
701 self.cleanup_connection_file()
712 self.cleanup_connection_file()
702
713
703 #--------------------------------------------------------------------------
714 #--------------------------------------------------------------------------
704 # Channel management methods:
715 # Channel management methods:
705 #--------------------------------------------------------------------------
716 #--------------------------------------------------------------------------
706
717
707 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
718 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
708 """Starts the channels for this kernel.
719 """Starts the channels for this kernel.
709
720
710 This will create the channels if they do not exist and then start
721 This will create the channels if they do not exist and then start
711 them. If port numbers of 0 are being used (random ports) then you
722 them (their activity runs in a thread). If port numbers of 0 are
712 must first call :method:`start_kernel`. If the channels have been
723 being used (random ports) then you must first call
713 stopped and you call this, :class:`RuntimeError` will be raised.
724 :method:`start_kernel`. If the channels have been stopped and you
725 call this, :class:`RuntimeError` will be raised.
714 """
726 """
715 if shell:
727 if shell:
716 self.shell_channel.start()
728 self.shell_channel.start()
717 if iopub:
729 if iopub:
718 self.iopub_channel.start()
730 self.iopub_channel.start()
719 if stdin:
731 if stdin:
720 self.stdin_channel.start()
732 self.stdin_channel.start()
721 self.shell_channel.allow_stdin = True
733 self.shell_channel.allow_stdin = True
722 else:
734 else:
723 self.shell_channel.allow_stdin = False
735 self.shell_channel.allow_stdin = False
724 if hb:
736 if hb:
725 self.hb_channel.start()
737 self.hb_channel.start()
726
738
727 def stop_channels(self):
739 def stop_channels(self):
728 """Stops all the running channels for this kernel.
740 """Stops all the running channels for this kernel.
741
742 This stops their event loops and joins their threads.
729 """
743 """
730 if self.shell_channel.is_alive():
744 if self.shell_channel.is_alive():
731 self.shell_channel.stop()
745 self.shell_channel.stop()
732 if self.iopub_channel.is_alive():
746 if self.iopub_channel.is_alive():
733 self.iopub_channel.stop()
747 self.iopub_channel.stop()
734 if self.stdin_channel.is_alive():
748 if self.stdin_channel.is_alive():
735 self.stdin_channel.stop()
749 self.stdin_channel.stop()
736 if self.hb_channel.is_alive():
750 if self.hb_channel.is_alive():
737 self.hb_channel.stop()
751 self.hb_channel.stop()
738
752
739 @property
753 @property
740 def channels_running(self):
754 def channels_running(self):
741 """Are any of the channels created and running?"""
755 """Are any of the channels created and running?"""
742 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
756 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
743 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
757 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
744
758
745 def _make_url(self, port):
759 def _make_url(self, port):
746 """make a zmq url with a port"""
760 """Make a zmq url with a port.
761
762 There are two cases that this handles:
763
764 * tcp: tcp://ip:port
765 * ipc: ipc://ip-port
766 """
747 if self.transport == 'tcp':
767 if self.transport == 'tcp':
748 return "tcp://%s:%i" % (self.ip, port)
768 return "tcp://%s:%i" % (self.ip, port)
749 else:
769 else:
750 return "%s://%s-%s" % (self.transport, self.ip, port)
770 return "%s://%s-%s" % (self.transport, self.ip, port)
751
771
752 @property
772 @property
753 def shell_channel(self):
773 def shell_channel(self):
754 """Get the REQ socket channel object to make requests of the kernel."""
774 """Get the shell channel object for this kernel."""
755 if self._shell_channel is None:
775 if self._shell_channel is None:
756 self._shell_channel = self.shell_channel_class(self.context,
776 self._shell_channel = self.shell_channel_class(
757 self.session,
777 self.context, self.session, self._make_url(self.shell_port)
758 self._make_url(self.shell_port),
759 )
778 )
760 return self._shell_channel
779 return self._shell_channel
761
780
762 @property
781 @property
763 def iopub_channel(self):
782 def iopub_channel(self):
764 """Get the SUB socket channel object."""
783 """Get the iopub channel object for this kernel."""
765 if self._iopub_channel is None:
784 if self._iopub_channel is None:
766 self._iopub_channel = self.iopub_channel_class(self.context,
785 self._iopub_channel = self.iopub_channel_class(
767 self.session,
786 self.context, self.session, self._make_url(self.iopub_port)
768 self._make_url(self.iopub_port),
769 )
787 )
770 return self._iopub_channel
788 return self._iopub_channel
771
789
772 @property
790 @property
773 def stdin_channel(self):
791 def stdin_channel(self):
774 """Get the REP socket channel object to handle stdin (raw_input)."""
792 """Get the stdin channel object for this kernel."""
775 if self._stdin_channel is None:
793 if self._stdin_channel is None:
776 self._stdin_channel = self.stdin_channel_class(self.context,
794 self._stdin_channel = self.stdin_channel_class(
777 self.session,
795 self.context, self.session, self._make_url(self.stdin_port)
778 self._make_url(self.stdin_port),
779 )
796 )
780 return self._stdin_channel
797 return self._stdin_channel
781
798
782 @property
799 @property
783 def hb_channel(self):
800 def hb_channel(self):
784 """Get the heartbeat socket channel object to check that the
801 """Get the hb channel object for this kernel."""
785 kernel is alive."""
786 if self._hb_channel is None:
802 if self._hb_channel is None:
787 self._hb_channel = self.hb_channel_class(self.context,
803 self._hb_channel = self.hb_channel_class(
788 self.session,
804 self.context, self.session, self._make_url(self.hb_port)
789 self._make_url(self.hb_port),
790 )
805 )
791 return self._hb_channel
806 return self._hb_channel
792
807
793 #--------------------------------------------------------------------------
808 #--------------------------------------------------------------------------
794 # Connection and ipc file management.
809 # Connection and ipc file management.
795 #--------------------------------------------------------------------------
810 #--------------------------------------------------------------------------
796
811
797 def cleanup_connection_file(self):
812 def cleanup_connection_file(self):
798 """cleanup connection file *if we wrote it*
813 """Cleanup connection file *if we wrote it*
799
814
800 Will not raise if the connection file was already removed somehow.
815 Will not raise if the connection file was already removed somehow.
801 """
816 """
802 if self._connection_file_written:
817 if self._connection_file_written:
803 # cleanup connection files on full shutdown of kernel we started
818 # cleanup connection files on full shutdown of kernel we started
804 self._connection_file_written = False
819 self._connection_file_written = False
805 try:
820 try:
806 os.remove(self.connection_file)
821 os.remove(self.connection_file)
807 except (IOError, OSError):
822 except (IOError, OSError):
808 pass
823 pass
809
824
810 self.cleanup_ipc_files()
825 self.cleanup_ipc_files()
811
826
812 def cleanup_ipc_files(self):
827 def cleanup_ipc_files(self):
813 """cleanup ipc files if we wrote them"""
828 """Cleanup ipc files if we wrote them."""
814 if self.transport != 'ipc':
829 if self.transport != 'ipc':
815 return
830 return
816 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
831 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
817 ipcfile = "%s-%i" % (self.ip, port)
832 ipcfile = "%s-%i" % (self.ip, port)
818 try:
833 try:
819 os.remove(ipcfile)
834 os.remove(ipcfile)
820 except (IOError, OSError):
835 except (IOError, OSError):
821 pass
836 pass
822
837
823 def load_connection_file(self):
838 def load_connection_file(self):
824 """load connection info from JSON dict in self.connection_file"""
839 """Load connection info from JSON dict in self.connection_file."""
825 with open(self.connection_file) as f:
840 with open(self.connection_file) as f:
826 cfg = json.loads(f.read())
841 cfg = json.loads(f.read())
827
842
828 from pprint import pprint
843 from pprint import pprint
829 pprint(cfg)
844 pprint(cfg)
830 self.transport = cfg.get('transport', 'tcp')
845 self.transport = cfg.get('transport', 'tcp')
831 self.ip = cfg['ip']
846 self.ip = cfg['ip']
832 self.shell_port = cfg['shell_port']
847 self.shell_port = cfg['shell_port']
833 self.stdin_port = cfg['stdin_port']
848 self.stdin_port = cfg['stdin_port']
834 self.iopub_port = cfg['iopub_port']
849 self.iopub_port = cfg['iopub_port']
835 self.hb_port = cfg['hb_port']
850 self.hb_port = cfg['hb_port']
836 self.session.key = str_to_bytes(cfg['key'])
851 self.session.key = str_to_bytes(cfg['key'])
837
852
838 def write_connection_file(self):
853 def write_connection_file(self):
839 """write connection info to JSON dict in self.connection_file"""
854 """Write connection info to JSON dict in self.connection_file."""
840 if self._connection_file_written:
855 if self._connection_file_written:
841 return
856 return
842 self.connection_file,cfg = write_connection_file(self.connection_file,
857 self.connection_file,cfg = write_connection_file(self.connection_file,
843 transport=self.transport, ip=self.ip, key=self.session.key,
858 transport=self.transport, ip=self.ip, key=self.session.key,
844 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
859 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
845 shell_port=self.shell_port, hb_port=self.hb_port)
860 shell_port=self.shell_port, hb_port=self.hb_port)
846 # write_connection_file also sets default ports:
861 # write_connection_file also sets default ports:
847 self.shell_port = cfg['shell_port']
862 self.shell_port = cfg['shell_port']
848 self.stdin_port = cfg['stdin_port']
863 self.stdin_port = cfg['stdin_port']
849 self.iopub_port = cfg['iopub_port']
864 self.iopub_port = cfg['iopub_port']
850 self.hb_port = cfg['hb_port']
865 self.hb_port = cfg['hb_port']
851
866
852 self._connection_file_written = True
867 self._connection_file_written = True
853
868
854 #--------------------------------------------------------------------------
869 #--------------------------------------------------------------------------
855 # Kernel management.
870 # Kernel management.
856 #--------------------------------------------------------------------------
871 #--------------------------------------------------------------------------
857
872
858 def start_kernel(self, **kw):
873 def start_kernel(self, **kw):
859 """Starts a kernel process and configures the manager to use it.
874 """Starts a kernel on this host in a separate process.
860
875
861 If random ports (port=0) are being used, this method must be called
876 If random ports (port=0) are being used, this method must be called
862 before the channels are created.
877 before the channels are created.
863
878
864 Parameters:
879 Parameters:
865 -----------
880 -----------
866 launcher : callable, optional (default None)
881 launcher : callable, optional (default None)
867 A custom function for launching the kernel process (generally a
882 A custom function for launching the kernel process (generally a
868 wrapper around ``entry_point.base_launch_kernel``). In most cases,
883 wrapper around ``entry_point.base_launch_kernel``). In most cases,
869 it should not be necessary to use this parameter.
884 it should not be necessary to use this parameter.
870
885
871 **kw : optional
886 **kw : optional
872 See respective options for IPython and Python kernels.
887 keyword arguments that are passed down into the launcher
888 callable.
873 """
889 """
874 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
890 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
875 raise RuntimeError("Can only launch a kernel on a local interface. "
891 raise RuntimeError("Can only launch a kernel on a local interface. "
876 "Make sure that the '*_address' attributes are "
892 "Make sure that the '*_address' attributes are "
877 "configured properly. "
893 "configured properly. "
878 "Currently valid addresses are: %s"%LOCAL_IPS
894 "Currently valid addresses are: %s"%LOCAL_IPS
879 )
895 )
880
896
881 # write connection file / get default ports
897 # write connection file / get default ports
882 self.write_connection_file()
898 self.write_connection_file()
883
899
884 self._launch_args = kw.copy()
900 self._launch_args = kw.copy()
885 launch_kernel = kw.pop('launcher', None)
901 launch_kernel = kw.pop('launcher', None)
886 if launch_kernel is None:
902 if launch_kernel is None:
887 from ipkernel import launch_kernel
903 from ipkernel import launch_kernel
888 self.kernel = launch_kernel(fname=self.connection_file, **kw)
904 self.kernel = launch_kernel(fname=self.connection_file, **kw)
889
905
890 def shutdown_kernel(self, now=False, restart=False):
906 def shutdown_kernel(self, now=False, restart=False):
891 """ Attempts to the stop the kernel process cleanly.
907 """Attempts to the stop the kernel process cleanly.
908
909 This attempts to shutdown the kernels cleanly by:
910
911 1. Sending it a shutdown message over the shell channel.
912 2. If that fails, the kernel is shutdown forcibly by sending it
913 a signal.
892
914
893 If the kernel cannot be stopped and the kernel is local, it is killed.
915 Parameters:
916 -----------
917 now : bool
918 Should the kernel be forcible killed *now*. This skips the
919 first, nice shutdown attempt.
920 restart: bool
921 Will this kernel be restarted after it is shutdown. When this
922 is True, connection files will not be cleaned up.
894 """
923 """
895 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
924 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
896 if sys.platform == 'win32':
925 if sys.platform == 'win32':
897 self.kill_kernel()
926 self.kill_kernel()
898 return
927 return
899
928
900 # Pause the heart beat channel if it exists.
929 # Pause the heart beat channel if it exists.
901 if self._hb_channel is not None:
930 if self._hb_channel is not None:
902 self._hb_channel.pause()
931 self._hb_channel.pause()
903
932
904 if now:
933 if now:
905 if self.has_kernel:
934 if self.has_kernel:
906 self.kill_kernel()
935 self.kill_kernel()
907 else:
936 else:
908 # Don't send any additional kernel kill messages immediately, to give
937 # Don't send any additional kernel kill messages immediately, to give
909 # the kernel a chance to properly execute shutdown actions. Wait for at
938 # the kernel a chance to properly execute shutdown actions. Wait for at
910 # most 1s, checking every 0.1s.
939 # most 1s, checking every 0.1s.
911 self.shell_channel.shutdown(restart=restart)
940 self.shell_channel.shutdown(restart=restart)
912 for i in range(10):
941 for i in range(10):
913 if self.is_alive:
942 if self.is_alive:
914 time.sleep(0.1)
943 time.sleep(0.1)
915 else:
944 else:
916 break
945 break
917 else:
946 else:
918 # OK, we've waited long enough.
947 # OK, we've waited long enough.
919 if self.has_kernel:
948 if self.has_kernel:
920 self.kill_kernel()
949 self.kill_kernel()
921
950
922 if not restart:
951 if not restart:
923 self.cleanup_connection_file()
952 self.cleanup_connection_file()
924 else:
953 else:
925 self.cleanup_ipc_files()
954 self.cleanup_ipc_files()
926
955
927 def restart_kernel(self, now=False, **kw):
956 def restart_kernel(self, now=False, **kw):
928 """Restarts a kernel with the arguments that were used to launch it.
957 """Restarts a kernel with the arguments that were used to launch it.
929
958
930 If the old kernel was launched with random ports, the same ports will be
959 If the old kernel was launched with random ports, the same ports will be
931 used for the new kernel.
960 used for the new kernel. The same connection file is used again.
932
961
933 Parameters
962 Parameters
934 ----------
963 ----------
935 now : bool, optional
964 now : bool, optional
936 If True, the kernel is forcefully restarted *immediately*, without
965 If True, the kernel is forcefully restarted *immediately*, without
937 having a chance to do any cleanup action. Otherwise the kernel is
966 having a chance to do any cleanup action. Otherwise the kernel is
938 given 1s to clean up before a forceful restart is issued.
967 given 1s to clean up before a forceful restart is issued.
939
968
940 In all cases the kernel is restarted, the only difference is whether
969 In all cases the kernel is restarted, the only difference is whether
941 it is given a chance to perform a clean shutdown or not.
970 it is given a chance to perform a clean shutdown or not.
942
971
943 **kw : optional
972 **kw : optional
944 Any options specified here will replace those used to launch the
973 Any options specified here will overwrite those used to launch the
945 kernel.
974 kernel.
946 """
975 """
947 if self._launch_args is None:
976 if self._launch_args is None:
948 raise RuntimeError("Cannot restart the kernel. "
977 raise RuntimeError("Cannot restart the kernel. "
949 "No previous call to 'start_kernel'.")
978 "No previous call to 'start_kernel'.")
950 else:
979 else:
951 # Stop currently running kernel.
980 # Stop currently running kernel.
952 self.shutdown_kernel(now=now, restart=True)
981 self.shutdown_kernel(now=now, restart=True)
953
982
954 # Start new kernel.
983 # Start new kernel.
955 self._launch_args.update(kw)
984 self._launch_args.update(kw)
956 self.start_kernel(**self._launch_args)
985 self.start_kernel(**self._launch_args)
957
986
958 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
987 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
959 # unless there is some delay here.
988 # unless there is some delay here.
960 if sys.platform == 'win32':
989 if sys.platform == 'win32':
961 time.sleep(0.2)
990 time.sleep(0.2)
962
991
963 @property
992 @property
964 def has_kernel(self):
993 def has_kernel(self):
965 """Returns whether a kernel process has been specified for the kernel
994 """Has a kernel been started that we are managing."""
966 manager.
967 """
968 return self.kernel is not None
995 return self.kernel is not None
969
996
970 def kill_kernel(self):
997 def kill_kernel(self):
971 """ Kill the running kernel.
998 """Kill the running kernel.
972
999
973 This method blocks until the kernel process has terminated.
1000 This is a private method, callers should use shutdown_kernel(now=True).
974 """
1001 """
975 if self.has_kernel:
1002 if self.has_kernel:
976 # Pause the heart beat channel if it exists.
1003 # Pause the heart beat channel if it exists.
977 if self._hb_channel is not None:
1004 if self._hb_channel is not None:
978 self._hb_channel.pause()
1005 self._hb_channel.pause()
979
1006
980 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1007 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
981 # TerminateProcess() on Win32).
1008 # TerminateProcess() on Win32).
982 try:
1009 try:
983 self.kernel.kill()
1010 self.kernel.kill()
984 except OSError as e:
1011 except OSError as e:
985 # In Windows, we will get an Access Denied error if the process
1012 # In Windows, we will get an Access Denied error if the process
986 # has already terminated. Ignore it.
1013 # has already terminated. Ignore it.
987 if sys.platform == 'win32':
1014 if sys.platform == 'win32':
988 if e.winerror != 5:
1015 if e.winerror != 5:
989 raise
1016 raise
990 # On Unix, we may get an ESRCH error if the process has already
1017 # On Unix, we may get an ESRCH error if the process has already
991 # terminated. Ignore it.
1018 # terminated. Ignore it.
992 else:
1019 else:
993 from errno import ESRCH
1020 from errno import ESRCH
994 if e.errno != ESRCH:
1021 if e.errno != ESRCH:
995 raise
1022 raise
996
1023
997 # Block until the kernel terminates.
1024 # Block until the kernel terminates.
998 self.kernel.wait()
1025 self.kernel.wait()
999 self.kernel = None
1026 self.kernel = None
1000 else:
1027 else:
1001 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1028 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1002
1029
1003 def interrupt_kernel(self):
1030 def interrupt_kernel(self):
1004 """ Interrupts the kernel.
1031 """Interrupts the kernel by sending it a signal.
1005
1032
1006 Unlike ``signal_kernel``, this operation is well supported on all
1033 Unlike ``signal_kernel``, this operation is well supported on all
1007 platforms.
1034 platforms.
1008 """
1035 """
1009 if self.has_kernel:
1036 if self.has_kernel:
1010 if sys.platform == 'win32':
1037 if sys.platform == 'win32':
1011 from parentpoller import ParentPollerWindows as Poller
1038 from parentpoller import ParentPollerWindows as Poller
1012 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1039 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1013 else:
1040 else:
1014 self.kernel.send_signal(signal.SIGINT)
1041 self.kernel.send_signal(signal.SIGINT)
1015 else:
1042 else:
1016 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1043 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1017
1044
1018 def signal_kernel(self, signum):
1045 def signal_kernel(self, signum):
1019 """ Sends a signal to the kernel.
1046 """Sends a signal to the kernel.
1020
1047
1021 Note that since only SIGTERM is supported on Windows, this function is
1048 Note that since only SIGTERM is supported on Windows, this function is
1022 only useful on Unix systems.
1049 only useful on Unix systems.
1023 """
1050 """
1024 if self.has_kernel:
1051 if self.has_kernel:
1025 self.kernel.send_signal(signum)
1052 self.kernel.send_signal(signum)
1026 else:
1053 else:
1027 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1054 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1028
1055
1029 @property
1056 @property
1030 def is_alive(self):
1057 def is_alive(self):
1031 """Is the kernel process still running?"""
1058 """Is the kernel process still running?"""
1032 if self.has_kernel:
1059 if self.has_kernel:
1033 if self.kernel.poll() is None:
1060 if self.kernel.poll() is None:
1034 return True
1061 return True
1035 else:
1062 else:
1036 return False
1063 return False
1037 elif self._hb_channel is not None:
1064 elif self._hb_channel is not None:
1038 # We didn't start the kernel with this KernelManager so we
1065 # We didn't start the kernel with this KernelManager so we
1039 # use the heartbeat.
1066 # use the heartbeat.
1040 return self._hb_channel.is_beating()
1067 return self._hb_channel.is_beating()
1041 else:
1068 else:
1042 # no heartbeat and not local, we can't tell if it's running,
1069 # no heartbeat and not local, we can't tell if it's running,
1043 # so naively return True
1070 # so naively return True
1044 return True
1071 return True
1045
1072
1046
1073
1047 #-----------------------------------------------------------------------------
1074 #-----------------------------------------------------------------------------
1048 # ABC Registration
1075 # ABC Registration
1049 #-----------------------------------------------------------------------------
1076 #-----------------------------------------------------------------------------
1050
1077
1051 ShellChannelABC.register(ShellChannel)
1078 ShellChannelABC.register(ShellChannel)
1052 IOPubChannelABC.register(IOPubChannel)
1079 IOPubChannelABC.register(IOPubChannel)
1053 HBChannelABC.register(HBChannel)
1080 HBChannelABC.register(HBChannel)
1054 StdInChannelABC.register(StdInChannel)
1081 StdInChannelABC.register(StdInChannel)
1055 KernelManagerABC.register(KernelManager)
1082 KernelManagerABC.register(KernelManager)
1083
General Comments 0
You need to be logged in to leave comments. Login now