##// END OF EJS Templates
Correct import for kernelmanager on Windows
Thomas Kluyver -
Show More
@@ -1,1130 +1,1130
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import absolute_import
18 from __future__ import absolute_import
19
19
20 # Standard library imports
20 # Standard library imports
21 import atexit
21 import atexit
22 import errno
22 import errno
23 import json
23 import json
24 from subprocess import Popen
24 from subprocess import Popen
25 import os
25 import os
26 import signal
26 import signal
27 import sys
27 import sys
28 from threading import Thread
28 from threading import Thread
29 import time
29 import time
30
30
31 # System library imports
31 # System library imports
32 import zmq
32 import zmq
33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
34 # during garbage collection of threads at exit:
34 # during garbage collection of threads at exit:
35 from zmq import ZMQError
35 from zmq import ZMQError
36 from zmq.eventloop import ioloop, zmqstream
36 from zmq.eventloop import ioloop, zmqstream
37
37
38 # Local imports
38 # Local imports
39 from IPython.config.configurable import Configurable
39 from IPython.config.configurable import Configurable
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 from IPython.utils.traitlets import (
41 from IPython.utils.traitlets import (
42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
43 )
43 )
44 from IPython.utils.py3compat import str_to_bytes
44 from IPython.utils.py3compat import str_to_bytes
45 from IPython.kernel import (
45 from IPython.kernel import (
46 write_connection_file,
46 write_connection_file,
47 make_ipkernel_cmd,
47 make_ipkernel_cmd,
48 launch_kernel,
48 launch_kernel,
49 )
49 )
50 from .zmq.session import Session
50 from .zmq.session import Session
51 from .kernelmanagerabc import (
51 from .kernelmanagerabc import (
52 ShellChannelABC, IOPubChannelABC,
52 ShellChannelABC, IOPubChannelABC,
53 HBChannelABC, StdInChannelABC,
53 HBChannelABC, StdInChannelABC,
54 KernelManagerABC
54 KernelManagerABC
55 )
55 )
56
56
57
57
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59 # Constants and exceptions
59 # Constants and exceptions
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61
61
62 class InvalidPortNumber(Exception):
62 class InvalidPortNumber(Exception):
63 pass
63 pass
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # Utility functions
66 # Utility functions
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69 # some utilities to validate message structure, these might get moved elsewhere
69 # some utilities to validate message structure, these might get moved elsewhere
70 # if they prove to have more generic utility
70 # if they prove to have more generic utility
71
71
72 def validate_string_list(lst):
72 def validate_string_list(lst):
73 """Validate that the input is a list of strings.
73 """Validate that the input is a list of strings.
74
74
75 Raises ValueError if not."""
75 Raises ValueError if not."""
76 if not isinstance(lst, list):
76 if not isinstance(lst, list):
77 raise ValueError('input %r must be a list' % lst)
77 raise ValueError('input %r must be a list' % lst)
78 for x in lst:
78 for x in lst:
79 if not isinstance(x, basestring):
79 if not isinstance(x, basestring):
80 raise ValueError('element %r in list must be a string' % x)
80 raise ValueError('element %r in list must be a string' % x)
81
81
82
82
83 def validate_string_dict(dct):
83 def validate_string_dict(dct):
84 """Validate that the input is a dict with string keys and values.
84 """Validate that the input is a dict with string keys and values.
85
85
86 Raises ValueError if not."""
86 Raises ValueError if not."""
87 for k,v in dct.iteritems():
87 for k,v in dct.iteritems():
88 if not isinstance(k, basestring):
88 if not isinstance(k, basestring):
89 raise ValueError('key %r in dict must be a string' % k)
89 raise ValueError('key %r in dict must be a string' % k)
90 if not isinstance(v, basestring):
90 if not isinstance(v, basestring):
91 raise ValueError('value %r in dict must be a string' % v)
91 raise ValueError('value %r in dict must be a string' % v)
92
92
93
93
94 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
95 # ZMQ Socket Channel classes
95 # ZMQ Socket Channel classes
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97
97
98 class ZMQSocketChannel(Thread):
98 class ZMQSocketChannel(Thread):
99 """The base class for the channels that use ZMQ sockets."""
99 """The base class for the channels that use ZMQ sockets."""
100 context = None
100 context = None
101 session = None
101 session = None
102 socket = None
102 socket = None
103 ioloop = None
103 ioloop = None
104 stream = None
104 stream = None
105 _address = None
105 _address = None
106 _exiting = False
106 _exiting = False
107
107
108 def __init__(self, context, session, address):
108 def __init__(self, context, session, address):
109 """Create a channel.
109 """Create a channel.
110
110
111 Parameters
111 Parameters
112 ----------
112 ----------
113 context : :class:`zmq.Context`
113 context : :class:`zmq.Context`
114 The ZMQ context to use.
114 The ZMQ context to use.
115 session : :class:`session.Session`
115 session : :class:`session.Session`
116 The session to use.
116 The session to use.
117 address : zmq url
117 address : zmq url
118 Standard (ip, port) tuple that the kernel is listening on.
118 Standard (ip, port) tuple that the kernel is listening on.
119 """
119 """
120 super(ZMQSocketChannel, self).__init__()
120 super(ZMQSocketChannel, self).__init__()
121 self.daemon = True
121 self.daemon = True
122
122
123 self.context = context
123 self.context = context
124 self.session = session
124 self.session = session
125 if isinstance(address, tuple):
125 if isinstance(address, tuple):
126 if address[1] == 0:
126 if address[1] == 0:
127 message = 'The port number for a channel cannot be 0.'
127 message = 'The port number for a channel cannot be 0.'
128 raise InvalidPortNumber(message)
128 raise InvalidPortNumber(message)
129 address = "tcp://%s:%i" % address
129 address = "tcp://%s:%i" % address
130 self._address = address
130 self._address = address
131 atexit.register(self._notice_exit)
131 atexit.register(self._notice_exit)
132
132
133 def _notice_exit(self):
133 def _notice_exit(self):
134 self._exiting = True
134 self._exiting = True
135
135
136 def _run_loop(self):
136 def _run_loop(self):
137 """Run my loop, ignoring EINTR events in the poller"""
137 """Run my loop, ignoring EINTR events in the poller"""
138 while True:
138 while True:
139 try:
139 try:
140 self.ioloop.start()
140 self.ioloop.start()
141 except ZMQError as e:
141 except ZMQError as e:
142 if e.errno == errno.EINTR:
142 if e.errno == errno.EINTR:
143 continue
143 continue
144 else:
144 else:
145 raise
145 raise
146 except Exception:
146 except Exception:
147 if self._exiting:
147 if self._exiting:
148 break
148 break
149 else:
149 else:
150 raise
150 raise
151 else:
151 else:
152 break
152 break
153
153
154 def stop(self):
154 def stop(self):
155 """Stop the channel's event loop and join its thread.
155 """Stop the channel's event loop and join its thread.
156
156
157 This calls :method:`Thread.join` and returns when the thread
157 This calls :method:`Thread.join` and returns when the thread
158 terminates. :class:`RuntimeError` will be raised if
158 terminates. :class:`RuntimeError` will be raised if
159 :method:`self.start` is called again.
159 :method:`self.start` is called again.
160 """
160 """
161 self.join()
161 self.join()
162
162
163 @property
163 @property
164 def address(self):
164 def address(self):
165 """Get the channel's address as a zmq url string.
165 """Get the channel's address as a zmq url string.
166
166
167 These URLS have the form: 'tcp://127.0.0.1:5555'.
167 These URLS have the form: 'tcp://127.0.0.1:5555'.
168 """
168 """
169 return self._address
169 return self._address
170
170
171 def _queue_send(self, msg):
171 def _queue_send(self, msg):
172 """Queue a message to be sent from the IOLoop's thread.
172 """Queue a message to be sent from the IOLoop's thread.
173
173
174 Parameters
174 Parameters
175 ----------
175 ----------
176 msg : message to send
176 msg : message to send
177
177
178 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
178 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
179 thread control of the action.
179 thread control of the action.
180 """
180 """
181 def thread_send():
181 def thread_send():
182 self.session.send(self.stream, msg)
182 self.session.send(self.stream, msg)
183 self.ioloop.add_callback(thread_send)
183 self.ioloop.add_callback(thread_send)
184
184
185 def _handle_recv(self, msg):
185 def _handle_recv(self, msg):
186 """Callback for stream.on_recv.
186 """Callback for stream.on_recv.
187
187
188 Unpacks message, and calls handlers with it.
188 Unpacks message, and calls handlers with it.
189 """
189 """
190 ident,smsg = self.session.feed_identities(msg)
190 ident,smsg = self.session.feed_identities(msg)
191 self.call_handlers(self.session.unserialize(smsg))
191 self.call_handlers(self.session.unserialize(smsg))
192
192
193
193
194
194
195 class ShellChannel(ZMQSocketChannel):
195 class ShellChannel(ZMQSocketChannel):
196 """The shell channel for issuing request/replies to the kernel."""
196 """The shell channel for issuing request/replies to the kernel."""
197
197
198 command_queue = None
198 command_queue = None
199 # flag for whether execute requests should be allowed to call raw_input:
199 # flag for whether execute requests should be allowed to call raw_input:
200 allow_stdin = True
200 allow_stdin = True
201
201
202 def __init__(self, context, session, address):
202 def __init__(self, context, session, address):
203 super(ShellChannel, self).__init__(context, session, address)
203 super(ShellChannel, self).__init__(context, session, address)
204 self.ioloop = ioloop.IOLoop()
204 self.ioloop = ioloop.IOLoop()
205
205
206 def run(self):
206 def run(self):
207 """The thread's main activity. Call start() instead."""
207 """The thread's main activity. Call start() instead."""
208 self.socket = self.context.socket(zmq.DEALER)
208 self.socket = self.context.socket(zmq.DEALER)
209 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
210 self.socket.connect(self.address)
210 self.socket.connect(self.address)
211 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
212 self.stream.on_recv(self._handle_recv)
212 self.stream.on_recv(self._handle_recv)
213 self._run_loop()
213 self._run_loop()
214 try:
214 try:
215 self.socket.close()
215 self.socket.close()
216 except:
216 except:
217 pass
217 pass
218
218
219 def stop(self):
219 def stop(self):
220 """Stop the channel's event loop and join its thread."""
220 """Stop the channel's event loop and join its thread."""
221 self.ioloop.stop()
221 self.ioloop.stop()
222 super(ShellChannel, self).stop()
222 super(ShellChannel, self).stop()
223
223
224 def call_handlers(self, msg):
224 def call_handlers(self, msg):
225 """This method is called in the ioloop thread when a message arrives.
225 """This method is called in the ioloop thread when a message arrives.
226
226
227 Subclasses should override this method to handle incoming messages.
227 Subclasses should override this method to handle incoming messages.
228 It is important to remember that this method is called in the thread
228 It is important to remember that this method is called in the thread
229 so that some logic must be done to ensure that the application leve
229 so that some logic must be done to ensure that the application leve
230 handlers are called in the application thread.
230 handlers are called in the application thread.
231 """
231 """
232 raise NotImplementedError('call_handlers must be defined in a subclass.')
232 raise NotImplementedError('call_handlers must be defined in a subclass.')
233
233
234 def execute(self, code, silent=False, store_history=True,
234 def execute(self, code, silent=False, store_history=True,
235 user_variables=None, user_expressions=None, allow_stdin=None):
235 user_variables=None, user_expressions=None, allow_stdin=None):
236 """Execute code in the kernel.
236 """Execute code in the kernel.
237
237
238 Parameters
238 Parameters
239 ----------
239 ----------
240 code : str
240 code : str
241 A string of Python code.
241 A string of Python code.
242
242
243 silent : bool, optional (default False)
243 silent : bool, optional (default False)
244 If set, the kernel will execute the code as quietly possible, and
244 If set, the kernel will execute the code as quietly possible, and
245 will force store_history to be False.
245 will force store_history to be False.
246
246
247 store_history : bool, optional (default True)
247 store_history : bool, optional (default True)
248 If set, the kernel will store command history. This is forced
248 If set, the kernel will store command history. This is forced
249 to be False if silent is True.
249 to be False if silent is True.
250
250
251 user_variables : list, optional
251 user_variables : list, optional
252 A list of variable names to pull from the user's namespace. They
252 A list of variable names to pull from the user's namespace. They
253 will come back as a dict with these names as keys and their
253 will come back as a dict with these names as keys and their
254 :func:`repr` as values.
254 :func:`repr` as values.
255
255
256 user_expressions : dict, optional
256 user_expressions : dict, optional
257 A dict mapping names to expressions to be evaluated in the user's
257 A dict mapping names to expressions to be evaluated in the user's
258 dict. The expression values are returned as strings formatted using
258 dict. The expression values are returned as strings formatted using
259 :func:`repr`.
259 :func:`repr`.
260
260
261 allow_stdin : bool, optional (default self.allow_stdin)
261 allow_stdin : bool, optional (default self.allow_stdin)
262 Flag for whether the kernel can send stdin requests to frontends.
262 Flag for whether the kernel can send stdin requests to frontends.
263
263
264 Some frontends (e.g. the Notebook) do not support stdin requests.
264 Some frontends (e.g. the Notebook) do not support stdin requests.
265 If raw_input is called from code executed from such a frontend, a
265 If raw_input is called from code executed from such a frontend, a
266 StdinNotImplementedError will be raised.
266 StdinNotImplementedError will be raised.
267
267
268 Returns
268 Returns
269 -------
269 -------
270 The msg_id of the message sent.
270 The msg_id of the message sent.
271 """
271 """
272 if user_variables is None:
272 if user_variables is None:
273 user_variables = []
273 user_variables = []
274 if user_expressions is None:
274 if user_expressions is None:
275 user_expressions = {}
275 user_expressions = {}
276 if allow_stdin is None:
276 if allow_stdin is None:
277 allow_stdin = self.allow_stdin
277 allow_stdin = self.allow_stdin
278
278
279
279
280 # Don't waste network traffic if inputs are invalid
280 # Don't waste network traffic if inputs are invalid
281 if not isinstance(code, basestring):
281 if not isinstance(code, basestring):
282 raise ValueError('code %r must be a string' % code)
282 raise ValueError('code %r must be a string' % code)
283 validate_string_list(user_variables)
283 validate_string_list(user_variables)
284 validate_string_dict(user_expressions)
284 validate_string_dict(user_expressions)
285
285
286 # Create class for content/msg creation. Related to, but possibly
286 # Create class for content/msg creation. Related to, but possibly
287 # not in Session.
287 # not in Session.
288 content = dict(code=code, silent=silent, store_history=store_history,
288 content = dict(code=code, silent=silent, store_history=store_history,
289 user_variables=user_variables,
289 user_variables=user_variables,
290 user_expressions=user_expressions,
290 user_expressions=user_expressions,
291 allow_stdin=allow_stdin,
291 allow_stdin=allow_stdin,
292 )
292 )
293 msg = self.session.msg('execute_request', content)
293 msg = self.session.msg('execute_request', content)
294 self._queue_send(msg)
294 self._queue_send(msg)
295 return msg['header']['msg_id']
295 return msg['header']['msg_id']
296
296
297 def complete(self, text, line, cursor_pos, block=None):
297 def complete(self, text, line, cursor_pos, block=None):
298 """Tab complete text in the kernel's namespace.
298 """Tab complete text in the kernel's namespace.
299
299
300 Parameters
300 Parameters
301 ----------
301 ----------
302 text : str
302 text : str
303 The text to complete.
303 The text to complete.
304 line : str
304 line : str
305 The full line of text that is the surrounding context for the
305 The full line of text that is the surrounding context for the
306 text to complete.
306 text to complete.
307 cursor_pos : int
307 cursor_pos : int
308 The position of the cursor in the line where the completion was
308 The position of the cursor in the line where the completion was
309 requested.
309 requested.
310 block : str, optional
310 block : str, optional
311 The full block of code in which the completion is being requested.
311 The full block of code in which the completion is being requested.
312
312
313 Returns
313 Returns
314 -------
314 -------
315 The msg_id of the message sent.
315 The msg_id of the message sent.
316 """
316 """
317 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
317 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
318 msg = self.session.msg('complete_request', content)
318 msg = self.session.msg('complete_request', content)
319 self._queue_send(msg)
319 self._queue_send(msg)
320 return msg['header']['msg_id']
320 return msg['header']['msg_id']
321
321
322 def object_info(self, oname, detail_level=0):
322 def object_info(self, oname, detail_level=0):
323 """Get metadata information about an object in the kernel's namespace.
323 """Get metadata information about an object in the kernel's namespace.
324
324
325 Parameters
325 Parameters
326 ----------
326 ----------
327 oname : str
327 oname : str
328 A string specifying the object name.
328 A string specifying the object name.
329 detail_level : int, optional
329 detail_level : int, optional
330 The level of detail for the introspection (0-2)
330 The level of detail for the introspection (0-2)
331
331
332 Returns
332 Returns
333 -------
333 -------
334 The msg_id of the message sent.
334 The msg_id of the message sent.
335 """
335 """
336 content = dict(oname=oname, detail_level=detail_level)
336 content = dict(oname=oname, detail_level=detail_level)
337 msg = self.session.msg('object_info_request', content)
337 msg = self.session.msg('object_info_request', content)
338 self._queue_send(msg)
338 self._queue_send(msg)
339 return msg['header']['msg_id']
339 return msg['header']['msg_id']
340
340
341 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
341 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
342 """Get entries from the kernel's history list.
342 """Get entries from the kernel's history list.
343
343
344 Parameters
344 Parameters
345 ----------
345 ----------
346 raw : bool
346 raw : bool
347 If True, return the raw input.
347 If True, return the raw input.
348 output : bool
348 output : bool
349 If True, then return the output as well.
349 If True, then return the output as well.
350 hist_access_type : str
350 hist_access_type : str
351 'range' (fill in session, start and stop params), 'tail' (fill in n)
351 'range' (fill in session, start and stop params), 'tail' (fill in n)
352 or 'search' (fill in pattern param).
352 or 'search' (fill in pattern param).
353
353
354 session : int
354 session : int
355 For a range request, the session from which to get lines. Session
355 For a range request, the session from which to get lines. Session
356 numbers are positive integers; negative ones count back from the
356 numbers are positive integers; negative ones count back from the
357 current session.
357 current session.
358 start : int
358 start : int
359 The first line number of a history range.
359 The first line number of a history range.
360 stop : int
360 stop : int
361 The final (excluded) line number of a history range.
361 The final (excluded) line number of a history range.
362
362
363 n : int
363 n : int
364 The number of lines of history to get for a tail request.
364 The number of lines of history to get for a tail request.
365
365
366 pattern : str
366 pattern : str
367 The glob-syntax pattern for a search request.
367 The glob-syntax pattern for a search request.
368
368
369 Returns
369 Returns
370 -------
370 -------
371 The msg_id of the message sent.
371 The msg_id of the message sent.
372 """
372 """
373 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
373 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
374 **kwargs)
374 **kwargs)
375 msg = self.session.msg('history_request', content)
375 msg = self.session.msg('history_request', content)
376 self._queue_send(msg)
376 self._queue_send(msg)
377 return msg['header']['msg_id']
377 return msg['header']['msg_id']
378
378
379 def kernel_info(self):
379 def kernel_info(self):
380 """Request kernel info."""
380 """Request kernel info."""
381 msg = self.session.msg('kernel_info_request')
381 msg = self.session.msg('kernel_info_request')
382 self._queue_send(msg)
382 self._queue_send(msg)
383 return msg['header']['msg_id']
383 return msg['header']['msg_id']
384
384
385 def shutdown(self, restart=False):
385 def shutdown(self, restart=False):
386 """Request an immediate kernel shutdown.
386 """Request an immediate kernel shutdown.
387
387
388 Upon receipt of the (empty) reply, client code can safely assume that
388 Upon receipt of the (empty) reply, client code can safely assume that
389 the kernel has shut down and it's safe to forcefully terminate it if
389 the kernel has shut down and it's safe to forcefully terminate it if
390 it's still alive.
390 it's still alive.
391
391
392 The kernel will send the reply via a function registered with Python's
392 The kernel will send the reply via a function registered with Python's
393 atexit module, ensuring it's truly done as the kernel is done with all
393 atexit module, ensuring it's truly done as the kernel is done with all
394 normal operation.
394 normal operation.
395 """
395 """
396 # Send quit message to kernel. Once we implement kernel-side setattr,
396 # Send quit message to kernel. Once we implement kernel-side setattr,
397 # this should probably be done that way, but for now this will do.
397 # this should probably be done that way, but for now this will do.
398 msg = self.session.msg('shutdown_request', {'restart':restart})
398 msg = self.session.msg('shutdown_request', {'restart':restart})
399 self._queue_send(msg)
399 self._queue_send(msg)
400 return msg['header']['msg_id']
400 return msg['header']['msg_id']
401
401
402
402
403
403
404 class IOPubChannel(ZMQSocketChannel):
404 class IOPubChannel(ZMQSocketChannel):
405 """The iopub channel which listens for messages that the kernel publishes.
405 """The iopub channel which listens for messages that the kernel publishes.
406
406
407 This channel is where all output is published to frontends.
407 This channel is where all output is published to frontends.
408 """
408 """
409
409
410 def __init__(self, context, session, address):
410 def __init__(self, context, session, address):
411 super(IOPubChannel, self).__init__(context, session, address)
411 super(IOPubChannel, self).__init__(context, session, address)
412 self.ioloop = ioloop.IOLoop()
412 self.ioloop = ioloop.IOLoop()
413
413
414 def run(self):
414 def run(self):
415 """The thread's main activity. Call start() instead."""
415 """The thread's main activity. Call start() instead."""
416 self.socket = self.context.socket(zmq.SUB)
416 self.socket = self.context.socket(zmq.SUB)
417 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
417 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
418 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
418 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
419 self.socket.connect(self.address)
419 self.socket.connect(self.address)
420 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
420 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
421 self.stream.on_recv(self._handle_recv)
421 self.stream.on_recv(self._handle_recv)
422 self._run_loop()
422 self._run_loop()
423 try:
423 try:
424 self.socket.close()
424 self.socket.close()
425 except:
425 except:
426 pass
426 pass
427
427
428 def stop(self):
428 def stop(self):
429 """Stop the channel's event loop and join its thread."""
429 """Stop the channel's event loop and join its thread."""
430 self.ioloop.stop()
430 self.ioloop.stop()
431 super(IOPubChannel, self).stop()
431 super(IOPubChannel, self).stop()
432
432
433 def call_handlers(self, msg):
433 def call_handlers(self, msg):
434 """This method is called in the ioloop thread when a message arrives.
434 """This method is called in the ioloop thread when a message arrives.
435
435
436 Subclasses should override this method to handle incoming messages.
436 Subclasses should override this method to handle incoming messages.
437 It is important to remember that this method is called in the thread
437 It is important to remember that this method is called in the thread
438 so that some logic must be done to ensure that the application leve
438 so that some logic must be done to ensure that the application leve
439 handlers are called in the application thread.
439 handlers are called in the application thread.
440 """
440 """
441 raise NotImplementedError('call_handlers must be defined in a subclass.')
441 raise NotImplementedError('call_handlers must be defined in a subclass.')
442
442
443 def flush(self, timeout=1.0):
443 def flush(self, timeout=1.0):
444 """Immediately processes all pending messages on the iopub channel.
444 """Immediately processes all pending messages on the iopub channel.
445
445
446 Callers should use this method to ensure that :method:`call_handlers`
446 Callers should use this method to ensure that :method:`call_handlers`
447 has been called for all messages that have been received on the
447 has been called for all messages that have been received on the
448 0MQ SUB socket of this channel.
448 0MQ SUB socket of this channel.
449
449
450 This method is thread safe.
450 This method is thread safe.
451
451
452 Parameters
452 Parameters
453 ----------
453 ----------
454 timeout : float, optional
454 timeout : float, optional
455 The maximum amount of time to spend flushing, in seconds. The
455 The maximum amount of time to spend flushing, in seconds. The
456 default is one second.
456 default is one second.
457 """
457 """
458 # We do the IOLoop callback process twice to ensure that the IOLoop
458 # We do the IOLoop callback process twice to ensure that the IOLoop
459 # gets to perform at least one full poll.
459 # gets to perform at least one full poll.
460 stop_time = time.time() + timeout
460 stop_time = time.time() + timeout
461 for i in xrange(2):
461 for i in xrange(2):
462 self._flushed = False
462 self._flushed = False
463 self.ioloop.add_callback(self._flush)
463 self.ioloop.add_callback(self._flush)
464 while not self._flushed and time.time() < stop_time:
464 while not self._flushed and time.time() < stop_time:
465 time.sleep(0.01)
465 time.sleep(0.01)
466
466
467 def _flush(self):
467 def _flush(self):
468 """Callback for :method:`self.flush`."""
468 """Callback for :method:`self.flush`."""
469 self.stream.flush()
469 self.stream.flush()
470 self._flushed = True
470 self._flushed = True
471
471
472
472
473 class StdInChannel(ZMQSocketChannel):
473 class StdInChannel(ZMQSocketChannel):
474 """The stdin channel to handle raw_input requests that the kernel makes."""
474 """The stdin channel to handle raw_input requests that the kernel makes."""
475
475
476 msg_queue = None
476 msg_queue = None
477
477
478 def __init__(self, context, session, address):
478 def __init__(self, context, session, address):
479 super(StdInChannel, self).__init__(context, session, address)
479 super(StdInChannel, self).__init__(context, session, address)
480 self.ioloop = ioloop.IOLoop()
480 self.ioloop = ioloop.IOLoop()
481
481
482 def run(self):
482 def run(self):
483 """The thread's main activity. Call start() instead."""
483 """The thread's main activity. Call start() instead."""
484 self.socket = self.context.socket(zmq.DEALER)
484 self.socket = self.context.socket(zmq.DEALER)
485 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
485 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
486 self.socket.connect(self.address)
486 self.socket.connect(self.address)
487 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
487 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
488 self.stream.on_recv(self._handle_recv)
488 self.stream.on_recv(self._handle_recv)
489 self._run_loop()
489 self._run_loop()
490 try:
490 try:
491 self.socket.close()
491 self.socket.close()
492 except:
492 except:
493 pass
493 pass
494
494
495 def stop(self):
495 def stop(self):
496 """Stop the channel's event loop and join its thread."""
496 """Stop the channel's event loop and join its thread."""
497 self.ioloop.stop()
497 self.ioloop.stop()
498 super(StdInChannel, self).stop()
498 super(StdInChannel, self).stop()
499
499
500 def call_handlers(self, msg):
500 def call_handlers(self, msg):
501 """This method is called in the ioloop thread when a message arrives.
501 """This method is called in the ioloop thread when a message arrives.
502
502
503 Subclasses should override this method to handle incoming messages.
503 Subclasses should override this method to handle incoming messages.
504 It is important to remember that this method is called in the thread
504 It is important to remember that this method is called in the thread
505 so that some logic must be done to ensure that the application leve
505 so that some logic must be done to ensure that the application leve
506 handlers are called in the application thread.
506 handlers are called in the application thread.
507 """
507 """
508 raise NotImplementedError('call_handlers must be defined in a subclass.')
508 raise NotImplementedError('call_handlers must be defined in a subclass.')
509
509
510 def input(self, string):
510 def input(self, string):
511 """Send a string of raw input to the kernel."""
511 """Send a string of raw input to the kernel."""
512 content = dict(value=string)
512 content = dict(value=string)
513 msg = self.session.msg('input_reply', content)
513 msg = self.session.msg('input_reply', content)
514 self._queue_send(msg)
514 self._queue_send(msg)
515
515
516
516
517 class HBChannel(ZMQSocketChannel):
517 class HBChannel(ZMQSocketChannel):
518 """The heartbeat channel which monitors the kernel heartbeat.
518 """The heartbeat channel which monitors the kernel heartbeat.
519
519
520 Note that the heartbeat channel is paused by default. As long as you start
520 Note that the heartbeat channel is paused by default. As long as you start
521 this channel, the kernel manager will ensure that it is paused and un-paused
521 this channel, the kernel manager will ensure that it is paused and un-paused
522 as appropriate.
522 as appropriate.
523 """
523 """
524
524
525 time_to_dead = 3.0
525 time_to_dead = 3.0
526 socket = None
526 socket = None
527 poller = None
527 poller = None
528 _running = None
528 _running = None
529 _pause = None
529 _pause = None
530 _beating = None
530 _beating = None
531
531
532 def __init__(self, context, session, address):
532 def __init__(self, context, session, address):
533 super(HBChannel, self).__init__(context, session, address)
533 super(HBChannel, self).__init__(context, session, address)
534 self._running = False
534 self._running = False
535 self._pause =True
535 self._pause =True
536 self.poller = zmq.Poller()
536 self.poller = zmq.Poller()
537
537
538 def _create_socket(self):
538 def _create_socket(self):
539 if self.socket is not None:
539 if self.socket is not None:
540 # close previous socket, before opening a new one
540 # close previous socket, before opening a new one
541 self.poller.unregister(self.socket)
541 self.poller.unregister(self.socket)
542 self.socket.close()
542 self.socket.close()
543 self.socket = self.context.socket(zmq.REQ)
543 self.socket = self.context.socket(zmq.REQ)
544 self.socket.setsockopt(zmq.LINGER, 0)
544 self.socket.setsockopt(zmq.LINGER, 0)
545 self.socket.connect(self.address)
545 self.socket.connect(self.address)
546
546
547 self.poller.register(self.socket, zmq.POLLIN)
547 self.poller.register(self.socket, zmq.POLLIN)
548
548
549 def _poll(self, start_time):
549 def _poll(self, start_time):
550 """poll for heartbeat replies until we reach self.time_to_dead.
550 """poll for heartbeat replies until we reach self.time_to_dead.
551
551
552 Ignores interrupts, and returns the result of poll(), which
552 Ignores interrupts, and returns the result of poll(), which
553 will be an empty list if no messages arrived before the timeout,
553 will be an empty list if no messages arrived before the timeout,
554 or the event tuple if there is a message to receive.
554 or the event tuple if there is a message to receive.
555 """
555 """
556
556
557 until_dead = self.time_to_dead - (time.time() - start_time)
557 until_dead = self.time_to_dead - (time.time() - start_time)
558 # ensure poll at least once
558 # ensure poll at least once
559 until_dead = max(until_dead, 1e-3)
559 until_dead = max(until_dead, 1e-3)
560 events = []
560 events = []
561 while True:
561 while True:
562 try:
562 try:
563 events = self.poller.poll(1000 * until_dead)
563 events = self.poller.poll(1000 * until_dead)
564 except ZMQError as e:
564 except ZMQError as e:
565 if e.errno == errno.EINTR:
565 if e.errno == errno.EINTR:
566 # ignore interrupts during heartbeat
566 # ignore interrupts during heartbeat
567 # this may never actually happen
567 # this may never actually happen
568 until_dead = self.time_to_dead - (time.time() - start_time)
568 until_dead = self.time_to_dead - (time.time() - start_time)
569 until_dead = max(until_dead, 1e-3)
569 until_dead = max(until_dead, 1e-3)
570 pass
570 pass
571 else:
571 else:
572 raise
572 raise
573 except Exception:
573 except Exception:
574 if self._exiting:
574 if self._exiting:
575 break
575 break
576 else:
576 else:
577 raise
577 raise
578 else:
578 else:
579 break
579 break
580 return events
580 return events
581
581
582 def run(self):
582 def run(self):
583 """The thread's main activity. Call start() instead."""
583 """The thread's main activity. Call start() instead."""
584 self._create_socket()
584 self._create_socket()
585 self._running = True
585 self._running = True
586 self._beating = True
586 self._beating = True
587
587
588 while self._running:
588 while self._running:
589 if self._pause:
589 if self._pause:
590 # just sleep, and skip the rest of the loop
590 # just sleep, and skip the rest of the loop
591 time.sleep(self.time_to_dead)
591 time.sleep(self.time_to_dead)
592 continue
592 continue
593
593
594 since_last_heartbeat = 0.0
594 since_last_heartbeat = 0.0
595 # io.rprint('Ping from HB channel') # dbg
595 # io.rprint('Ping from HB channel') # dbg
596 # no need to catch EFSM here, because the previous event was
596 # no need to catch EFSM here, because the previous event was
597 # either a recv or connect, which cannot be followed by EFSM
597 # either a recv or connect, which cannot be followed by EFSM
598 self.socket.send(b'ping')
598 self.socket.send(b'ping')
599 request_time = time.time()
599 request_time = time.time()
600 ready = self._poll(request_time)
600 ready = self._poll(request_time)
601 if ready:
601 if ready:
602 self._beating = True
602 self._beating = True
603 # the poll above guarantees we have something to recv
603 # the poll above guarantees we have something to recv
604 self.socket.recv()
604 self.socket.recv()
605 # sleep the remainder of the cycle
605 # sleep the remainder of the cycle
606 remainder = self.time_to_dead - (time.time() - request_time)
606 remainder = self.time_to_dead - (time.time() - request_time)
607 if remainder > 0:
607 if remainder > 0:
608 time.sleep(remainder)
608 time.sleep(remainder)
609 continue
609 continue
610 else:
610 else:
611 # nothing was received within the time limit, signal heart failure
611 # nothing was received within the time limit, signal heart failure
612 self._beating = False
612 self._beating = False
613 since_last_heartbeat = time.time() - request_time
613 since_last_heartbeat = time.time() - request_time
614 self.call_handlers(since_last_heartbeat)
614 self.call_handlers(since_last_heartbeat)
615 # and close/reopen the socket, because the REQ/REP cycle has been broken
615 # and close/reopen the socket, because the REQ/REP cycle has been broken
616 self._create_socket()
616 self._create_socket()
617 continue
617 continue
618 try:
618 try:
619 self.socket.close()
619 self.socket.close()
620 except:
620 except:
621 pass
621 pass
622
622
623 def pause(self):
623 def pause(self):
624 """Pause the heartbeat."""
624 """Pause the heartbeat."""
625 self._pause = True
625 self._pause = True
626
626
627 def unpause(self):
627 def unpause(self):
628 """Unpause the heartbeat."""
628 """Unpause the heartbeat."""
629 self._pause = False
629 self._pause = False
630
630
631 def is_beating(self):
631 def is_beating(self):
632 """Is the heartbeat running and responsive (and not paused)."""
632 """Is the heartbeat running and responsive (and not paused)."""
633 if self.is_alive() and not self._pause and self._beating:
633 if self.is_alive() and not self._pause and self._beating:
634 return True
634 return True
635 else:
635 else:
636 return False
636 return False
637
637
638 def stop(self):
638 def stop(self):
639 """Stop the channel's event loop and join its thread."""
639 """Stop the channel's event loop and join its thread."""
640 self._running = False
640 self._running = False
641 super(HBChannel, self).stop()
641 super(HBChannel, self).stop()
642
642
643 def call_handlers(self, since_last_heartbeat):
643 def call_handlers(self, since_last_heartbeat):
644 """This method is called in the ioloop thread when a message arrives.
644 """This method is called in the ioloop thread when a message arrives.
645
645
646 Subclasses should override this method to handle incoming messages.
646 Subclasses should override this method to handle incoming messages.
647 It is important to remember that this method is called in the thread
647 It is important to remember that this method is called in the thread
648 so that some logic must be done to ensure that the application level
648 so that some logic must be done to ensure that the application level
649 handlers are called in the application thread.
649 handlers are called in the application thread.
650 """
650 """
651 raise NotImplementedError('call_handlers must be defined in a subclass.')
651 raise NotImplementedError('call_handlers must be defined in a subclass.')
652
652
653
653
654 #-----------------------------------------------------------------------------
654 #-----------------------------------------------------------------------------
655 # Main kernel manager class
655 # Main kernel manager class
656 #-----------------------------------------------------------------------------
656 #-----------------------------------------------------------------------------
657
657
658 class KernelManager(Configurable):
658 class KernelManager(Configurable):
659 """Manages a single kernel on this host along with its channels.
659 """Manages a single kernel on this host along with its channels.
660
660
661 There are four channels associated with each kernel:
661 There are four channels associated with each kernel:
662
662
663 * shell: for request/reply calls to the kernel.
663 * shell: for request/reply calls to the kernel.
664 * iopub: for the kernel to publish results to frontends.
664 * iopub: for the kernel to publish results to frontends.
665 * hb: for monitoring the kernel's heartbeat.
665 * hb: for monitoring the kernel's heartbeat.
666 * stdin: for frontends to reply to raw_input calls in the kernel.
666 * stdin: for frontends to reply to raw_input calls in the kernel.
667
667
668 The usage of the channels that this class manages is optional. It is
668 The usage of the channels that this class manages is optional. It is
669 entirely possible to connect to the kernels directly using ZeroMQ
669 entirely possible to connect to the kernels directly using ZeroMQ
670 sockets. These channels are useful primarily for talking to a kernel
670 sockets. These channels are useful primarily for talking to a kernel
671 whose :class:`KernelManager` is in the same process.
671 whose :class:`KernelManager` is in the same process.
672
672
673 This version manages kernels started using Popen.
673 This version manages kernels started using Popen.
674 """
674 """
675 # The PyZMQ Context to use for communication with the kernel.
675 # The PyZMQ Context to use for communication with the kernel.
676 context = Instance(zmq.Context)
676 context = Instance(zmq.Context)
677 def _context_default(self):
677 def _context_default(self):
678 return zmq.Context.instance()
678 return zmq.Context.instance()
679
679
680 # The Session to use for communication with the kernel.
680 # The Session to use for communication with the kernel.
681 session = Instance(Session)
681 session = Instance(Session)
682 def _session_default(self):
682 def _session_default(self):
683 return Session(config=self.config)
683 return Session(config=self.config)
684
684
685 # The kernel process with which the KernelManager is communicating.
685 # The kernel process with which the KernelManager is communicating.
686 # generally a Popen instance
686 # generally a Popen instance
687 kernel = Any()
687 kernel = Any()
688
688
689 kernel_cmd = List(Unicode, config=True,
689 kernel_cmd = List(Unicode, config=True,
690 help="""The Popen Command to launch the kernel.
690 help="""The Popen Command to launch the kernel.
691 Override this if you have a custom
691 Override this if you have a custom
692 """
692 """
693 )
693 )
694 def _kernel_cmd_changed(self, name, old, new):
694 def _kernel_cmd_changed(self, name, old, new):
695 self.ipython_kernel = False
695 self.ipython_kernel = False
696
696
697 ipython_kernel = Bool(True)
697 ipython_kernel = Bool(True)
698
698
699
699
700 # The addresses for the communication channels.
700 # The addresses for the communication channels.
701 connection_file = Unicode('')
701 connection_file = Unicode('')
702
702
703 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
703 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
704
704
705 ip = Unicode(LOCALHOST, config=True,
705 ip = Unicode(LOCALHOST, config=True,
706 help="""Set the kernel\'s IP address [default localhost].
706 help="""Set the kernel\'s IP address [default localhost].
707 If the IP address is something other than localhost, then
707 If the IP address is something other than localhost, then
708 Consoles on other machines will be able to connect
708 Consoles on other machines will be able to connect
709 to the Kernel, so be careful!"""
709 to the Kernel, so be careful!"""
710 )
710 )
711 def _ip_default(self):
711 def _ip_default(self):
712 if self.transport == 'ipc':
712 if self.transport == 'ipc':
713 if self.connection_file:
713 if self.connection_file:
714 return os.path.splitext(self.connection_file)[0] + '-ipc'
714 return os.path.splitext(self.connection_file)[0] + '-ipc'
715 else:
715 else:
716 return 'kernel-ipc'
716 return 'kernel-ipc'
717 else:
717 else:
718 return LOCALHOST
718 return LOCALHOST
719 def _ip_changed(self, name, old, new):
719 def _ip_changed(self, name, old, new):
720 if new == '*':
720 if new == '*':
721 self.ip = '0.0.0.0'
721 self.ip = '0.0.0.0'
722 shell_port = Integer(0)
722 shell_port = Integer(0)
723 iopub_port = Integer(0)
723 iopub_port = Integer(0)
724 stdin_port = Integer(0)
724 stdin_port = Integer(0)
725 hb_port = Integer(0)
725 hb_port = Integer(0)
726
726
727 # The classes to use for the various channels.
727 # The classes to use for the various channels.
728 shell_channel_class = Type(ShellChannel)
728 shell_channel_class = Type(ShellChannel)
729 iopub_channel_class = Type(IOPubChannel)
729 iopub_channel_class = Type(IOPubChannel)
730 stdin_channel_class = Type(StdInChannel)
730 stdin_channel_class = Type(StdInChannel)
731 hb_channel_class = Type(HBChannel)
731 hb_channel_class = Type(HBChannel)
732
732
733 # Protected traits.
733 # Protected traits.
734 _launch_args = Any
734 _launch_args = Any
735 _shell_channel = Any
735 _shell_channel = Any
736 _iopub_channel = Any
736 _iopub_channel = Any
737 _stdin_channel = Any
737 _stdin_channel = Any
738 _hb_channel = Any
738 _hb_channel = Any
739 _connection_file_written=Bool(False)
739 _connection_file_written=Bool(False)
740
740
741 def __del__(self):
741 def __del__(self):
742 self.cleanup_connection_file()
742 self.cleanup_connection_file()
743
743
744 #--------------------------------------------------------------------------
744 #--------------------------------------------------------------------------
745 # Channel management methods:
745 # Channel management methods:
746 #--------------------------------------------------------------------------
746 #--------------------------------------------------------------------------
747
747
748 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
748 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
749 """Starts the channels for this kernel.
749 """Starts the channels for this kernel.
750
750
751 This will create the channels if they do not exist and then start
751 This will create the channels if they do not exist and then start
752 them (their activity runs in a thread). If port numbers of 0 are
752 them (their activity runs in a thread). If port numbers of 0 are
753 being used (random ports) then you must first call
753 being used (random ports) then you must first call
754 :method:`start_kernel`. If the channels have been stopped and you
754 :method:`start_kernel`. If the channels have been stopped and you
755 call this, :class:`RuntimeError` will be raised.
755 call this, :class:`RuntimeError` will be raised.
756 """
756 """
757 if shell:
757 if shell:
758 self.shell_channel.start()
758 self.shell_channel.start()
759 if iopub:
759 if iopub:
760 self.iopub_channel.start()
760 self.iopub_channel.start()
761 if stdin:
761 if stdin:
762 self.stdin_channel.start()
762 self.stdin_channel.start()
763 self.shell_channel.allow_stdin = True
763 self.shell_channel.allow_stdin = True
764 else:
764 else:
765 self.shell_channel.allow_stdin = False
765 self.shell_channel.allow_stdin = False
766 if hb:
766 if hb:
767 self.hb_channel.start()
767 self.hb_channel.start()
768
768
769 def stop_channels(self):
769 def stop_channels(self):
770 """Stops all the running channels for this kernel.
770 """Stops all the running channels for this kernel.
771
771
772 This stops their event loops and joins their threads.
772 This stops their event loops and joins their threads.
773 """
773 """
774 if self.shell_channel.is_alive():
774 if self.shell_channel.is_alive():
775 self.shell_channel.stop()
775 self.shell_channel.stop()
776 if self.iopub_channel.is_alive():
776 if self.iopub_channel.is_alive():
777 self.iopub_channel.stop()
777 self.iopub_channel.stop()
778 if self.stdin_channel.is_alive():
778 if self.stdin_channel.is_alive():
779 self.stdin_channel.stop()
779 self.stdin_channel.stop()
780 if self.hb_channel.is_alive():
780 if self.hb_channel.is_alive():
781 self.hb_channel.stop()
781 self.hb_channel.stop()
782
782
783 @property
783 @property
784 def channels_running(self):
784 def channels_running(self):
785 """Are any of the channels created and running?"""
785 """Are any of the channels created and running?"""
786 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
786 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
788
788
789 def _make_url(self, port):
789 def _make_url(self, port):
790 """Make a zmq url with a port.
790 """Make a zmq url with a port.
791
791
792 There are two cases that this handles:
792 There are two cases that this handles:
793
793
794 * tcp: tcp://ip:port
794 * tcp: tcp://ip:port
795 * ipc: ipc://ip-port
795 * ipc: ipc://ip-port
796 """
796 """
797 if self.transport == 'tcp':
797 if self.transport == 'tcp':
798 return "tcp://%s:%i" % (self.ip, port)
798 return "tcp://%s:%i" % (self.ip, port)
799 else:
799 else:
800 return "%s://%s-%s" % (self.transport, self.ip, port)
800 return "%s://%s-%s" % (self.transport, self.ip, port)
801
801
802 @property
802 @property
803 def shell_channel(self):
803 def shell_channel(self):
804 """Get the shell channel object for this kernel."""
804 """Get the shell channel object for this kernel."""
805 if self._shell_channel is None:
805 if self._shell_channel is None:
806 self._shell_channel = self.shell_channel_class(
806 self._shell_channel = self.shell_channel_class(
807 self.context, self.session, self._make_url(self.shell_port)
807 self.context, self.session, self._make_url(self.shell_port)
808 )
808 )
809 return self._shell_channel
809 return self._shell_channel
810
810
811 @property
811 @property
812 def iopub_channel(self):
812 def iopub_channel(self):
813 """Get the iopub channel object for this kernel."""
813 """Get the iopub channel object for this kernel."""
814 if self._iopub_channel is None:
814 if self._iopub_channel is None:
815 self._iopub_channel = self.iopub_channel_class(
815 self._iopub_channel = self.iopub_channel_class(
816 self.context, self.session, self._make_url(self.iopub_port)
816 self.context, self.session, self._make_url(self.iopub_port)
817 )
817 )
818 return self._iopub_channel
818 return self._iopub_channel
819
819
820 @property
820 @property
821 def stdin_channel(self):
821 def stdin_channel(self):
822 """Get the stdin channel object for this kernel."""
822 """Get the stdin channel object for this kernel."""
823 if self._stdin_channel is None:
823 if self._stdin_channel is None:
824 self._stdin_channel = self.stdin_channel_class(
824 self._stdin_channel = self.stdin_channel_class(
825 self.context, self.session, self._make_url(self.stdin_port)
825 self.context, self.session, self._make_url(self.stdin_port)
826 )
826 )
827 return self._stdin_channel
827 return self._stdin_channel
828
828
829 @property
829 @property
830 def hb_channel(self):
830 def hb_channel(self):
831 """Get the hb channel object for this kernel."""
831 """Get the hb channel object for this kernel."""
832 if self._hb_channel is None:
832 if self._hb_channel is None:
833 self._hb_channel = self.hb_channel_class(
833 self._hb_channel = self.hb_channel_class(
834 self.context, self.session, self._make_url(self.hb_port)
834 self.context, self.session, self._make_url(self.hb_port)
835 )
835 )
836 return self._hb_channel
836 return self._hb_channel
837
837
838 #--------------------------------------------------------------------------
838 #--------------------------------------------------------------------------
839 # Connection and ipc file management
839 # Connection and ipc file management
840 #--------------------------------------------------------------------------
840 #--------------------------------------------------------------------------
841
841
842 def cleanup_connection_file(self):
842 def cleanup_connection_file(self):
843 """Cleanup connection file *if we wrote it*
843 """Cleanup connection file *if we wrote it*
844
844
845 Will not raise if the connection file was already removed somehow.
845 Will not raise if the connection file was already removed somehow.
846 """
846 """
847 if self._connection_file_written:
847 if self._connection_file_written:
848 # cleanup connection files on full shutdown of kernel we started
848 # cleanup connection files on full shutdown of kernel we started
849 self._connection_file_written = False
849 self._connection_file_written = False
850 try:
850 try:
851 os.remove(self.connection_file)
851 os.remove(self.connection_file)
852 except (IOError, OSError):
852 except (IOError, OSError):
853 pass
853 pass
854
854
855 def cleanup_ipc_files(self):
855 def cleanup_ipc_files(self):
856 """Cleanup ipc files if we wrote them."""
856 """Cleanup ipc files if we wrote them."""
857 if self.transport != 'ipc':
857 if self.transport != 'ipc':
858 return
858 return
859 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
859 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
860 ipcfile = "%s-%i" % (self.ip, port)
860 ipcfile = "%s-%i" % (self.ip, port)
861 try:
861 try:
862 os.remove(ipcfile)
862 os.remove(ipcfile)
863 except (IOError, OSError):
863 except (IOError, OSError):
864 pass
864 pass
865
865
866 def load_connection_file(self):
866 def load_connection_file(self):
867 """Load connection info from JSON dict in self.connection_file."""
867 """Load connection info from JSON dict in self.connection_file."""
868 with open(self.connection_file) as f:
868 with open(self.connection_file) as f:
869 cfg = json.loads(f.read())
869 cfg = json.loads(f.read())
870
870
871 from pprint import pprint
871 from pprint import pprint
872 pprint(cfg)
872 pprint(cfg)
873 self.transport = cfg.get('transport', 'tcp')
873 self.transport = cfg.get('transport', 'tcp')
874 self.ip = cfg['ip']
874 self.ip = cfg['ip']
875 self.shell_port = cfg['shell_port']
875 self.shell_port = cfg['shell_port']
876 self.stdin_port = cfg['stdin_port']
876 self.stdin_port = cfg['stdin_port']
877 self.iopub_port = cfg['iopub_port']
877 self.iopub_port = cfg['iopub_port']
878 self.hb_port = cfg['hb_port']
878 self.hb_port = cfg['hb_port']
879 self.session.key = str_to_bytes(cfg['key'])
879 self.session.key = str_to_bytes(cfg['key'])
880
880
881 def write_connection_file(self):
881 def write_connection_file(self):
882 """Write connection info to JSON dict in self.connection_file."""
882 """Write connection info to JSON dict in self.connection_file."""
883 if self._connection_file_written:
883 if self._connection_file_written:
884 return
884 return
885 self.connection_file,cfg = write_connection_file(self.connection_file,
885 self.connection_file,cfg = write_connection_file(self.connection_file,
886 transport=self.transport, ip=self.ip, key=self.session.key,
886 transport=self.transport, ip=self.ip, key=self.session.key,
887 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
887 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
888 shell_port=self.shell_port, hb_port=self.hb_port)
888 shell_port=self.shell_port, hb_port=self.hb_port)
889 # write_connection_file also sets default ports:
889 # write_connection_file also sets default ports:
890 self.shell_port = cfg['shell_port']
890 self.shell_port = cfg['shell_port']
891 self.stdin_port = cfg['stdin_port']
891 self.stdin_port = cfg['stdin_port']
892 self.iopub_port = cfg['iopub_port']
892 self.iopub_port = cfg['iopub_port']
893 self.hb_port = cfg['hb_port']
893 self.hb_port = cfg['hb_port']
894
894
895 self._connection_file_written = True
895 self._connection_file_written = True
896
896
897 #--------------------------------------------------------------------------
897 #--------------------------------------------------------------------------
898 # Kernel management
898 # Kernel management
899 #--------------------------------------------------------------------------
899 #--------------------------------------------------------------------------
900
900
901 def format_kernel_cmd(self, **kw):
901 def format_kernel_cmd(self, **kw):
902 """format templated args (e.g. {connection_file})"""
902 """format templated args (e.g. {connection_file})"""
903 if self.kernel_cmd:
903 if self.kernel_cmd:
904 cmd = self.kernel_cmd
904 cmd = self.kernel_cmd
905 else:
905 else:
906 cmd = make_ipkernel_cmd(
906 cmd = make_ipkernel_cmd(
907 'from IPython.kernel.zmq.kernelapp import main; main()',
907 'from IPython.kernel.zmq.kernelapp import main; main()',
908 **kw
908 **kw
909 )
909 )
910 ns = dict(connection_file=self.connection_file)
910 ns = dict(connection_file=self.connection_file)
911 ns.update(self._launch_args)
911 ns.update(self._launch_args)
912 return [ c.format(**ns) for c in cmd ]
912 return [ c.format(**ns) for c in cmd ]
913
913
914 def _launch_kernel(self, kernel_cmd, **kw):
914 def _launch_kernel(self, kernel_cmd, **kw):
915 """actually launch the kernel
915 """actually launch the kernel
916
916
917 override in a subclass to launch kernel subprocesses differently
917 override in a subclass to launch kernel subprocesses differently
918 """
918 """
919 return launch_kernel(kernel_cmd, **kw)
919 return launch_kernel(kernel_cmd, **kw)
920
920
921 def start_kernel(self, **kw):
921 def start_kernel(self, **kw):
922 """Starts a kernel on this host in a separate process.
922 """Starts a kernel on this host in a separate process.
923
923
924 If random ports (port=0) are being used, this method must be called
924 If random ports (port=0) are being used, this method must be called
925 before the channels are created.
925 before the channels are created.
926
926
927 Parameters:
927 Parameters:
928 -----------
928 -----------
929 **kw : optional
929 **kw : optional
930 keyword arguments that are passed down to build the kernel_cmd
930 keyword arguments that are passed down to build the kernel_cmd
931 and launching the kernel (e.g. Popen kwargs).
931 and launching the kernel (e.g. Popen kwargs).
932 """
932 """
933 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
933 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
934 raise RuntimeError("Can only launch a kernel on a local interface. "
934 raise RuntimeError("Can only launch a kernel on a local interface. "
935 "Make sure that the '*_address' attributes are "
935 "Make sure that the '*_address' attributes are "
936 "configured properly. "
936 "configured properly. "
937 "Currently valid addresses are: %s"%LOCAL_IPS
937 "Currently valid addresses are: %s"%LOCAL_IPS
938 )
938 )
939
939
940 # write connection file / get default ports
940 # write connection file / get default ports
941 self.write_connection_file()
941 self.write_connection_file()
942
942
943 # save kwargs for use in restart
943 # save kwargs for use in restart
944 self._launch_args = kw.copy()
944 self._launch_args = kw.copy()
945 # build the Popen cmd
945 # build the Popen cmd
946 kernel_cmd = self.format_kernel_cmd(**kw)
946 kernel_cmd = self.format_kernel_cmd(**kw)
947 # launch the kernel subprocess
947 # launch the kernel subprocess
948 self.kernel = self._launch_kernel(kernel_cmd,
948 self.kernel = self._launch_kernel(kernel_cmd,
949 ipython_kernel=self.ipython_kernel,
949 ipython_kernel=self.ipython_kernel,
950 **kw)
950 **kw)
951
951
952 def shutdown_kernel(self, now=False, restart=False):
952 def shutdown_kernel(self, now=False, restart=False):
953 """Attempts to the stop the kernel process cleanly.
953 """Attempts to the stop the kernel process cleanly.
954
954
955 This attempts to shutdown the kernels cleanly by:
955 This attempts to shutdown the kernels cleanly by:
956
956
957 1. Sending it a shutdown message over the shell channel.
957 1. Sending it a shutdown message over the shell channel.
958 2. If that fails, the kernel is shutdown forcibly by sending it
958 2. If that fails, the kernel is shutdown forcibly by sending it
959 a signal.
959 a signal.
960
960
961 Parameters:
961 Parameters:
962 -----------
962 -----------
963 now : bool
963 now : bool
964 Should the kernel be forcible killed *now*. This skips the
964 Should the kernel be forcible killed *now*. This skips the
965 first, nice shutdown attempt.
965 first, nice shutdown attempt.
966 restart: bool
966 restart: bool
967 Will this kernel be restarted after it is shutdown. When this
967 Will this kernel be restarted after it is shutdown. When this
968 is True, connection files will not be cleaned up.
968 is True, connection files will not be cleaned up.
969 """
969 """
970 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
970 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
971 if sys.platform == 'win32':
971 if sys.platform == 'win32':
972 self._kill_kernel()
972 self._kill_kernel()
973 return
973 return
974
974
975 # Pause the heart beat channel if it exists.
975 # Pause the heart beat channel if it exists.
976 if self._hb_channel is not None:
976 if self._hb_channel is not None:
977 self._hb_channel.pause()
977 self._hb_channel.pause()
978
978
979 if now:
979 if now:
980 if self.has_kernel:
980 if self.has_kernel:
981 self._kill_kernel()
981 self._kill_kernel()
982 else:
982 else:
983 # Don't send any additional kernel kill messages immediately, to give
983 # Don't send any additional kernel kill messages immediately, to give
984 # the kernel a chance to properly execute shutdown actions. Wait for at
984 # the kernel a chance to properly execute shutdown actions. Wait for at
985 # most 1s, checking every 0.1s.
985 # most 1s, checking every 0.1s.
986 self.shell_channel.shutdown(restart=restart)
986 self.shell_channel.shutdown(restart=restart)
987 for i in range(10):
987 for i in range(10):
988 if self.is_alive:
988 if self.is_alive:
989 time.sleep(0.1)
989 time.sleep(0.1)
990 else:
990 else:
991 break
991 break
992 else:
992 else:
993 # OK, we've waited long enough.
993 # OK, we've waited long enough.
994 if self.has_kernel:
994 if self.has_kernel:
995 self._kill_kernel()
995 self._kill_kernel()
996
996
997 if not restart:
997 if not restart:
998 self.cleanup_connection_file()
998 self.cleanup_connection_file()
999 self.cleanup_ipc_files()
999 self.cleanup_ipc_files()
1000 else:
1000 else:
1001 self.cleanup_ipc_files()
1001 self.cleanup_ipc_files()
1002
1002
1003 def restart_kernel(self, now=False, **kw):
1003 def restart_kernel(self, now=False, **kw):
1004 """Restarts a kernel with the arguments that were used to launch it.
1004 """Restarts a kernel with the arguments that were used to launch it.
1005
1005
1006 If the old kernel was launched with random ports, the same ports will be
1006 If the old kernel was launched with random ports, the same ports will be
1007 used for the new kernel. The same connection file is used again.
1007 used for the new kernel. The same connection file is used again.
1008
1008
1009 Parameters
1009 Parameters
1010 ----------
1010 ----------
1011 now : bool, optional
1011 now : bool, optional
1012 If True, the kernel is forcefully restarted *immediately*, without
1012 If True, the kernel is forcefully restarted *immediately*, without
1013 having a chance to do any cleanup action. Otherwise the kernel is
1013 having a chance to do any cleanup action. Otherwise the kernel is
1014 given 1s to clean up before a forceful restart is issued.
1014 given 1s to clean up before a forceful restart is issued.
1015
1015
1016 In all cases the kernel is restarted, the only difference is whether
1016 In all cases the kernel is restarted, the only difference is whether
1017 it is given a chance to perform a clean shutdown or not.
1017 it is given a chance to perform a clean shutdown or not.
1018
1018
1019 **kw : optional
1019 **kw : optional
1020 Any options specified here will overwrite those used to launch the
1020 Any options specified here will overwrite those used to launch the
1021 kernel.
1021 kernel.
1022 """
1022 """
1023 if self._launch_args is None:
1023 if self._launch_args is None:
1024 raise RuntimeError("Cannot restart the kernel. "
1024 raise RuntimeError("Cannot restart the kernel. "
1025 "No previous call to 'start_kernel'.")
1025 "No previous call to 'start_kernel'.")
1026 else:
1026 else:
1027 # Stop currently running kernel.
1027 # Stop currently running kernel.
1028 self.shutdown_kernel(now=now, restart=True)
1028 self.shutdown_kernel(now=now, restart=True)
1029
1029
1030 # Start new kernel.
1030 # Start new kernel.
1031 self._launch_args.update(kw)
1031 self._launch_args.update(kw)
1032 self.start_kernel(**self._launch_args)
1032 self.start_kernel(**self._launch_args)
1033
1033
1034 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1034 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1035 # unless there is some delay here.
1035 # unless there is some delay here.
1036 if sys.platform == 'win32':
1036 if sys.platform == 'win32':
1037 time.sleep(0.2)
1037 time.sleep(0.2)
1038
1038
1039 @property
1039 @property
1040 def has_kernel(self):
1040 def has_kernel(self):
1041 """Has a kernel been started that we are managing."""
1041 """Has a kernel been started that we are managing."""
1042 return self.kernel is not None
1042 return self.kernel is not None
1043
1043
1044 def _kill_kernel(self):
1044 def _kill_kernel(self):
1045 """Kill the running kernel.
1045 """Kill the running kernel.
1046
1046
1047 This is a private method, callers should use shutdown_kernel(now=True).
1047 This is a private method, callers should use shutdown_kernel(now=True).
1048 """
1048 """
1049 if self.has_kernel:
1049 if self.has_kernel:
1050 # Pause the heart beat channel if it exists.
1050 # Pause the heart beat channel if it exists.
1051 if self._hb_channel is not None:
1051 if self._hb_channel is not None:
1052 self._hb_channel.pause()
1052 self._hb_channel.pause()
1053
1053
1054 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1054 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1055 # TerminateProcess() on Win32).
1055 # TerminateProcess() on Win32).
1056 try:
1056 try:
1057 self.kernel.kill()
1057 self.kernel.kill()
1058 except OSError as e:
1058 except OSError as e:
1059 # In Windows, we will get an Access Denied error if the process
1059 # In Windows, we will get an Access Denied error if the process
1060 # has already terminated. Ignore it.
1060 # has already terminated. Ignore it.
1061 if sys.platform == 'win32':
1061 if sys.platform == 'win32':
1062 if e.winerror != 5:
1062 if e.winerror != 5:
1063 raise
1063 raise
1064 # On Unix, we may get an ESRCH error if the process has already
1064 # On Unix, we may get an ESRCH error if the process has already
1065 # terminated. Ignore it.
1065 # terminated. Ignore it.
1066 else:
1066 else:
1067 from errno import ESRCH
1067 from errno import ESRCH
1068 if e.errno != ESRCH:
1068 if e.errno != ESRCH:
1069 raise
1069 raise
1070
1070
1071 # Block until the kernel terminates.
1071 # Block until the kernel terminates.
1072 self.kernel.wait()
1072 self.kernel.wait()
1073 self.kernel = None
1073 self.kernel = None
1074 else:
1074 else:
1075 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1075 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1076
1076
1077 def interrupt_kernel(self):
1077 def interrupt_kernel(self):
1078 """Interrupts the kernel by sending it a signal.
1078 """Interrupts the kernel by sending it a signal.
1079
1079
1080 Unlike ``signal_kernel``, this operation is well supported on all
1080 Unlike ``signal_kernel``, this operation is well supported on all
1081 platforms.
1081 platforms.
1082 """
1082 """
1083 if self.has_kernel:
1083 if self.has_kernel:
1084 if sys.platform == 'win32':
1084 if sys.platform == 'win32':
1085 from parentpoller import ParentPollerWindows as Poller
1085 from .zmq.parentpoller import ParentPollerWindows as Poller
1086 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1086 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1087 else:
1087 else:
1088 self.kernel.send_signal(signal.SIGINT)
1088 self.kernel.send_signal(signal.SIGINT)
1089 else:
1089 else:
1090 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1090 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1091
1091
1092 def signal_kernel(self, signum):
1092 def signal_kernel(self, signum):
1093 """Sends a signal to the kernel.
1093 """Sends a signal to the kernel.
1094
1094
1095 Note that since only SIGTERM is supported on Windows, this function is
1095 Note that since only SIGTERM is supported on Windows, this function is
1096 only useful on Unix systems.
1096 only useful on Unix systems.
1097 """
1097 """
1098 if self.has_kernel:
1098 if self.has_kernel:
1099 self.kernel.send_signal(signum)
1099 self.kernel.send_signal(signum)
1100 else:
1100 else:
1101 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1101 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1102
1102
1103 @property
1103 @property
1104 def is_alive(self):
1104 def is_alive(self):
1105 """Is the kernel process still running?"""
1105 """Is the kernel process still running?"""
1106 if self.has_kernel:
1106 if self.has_kernel:
1107 if self.kernel.poll() is None:
1107 if self.kernel.poll() is None:
1108 return True
1108 return True
1109 else:
1109 else:
1110 return False
1110 return False
1111 elif self._hb_channel is not None:
1111 elif self._hb_channel is not None:
1112 # We didn't start the kernel with this KernelManager so we
1112 # We didn't start the kernel with this KernelManager so we
1113 # use the heartbeat.
1113 # use the heartbeat.
1114 return self._hb_channel.is_beating()
1114 return self._hb_channel.is_beating()
1115 else:
1115 else:
1116 # no heartbeat and not local, we can't tell if it's running,
1116 # no heartbeat and not local, we can't tell if it's running,
1117 # so naively return True
1117 # so naively return True
1118 return True
1118 return True
1119
1119
1120
1120
1121 #-----------------------------------------------------------------------------
1121 #-----------------------------------------------------------------------------
1122 # ABC Registration
1122 # ABC Registration
1123 #-----------------------------------------------------------------------------
1123 #-----------------------------------------------------------------------------
1124
1124
1125 ShellChannelABC.register(ShellChannel)
1125 ShellChannelABC.register(ShellChannel)
1126 IOPubChannelABC.register(IOPubChannel)
1126 IOPubChannelABC.register(IOPubChannel)
1127 HBChannelABC.register(HBChannel)
1127 HBChannelABC.register(HBChannel)
1128 StdInChannelABC.register(StdInChannel)
1128 StdInChannelABC.register(StdInChannel)
1129 KernelManagerABC.register(KernelManager)
1129 KernelManagerABC.register(KernelManager)
1130
1130
General Comments 0
You need to be logged in to leave comments. Login now