##// END OF EJS Templates
remove debug print statement
MinRK -
Show More
@@ -1,1131 +1,1130 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import absolute_import
18 from __future__ import absolute_import
19
19
20 # Standard library imports
20 # Standard library imports
21 import atexit
21 import atexit
22 import errno
22 import errno
23 import json
23 import json
24 from subprocess import Popen
24 from subprocess import Popen
25 import os
25 import os
26 import signal
26 import signal
27 import sys
27 import sys
28 from threading import Thread
28 from threading import Thread
29 import time
29 import time
30
30
31 # System library imports
31 # System library imports
32 import zmq
32 import zmq
33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
34 # during garbage collection of threads at exit:
34 # during garbage collection of threads at exit:
35 from zmq import ZMQError
35 from zmq import ZMQError
36 from zmq.eventloop import ioloop, zmqstream
36 from zmq.eventloop import ioloop, zmqstream
37
37
38 # Local imports
38 # Local imports
39 from IPython.config.configurable import Configurable
39 from IPython.config.configurable import Configurable
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 from IPython.utils.traitlets import (
41 from IPython.utils.traitlets import (
42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
43 )
43 )
44 from IPython.utils.py3compat import str_to_bytes
44 from IPython.utils.py3compat import str_to_bytes
45 from IPython.kernel import (
45 from IPython.kernel import (
46 write_connection_file,
46 write_connection_file,
47 make_ipkernel_cmd,
47 make_ipkernel_cmd,
48 launch_kernel,
48 launch_kernel,
49 )
49 )
50 from .zmq.session import Session
50 from .zmq.session import Session
51 from .kernelmanagerabc import (
51 from .kernelmanagerabc import (
52 ShellChannelABC, IOPubChannelABC,
52 ShellChannelABC, IOPubChannelABC,
53 HBChannelABC, StdInChannelABC,
53 HBChannelABC, StdInChannelABC,
54 KernelManagerABC
54 KernelManagerABC
55 )
55 )
56
56
57
57
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59 # Constants and exceptions
59 # Constants and exceptions
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61
61
62 class InvalidPortNumber(Exception):
62 class InvalidPortNumber(Exception):
63 pass
63 pass
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # Utility functions
66 # Utility functions
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69 # some utilities to validate message structure, these might get moved elsewhere
69 # some utilities to validate message structure, these might get moved elsewhere
70 # if they prove to have more generic utility
70 # if they prove to have more generic utility
71
71
72 def validate_string_list(lst):
72 def validate_string_list(lst):
73 """Validate that the input is a list of strings.
73 """Validate that the input is a list of strings.
74
74
75 Raises ValueError if not."""
75 Raises ValueError if not."""
76 if not isinstance(lst, list):
76 if not isinstance(lst, list):
77 raise ValueError('input %r must be a list' % lst)
77 raise ValueError('input %r must be a list' % lst)
78 for x in lst:
78 for x in lst:
79 if not isinstance(x, basestring):
79 if not isinstance(x, basestring):
80 raise ValueError('element %r in list must be a string' % x)
80 raise ValueError('element %r in list must be a string' % x)
81
81
82
82
83 def validate_string_dict(dct):
83 def validate_string_dict(dct):
84 """Validate that the input is a dict with string keys and values.
84 """Validate that the input is a dict with string keys and values.
85
85
86 Raises ValueError if not."""
86 Raises ValueError if not."""
87 for k,v in dct.iteritems():
87 for k,v in dct.iteritems():
88 if not isinstance(k, basestring):
88 if not isinstance(k, basestring):
89 raise ValueError('key %r in dict must be a string' % k)
89 raise ValueError('key %r in dict must be a string' % k)
90 if not isinstance(v, basestring):
90 if not isinstance(v, basestring):
91 raise ValueError('value %r in dict must be a string' % v)
91 raise ValueError('value %r in dict must be a string' % v)
92
92
93
93
94 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
95 # ZMQ Socket Channel classes
95 # ZMQ Socket Channel classes
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97
97
98 class ZMQSocketChannel(Thread):
98 class ZMQSocketChannel(Thread):
99 """The base class for the channels that use ZMQ sockets."""
99 """The base class for the channels that use ZMQ sockets."""
100 context = None
100 context = None
101 session = None
101 session = None
102 socket = None
102 socket = None
103 ioloop = None
103 ioloop = None
104 stream = None
104 stream = None
105 _address = None
105 _address = None
106 _exiting = False
106 _exiting = False
107
107
108 def __init__(self, context, session, address):
108 def __init__(self, context, session, address):
109 """Create a channel.
109 """Create a channel.
110
110
111 Parameters
111 Parameters
112 ----------
112 ----------
113 context : :class:`zmq.Context`
113 context : :class:`zmq.Context`
114 The ZMQ context to use.
114 The ZMQ context to use.
115 session : :class:`session.Session`
115 session : :class:`session.Session`
116 The session to use.
116 The session to use.
117 address : zmq url
117 address : zmq url
118 Standard (ip, port) tuple that the kernel is listening on.
118 Standard (ip, port) tuple that the kernel is listening on.
119 """
119 """
120 super(ZMQSocketChannel, self).__init__()
120 super(ZMQSocketChannel, self).__init__()
121 self.daemon = True
121 self.daemon = True
122
122
123 self.context = context
123 self.context = context
124 self.session = session
124 self.session = session
125 if isinstance(address, tuple):
125 if isinstance(address, tuple):
126 if address[1] == 0:
126 if address[1] == 0:
127 message = 'The port number for a channel cannot be 0.'
127 message = 'The port number for a channel cannot be 0.'
128 raise InvalidPortNumber(message)
128 raise InvalidPortNumber(message)
129 address = "tcp://%s:%i" % address
129 address = "tcp://%s:%i" % address
130 self._address = address
130 self._address = address
131 atexit.register(self._notice_exit)
131 atexit.register(self._notice_exit)
132
132
133 def _notice_exit(self):
133 def _notice_exit(self):
134 self._exiting = True
134 self._exiting = True
135
135
136 def _run_loop(self):
136 def _run_loop(self):
137 """Run my loop, ignoring EINTR events in the poller"""
137 """Run my loop, ignoring EINTR events in the poller"""
138 while True:
138 while True:
139 try:
139 try:
140 self.ioloop.start()
140 self.ioloop.start()
141 except ZMQError as e:
141 except ZMQError as e:
142 if e.errno == errno.EINTR:
142 if e.errno == errno.EINTR:
143 continue
143 continue
144 else:
144 else:
145 raise
145 raise
146 except Exception:
146 except Exception:
147 if self._exiting:
147 if self._exiting:
148 break
148 break
149 else:
149 else:
150 raise
150 raise
151 else:
151 else:
152 break
152 break
153
153
154 def stop(self):
154 def stop(self):
155 """Stop the channel's event loop and join its thread.
155 """Stop the channel's event loop and join its thread.
156
156
157 This calls :method:`Thread.join` and returns when the thread
157 This calls :method:`Thread.join` and returns when the thread
158 terminates. :class:`RuntimeError` will be raised if
158 terminates. :class:`RuntimeError` will be raised if
159 :method:`self.start` is called again.
159 :method:`self.start` is called again.
160 """
160 """
161 self.join()
161 self.join()
162
162
163 @property
163 @property
164 def address(self):
164 def address(self):
165 """Get the channel's address as a zmq url string.
165 """Get the channel's address as a zmq url string.
166
166
167 These URLS have the form: 'tcp://127.0.0.1:5555'.
167 These URLS have the form: 'tcp://127.0.0.1:5555'.
168 """
168 """
169 return self._address
169 return self._address
170
170
171 def _queue_send(self, msg):
171 def _queue_send(self, msg):
172 """Queue a message to be sent from the IOLoop's thread.
172 """Queue a message to be sent from the IOLoop's thread.
173
173
174 Parameters
174 Parameters
175 ----------
175 ----------
176 msg : message to send
176 msg : message to send
177
177
178 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
178 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
179 thread control of the action.
179 thread control of the action.
180 """
180 """
181 def thread_send():
181 def thread_send():
182 self.session.send(self.stream, msg)
182 self.session.send(self.stream, msg)
183 self.ioloop.add_callback(thread_send)
183 self.ioloop.add_callback(thread_send)
184
184
185 def _handle_recv(self, msg):
185 def _handle_recv(self, msg):
186 """Callback for stream.on_recv.
186 """Callback for stream.on_recv.
187
187
188 Unpacks message, and calls handlers with it.
188 Unpacks message, and calls handlers with it.
189 """
189 """
190 ident,smsg = self.session.feed_identities(msg)
190 ident,smsg = self.session.feed_identities(msg)
191 self.call_handlers(self.session.unserialize(smsg))
191 self.call_handlers(self.session.unserialize(smsg))
192
192
193
193
194
194
195 class ShellChannel(ZMQSocketChannel):
195 class ShellChannel(ZMQSocketChannel):
196 """The shell channel for issuing request/replies to the kernel."""
196 """The shell channel for issuing request/replies to the kernel."""
197
197
198 command_queue = None
198 command_queue = None
199 # flag for whether execute requests should be allowed to call raw_input:
199 # flag for whether execute requests should be allowed to call raw_input:
200 allow_stdin = True
200 allow_stdin = True
201
201
202 def __init__(self, context, session, address):
202 def __init__(self, context, session, address):
203 super(ShellChannel, self).__init__(context, session, address)
203 super(ShellChannel, self).__init__(context, session, address)
204 self.ioloop = ioloop.IOLoop()
204 self.ioloop = ioloop.IOLoop()
205
205
206 def run(self):
206 def run(self):
207 """The thread's main activity. Call start() instead."""
207 """The thread's main activity. Call start() instead."""
208 self.socket = self.context.socket(zmq.DEALER)
208 self.socket = self.context.socket(zmq.DEALER)
209 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
210 self.socket.connect(self.address)
210 self.socket.connect(self.address)
211 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
212 self.stream.on_recv(self._handle_recv)
212 self.stream.on_recv(self._handle_recv)
213 self._run_loop()
213 self._run_loop()
214 try:
214 try:
215 self.socket.close()
215 self.socket.close()
216 except:
216 except:
217 pass
217 pass
218
218
219 def stop(self):
219 def stop(self):
220 """Stop the channel's event loop and join its thread."""
220 """Stop the channel's event loop and join its thread."""
221 self.ioloop.stop()
221 self.ioloop.stop()
222 super(ShellChannel, self).stop()
222 super(ShellChannel, self).stop()
223
223
224 def call_handlers(self, msg):
224 def call_handlers(self, msg):
225 """This method is called in the ioloop thread when a message arrives.
225 """This method is called in the ioloop thread when a message arrives.
226
226
227 Subclasses should override this method to handle incoming messages.
227 Subclasses should override this method to handle incoming messages.
228 It is important to remember that this method is called in the thread
228 It is important to remember that this method is called in the thread
229 so that some logic must be done to ensure that the application leve
229 so that some logic must be done to ensure that the application leve
230 handlers are called in the application thread.
230 handlers are called in the application thread.
231 """
231 """
232 raise NotImplementedError('call_handlers must be defined in a subclass.')
232 raise NotImplementedError('call_handlers must be defined in a subclass.')
233
233
234 def execute(self, code, silent=False, store_history=True,
234 def execute(self, code, silent=False, store_history=True,
235 user_variables=None, user_expressions=None, allow_stdin=None):
235 user_variables=None, user_expressions=None, allow_stdin=None):
236 """Execute code in the kernel.
236 """Execute code in the kernel.
237
237
238 Parameters
238 Parameters
239 ----------
239 ----------
240 code : str
240 code : str
241 A string of Python code.
241 A string of Python code.
242
242
243 silent : bool, optional (default False)
243 silent : bool, optional (default False)
244 If set, the kernel will execute the code as quietly possible, and
244 If set, the kernel will execute the code as quietly possible, and
245 will force store_history to be False.
245 will force store_history to be False.
246
246
247 store_history : bool, optional (default True)
247 store_history : bool, optional (default True)
248 If set, the kernel will store command history. This is forced
248 If set, the kernel will store command history. This is forced
249 to be False if silent is True.
249 to be False if silent is True.
250
250
251 user_variables : list, optional
251 user_variables : list, optional
252 A list of variable names to pull from the user's namespace. They
252 A list of variable names to pull from the user's namespace. They
253 will come back as a dict with these names as keys and their
253 will come back as a dict with these names as keys and their
254 :func:`repr` as values.
254 :func:`repr` as values.
255
255
256 user_expressions : dict, optional
256 user_expressions : dict, optional
257 A dict mapping names to expressions to be evaluated in the user's
257 A dict mapping names to expressions to be evaluated in the user's
258 dict. The expression values are returned as strings formatted using
258 dict. The expression values are returned as strings formatted using
259 :func:`repr`.
259 :func:`repr`.
260
260
261 allow_stdin : bool, optional (default self.allow_stdin)
261 allow_stdin : bool, optional (default self.allow_stdin)
262 Flag for whether the kernel can send stdin requests to frontends.
262 Flag for whether the kernel can send stdin requests to frontends.
263
263
264 Some frontends (e.g. the Notebook) do not support stdin requests.
264 Some frontends (e.g. the Notebook) do not support stdin requests.
265 If raw_input is called from code executed from such a frontend, a
265 If raw_input is called from code executed from such a frontend, a
266 StdinNotImplementedError will be raised.
266 StdinNotImplementedError will be raised.
267
267
268 Returns
268 Returns
269 -------
269 -------
270 The msg_id of the message sent.
270 The msg_id of the message sent.
271 """
271 """
272 if user_variables is None:
272 if user_variables is None:
273 user_variables = []
273 user_variables = []
274 if user_expressions is None:
274 if user_expressions is None:
275 user_expressions = {}
275 user_expressions = {}
276 if allow_stdin is None:
276 if allow_stdin is None:
277 allow_stdin = self.allow_stdin
277 allow_stdin = self.allow_stdin
278
278
279
279
280 # Don't waste network traffic if inputs are invalid
280 # Don't waste network traffic if inputs are invalid
281 if not isinstance(code, basestring):
281 if not isinstance(code, basestring):
282 raise ValueError('code %r must be a string' % code)
282 raise ValueError('code %r must be a string' % code)
283 validate_string_list(user_variables)
283 validate_string_list(user_variables)
284 validate_string_dict(user_expressions)
284 validate_string_dict(user_expressions)
285
285
286 # Create class for content/msg creation. Related to, but possibly
286 # Create class for content/msg creation. Related to, but possibly
287 # not in Session.
287 # not in Session.
288 content = dict(code=code, silent=silent, store_history=store_history,
288 content = dict(code=code, silent=silent, store_history=store_history,
289 user_variables=user_variables,
289 user_variables=user_variables,
290 user_expressions=user_expressions,
290 user_expressions=user_expressions,
291 allow_stdin=allow_stdin,
291 allow_stdin=allow_stdin,
292 )
292 )
293 msg = self.session.msg('execute_request', content)
293 msg = self.session.msg('execute_request', content)
294 self._queue_send(msg)
294 self._queue_send(msg)
295 return msg['header']['msg_id']
295 return msg['header']['msg_id']
296
296
297 def complete(self, text, line, cursor_pos, block=None):
297 def complete(self, text, line, cursor_pos, block=None):
298 """Tab complete text in the kernel's namespace.
298 """Tab complete text in the kernel's namespace.
299
299
300 Parameters
300 Parameters
301 ----------
301 ----------
302 text : str
302 text : str
303 The text to complete.
303 The text to complete.
304 line : str
304 line : str
305 The full line of text that is the surrounding context for the
305 The full line of text that is the surrounding context for the
306 text to complete.
306 text to complete.
307 cursor_pos : int
307 cursor_pos : int
308 The position of the cursor in the line where the completion was
308 The position of the cursor in the line where the completion was
309 requested.
309 requested.
310 block : str, optional
310 block : str, optional
311 The full block of code in which the completion is being requested.
311 The full block of code in which the completion is being requested.
312
312
313 Returns
313 Returns
314 -------
314 -------
315 The msg_id of the message sent.
315 The msg_id of the message sent.
316 """
316 """
317 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
317 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
318 msg = self.session.msg('complete_request', content)
318 msg = self.session.msg('complete_request', content)
319 self._queue_send(msg)
319 self._queue_send(msg)
320 return msg['header']['msg_id']
320 return msg['header']['msg_id']
321
321
322 def object_info(self, oname, detail_level=0):
322 def object_info(self, oname, detail_level=0):
323 """Get metadata information about an object in the kernel's namespace.
323 """Get metadata information about an object in the kernel's namespace.
324
324
325 Parameters
325 Parameters
326 ----------
326 ----------
327 oname : str
327 oname : str
328 A string specifying the object name.
328 A string specifying the object name.
329 detail_level : int, optional
329 detail_level : int, optional
330 The level of detail for the introspection (0-2)
330 The level of detail for the introspection (0-2)
331
331
332 Returns
332 Returns
333 -------
333 -------
334 The msg_id of the message sent.
334 The msg_id of the message sent.
335 """
335 """
336 content = dict(oname=oname, detail_level=detail_level)
336 content = dict(oname=oname, detail_level=detail_level)
337 msg = self.session.msg('object_info_request', content)
337 msg = self.session.msg('object_info_request', content)
338 self._queue_send(msg)
338 self._queue_send(msg)
339 return msg['header']['msg_id']
339 return msg['header']['msg_id']
340
340
341 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
341 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
342 """Get entries from the kernel's history list.
342 """Get entries from the kernel's history list.
343
343
344 Parameters
344 Parameters
345 ----------
345 ----------
346 raw : bool
346 raw : bool
347 If True, return the raw input.
347 If True, return the raw input.
348 output : bool
348 output : bool
349 If True, then return the output as well.
349 If True, then return the output as well.
350 hist_access_type : str
350 hist_access_type : str
351 'range' (fill in session, start and stop params), 'tail' (fill in n)
351 'range' (fill in session, start and stop params), 'tail' (fill in n)
352 or 'search' (fill in pattern param).
352 or 'search' (fill in pattern param).
353
353
354 session : int
354 session : int
355 For a range request, the session from which to get lines. Session
355 For a range request, the session from which to get lines. Session
356 numbers are positive integers; negative ones count back from the
356 numbers are positive integers; negative ones count back from the
357 current session.
357 current session.
358 start : int
358 start : int
359 The first line number of a history range.
359 The first line number of a history range.
360 stop : int
360 stop : int
361 The final (excluded) line number of a history range.
361 The final (excluded) line number of a history range.
362
362
363 n : int
363 n : int
364 The number of lines of history to get for a tail request.
364 The number of lines of history to get for a tail request.
365
365
366 pattern : str
366 pattern : str
367 The glob-syntax pattern for a search request.
367 The glob-syntax pattern for a search request.
368
368
369 Returns
369 Returns
370 -------
370 -------
371 The msg_id of the message sent.
371 The msg_id of the message sent.
372 """
372 """
373 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
373 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
374 **kwargs)
374 **kwargs)
375 msg = self.session.msg('history_request', content)
375 msg = self.session.msg('history_request', content)
376 self._queue_send(msg)
376 self._queue_send(msg)
377 return msg['header']['msg_id']
377 return msg['header']['msg_id']
378
378
379 def kernel_info(self):
379 def kernel_info(self):
380 """Request kernel info."""
380 """Request kernel info."""
381 msg = self.session.msg('kernel_info_request')
381 msg = self.session.msg('kernel_info_request')
382 self._queue_send(msg)
382 self._queue_send(msg)
383 return msg['header']['msg_id']
383 return msg['header']['msg_id']
384
384
385 def shutdown(self, restart=False):
385 def shutdown(self, restart=False):
386 """Request an immediate kernel shutdown.
386 """Request an immediate kernel shutdown.
387
387
388 Upon receipt of the (empty) reply, client code can safely assume that
388 Upon receipt of the (empty) reply, client code can safely assume that
389 the kernel has shut down and it's safe to forcefully terminate it if
389 the kernel has shut down and it's safe to forcefully terminate it if
390 it's still alive.
390 it's still alive.
391
391
392 The kernel will send the reply via a function registered with Python's
392 The kernel will send the reply via a function registered with Python's
393 atexit module, ensuring it's truly done as the kernel is done with all
393 atexit module, ensuring it's truly done as the kernel is done with all
394 normal operation.
394 normal operation.
395 """
395 """
396 # Send quit message to kernel. Once we implement kernel-side setattr,
396 # Send quit message to kernel. Once we implement kernel-side setattr,
397 # this should probably be done that way, but for now this will do.
397 # this should probably be done that way, but for now this will do.
398 msg = self.session.msg('shutdown_request', {'restart':restart})
398 msg = self.session.msg('shutdown_request', {'restart':restart})
399 self._queue_send(msg)
399 self._queue_send(msg)
400 return msg['header']['msg_id']
400 return msg['header']['msg_id']
401
401
402
402
403
403
404 class IOPubChannel(ZMQSocketChannel):
404 class IOPubChannel(ZMQSocketChannel):
405 """The iopub channel which listens for messages that the kernel publishes.
405 """The iopub channel which listens for messages that the kernel publishes.
406
406
407 This channel is where all output is published to frontends.
407 This channel is where all output is published to frontends.
408 """
408 """
409
409
410 def __init__(self, context, session, address):
410 def __init__(self, context, session, address):
411 super(IOPubChannel, self).__init__(context, session, address)
411 super(IOPubChannel, self).__init__(context, session, address)
412 self.ioloop = ioloop.IOLoop()
412 self.ioloop = ioloop.IOLoop()
413
413
414 def run(self):
414 def run(self):
415 """The thread's main activity. Call start() instead."""
415 """The thread's main activity. Call start() instead."""
416 self.socket = self.context.socket(zmq.SUB)
416 self.socket = self.context.socket(zmq.SUB)
417 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
417 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
418 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
418 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
419 self.socket.connect(self.address)
419 self.socket.connect(self.address)
420 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
420 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
421 self.stream.on_recv(self._handle_recv)
421 self.stream.on_recv(self._handle_recv)
422 self._run_loop()
422 self._run_loop()
423 try:
423 try:
424 self.socket.close()
424 self.socket.close()
425 except:
425 except:
426 pass
426 pass
427
427
428 def stop(self):
428 def stop(self):
429 """Stop the channel's event loop and join its thread."""
429 """Stop the channel's event loop and join its thread."""
430 self.ioloop.stop()
430 self.ioloop.stop()
431 super(IOPubChannel, self).stop()
431 super(IOPubChannel, self).stop()
432
432
433 def call_handlers(self, msg):
433 def call_handlers(self, msg):
434 """This method is called in the ioloop thread when a message arrives.
434 """This method is called in the ioloop thread when a message arrives.
435
435
436 Subclasses should override this method to handle incoming messages.
436 Subclasses should override this method to handle incoming messages.
437 It is important to remember that this method is called in the thread
437 It is important to remember that this method is called in the thread
438 so that some logic must be done to ensure that the application leve
438 so that some logic must be done to ensure that the application leve
439 handlers are called in the application thread.
439 handlers are called in the application thread.
440 """
440 """
441 raise NotImplementedError('call_handlers must be defined in a subclass.')
441 raise NotImplementedError('call_handlers must be defined in a subclass.')
442
442
443 def flush(self, timeout=1.0):
443 def flush(self, timeout=1.0):
444 """Immediately processes all pending messages on the iopub channel.
444 """Immediately processes all pending messages on the iopub channel.
445
445
446 Callers should use this method to ensure that :method:`call_handlers`
446 Callers should use this method to ensure that :method:`call_handlers`
447 has been called for all messages that have been received on the
447 has been called for all messages that have been received on the
448 0MQ SUB socket of this channel.
448 0MQ SUB socket of this channel.
449
449
450 This method is thread safe.
450 This method is thread safe.
451
451
452 Parameters
452 Parameters
453 ----------
453 ----------
454 timeout : float, optional
454 timeout : float, optional
455 The maximum amount of time to spend flushing, in seconds. The
455 The maximum amount of time to spend flushing, in seconds. The
456 default is one second.
456 default is one second.
457 """
457 """
458 # We do the IOLoop callback process twice to ensure that the IOLoop
458 # We do the IOLoop callback process twice to ensure that the IOLoop
459 # gets to perform at least one full poll.
459 # gets to perform at least one full poll.
460 stop_time = time.time() + timeout
460 stop_time = time.time() + timeout
461 for i in xrange(2):
461 for i in xrange(2):
462 self._flushed = False
462 self._flushed = False
463 self.ioloop.add_callback(self._flush)
463 self.ioloop.add_callback(self._flush)
464 while not self._flushed and time.time() < stop_time:
464 while not self._flushed and time.time() < stop_time:
465 time.sleep(0.01)
465 time.sleep(0.01)
466
466
467 def _flush(self):
467 def _flush(self):
468 """Callback for :method:`self.flush`."""
468 """Callback for :method:`self.flush`."""
469 self.stream.flush()
469 self.stream.flush()
470 self._flushed = True
470 self._flushed = True
471
471
472
472
473 class StdInChannel(ZMQSocketChannel):
473 class StdInChannel(ZMQSocketChannel):
474 """The stdin channel to handle raw_input requests that the kernel makes."""
474 """The stdin channel to handle raw_input requests that the kernel makes."""
475
475
476 msg_queue = None
476 msg_queue = None
477
477
478 def __init__(self, context, session, address):
478 def __init__(self, context, session, address):
479 super(StdInChannel, self).__init__(context, session, address)
479 super(StdInChannel, self).__init__(context, session, address)
480 self.ioloop = ioloop.IOLoop()
480 self.ioloop = ioloop.IOLoop()
481
481
482 def run(self):
482 def run(self):
483 """The thread's main activity. Call start() instead."""
483 """The thread's main activity. Call start() instead."""
484 self.socket = self.context.socket(zmq.DEALER)
484 self.socket = self.context.socket(zmq.DEALER)
485 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
485 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
486 self.socket.connect(self.address)
486 self.socket.connect(self.address)
487 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
487 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
488 self.stream.on_recv(self._handle_recv)
488 self.stream.on_recv(self._handle_recv)
489 self._run_loop()
489 self._run_loop()
490 try:
490 try:
491 self.socket.close()
491 self.socket.close()
492 except:
492 except:
493 pass
493 pass
494
494
495 def stop(self):
495 def stop(self):
496 """Stop the channel's event loop and join its thread."""
496 """Stop the channel's event loop and join its thread."""
497 self.ioloop.stop()
497 self.ioloop.stop()
498 super(StdInChannel, self).stop()
498 super(StdInChannel, self).stop()
499
499
500 def call_handlers(self, msg):
500 def call_handlers(self, msg):
501 """This method is called in the ioloop thread when a message arrives.
501 """This method is called in the ioloop thread when a message arrives.
502
502
503 Subclasses should override this method to handle incoming messages.
503 Subclasses should override this method to handle incoming messages.
504 It is important to remember that this method is called in the thread
504 It is important to remember that this method is called in the thread
505 so that some logic must be done to ensure that the application leve
505 so that some logic must be done to ensure that the application leve
506 handlers are called in the application thread.
506 handlers are called in the application thread.
507 """
507 """
508 raise NotImplementedError('call_handlers must be defined in a subclass.')
508 raise NotImplementedError('call_handlers must be defined in a subclass.')
509
509
510 def input(self, string):
510 def input(self, string):
511 """Send a string of raw input to the kernel."""
511 """Send a string of raw input to the kernel."""
512 content = dict(value=string)
512 content = dict(value=string)
513 msg = self.session.msg('input_reply', content)
513 msg = self.session.msg('input_reply', content)
514 self._queue_send(msg)
514 self._queue_send(msg)
515
515
516
516
517 class HBChannel(ZMQSocketChannel):
517 class HBChannel(ZMQSocketChannel):
518 """The heartbeat channel which monitors the kernel heartbeat.
518 """The heartbeat channel which monitors the kernel heartbeat.
519
519
520 Note that the heartbeat channel is paused by default. As long as you start
520 Note that the heartbeat channel is paused by default. As long as you start
521 this channel, the kernel manager will ensure that it is paused and un-paused
521 this channel, the kernel manager will ensure that it is paused and un-paused
522 as appropriate.
522 as appropriate.
523 """
523 """
524
524
525 time_to_dead = 3.0
525 time_to_dead = 3.0
526 socket = None
526 socket = None
527 poller = None
527 poller = None
528 _running = None
528 _running = None
529 _pause = None
529 _pause = None
530 _beating = None
530 _beating = None
531
531
532 def __init__(self, context, session, address):
532 def __init__(self, context, session, address):
533 super(HBChannel, self).__init__(context, session, address)
533 super(HBChannel, self).__init__(context, session, address)
534 self._running = False
534 self._running = False
535 self._pause =True
535 self._pause =True
536 self.poller = zmq.Poller()
536 self.poller = zmq.Poller()
537
537
538 def _create_socket(self):
538 def _create_socket(self):
539 if self.socket is not None:
539 if self.socket is not None:
540 # close previous socket, before opening a new one
540 # close previous socket, before opening a new one
541 self.poller.unregister(self.socket)
541 self.poller.unregister(self.socket)
542 self.socket.close()
542 self.socket.close()
543 self.socket = self.context.socket(zmq.REQ)
543 self.socket = self.context.socket(zmq.REQ)
544 self.socket.setsockopt(zmq.LINGER, 0)
544 self.socket.setsockopt(zmq.LINGER, 0)
545 self.socket.connect(self.address)
545 self.socket.connect(self.address)
546
546
547 self.poller.register(self.socket, zmq.POLLIN)
547 self.poller.register(self.socket, zmq.POLLIN)
548
548
549 def _poll(self, start_time):
549 def _poll(self, start_time):
550 """poll for heartbeat replies until we reach self.time_to_dead.
550 """poll for heartbeat replies until we reach self.time_to_dead.
551
551
552 Ignores interrupts, and returns the result of poll(), which
552 Ignores interrupts, and returns the result of poll(), which
553 will be an empty list if no messages arrived before the timeout,
553 will be an empty list if no messages arrived before the timeout,
554 or the event tuple if there is a message to receive.
554 or the event tuple if there is a message to receive.
555 """
555 """
556
556
557 until_dead = self.time_to_dead - (time.time() - start_time)
557 until_dead = self.time_to_dead - (time.time() - start_time)
558 # ensure poll at least once
558 # ensure poll at least once
559 until_dead = max(until_dead, 1e-3)
559 until_dead = max(until_dead, 1e-3)
560 events = []
560 events = []
561 while True:
561 while True:
562 try:
562 try:
563 events = self.poller.poll(1000 * until_dead)
563 events = self.poller.poll(1000 * until_dead)
564 except ZMQError as e:
564 except ZMQError as e:
565 if e.errno == errno.EINTR:
565 if e.errno == errno.EINTR:
566 # ignore interrupts during heartbeat
566 # ignore interrupts during heartbeat
567 # this may never actually happen
567 # this may never actually happen
568 until_dead = self.time_to_dead - (time.time() - start_time)
568 until_dead = self.time_to_dead - (time.time() - start_time)
569 until_dead = max(until_dead, 1e-3)
569 until_dead = max(until_dead, 1e-3)
570 pass
570 pass
571 else:
571 else:
572 raise
572 raise
573 except Exception:
573 except Exception:
574 if self._exiting:
574 if self._exiting:
575 break
575 break
576 else:
576 else:
577 raise
577 raise
578 else:
578 else:
579 break
579 break
580 return events
580 return events
581
581
582 def run(self):
582 def run(self):
583 """The thread's main activity. Call start() instead."""
583 """The thread's main activity. Call start() instead."""
584 self._create_socket()
584 self._create_socket()
585 self._running = True
585 self._running = True
586 self._beating = True
586 self._beating = True
587
587
588 while self._running:
588 while self._running:
589 if self._pause:
589 if self._pause:
590 # just sleep, and skip the rest of the loop
590 # just sleep, and skip the rest of the loop
591 time.sleep(self.time_to_dead)
591 time.sleep(self.time_to_dead)
592 continue
592 continue
593
593
594 since_last_heartbeat = 0.0
594 since_last_heartbeat = 0.0
595 # io.rprint('Ping from HB channel') # dbg
595 # io.rprint('Ping from HB channel') # dbg
596 # no need to catch EFSM here, because the previous event was
596 # no need to catch EFSM here, because the previous event was
597 # either a recv or connect, which cannot be followed by EFSM
597 # either a recv or connect, which cannot be followed by EFSM
598 self.socket.send(b'ping')
598 self.socket.send(b'ping')
599 request_time = time.time()
599 request_time = time.time()
600 ready = self._poll(request_time)
600 ready = self._poll(request_time)
601 if ready:
601 if ready:
602 self._beating = True
602 self._beating = True
603 # the poll above guarantees we have something to recv
603 # the poll above guarantees we have something to recv
604 self.socket.recv()
604 self.socket.recv()
605 # sleep the remainder of the cycle
605 # sleep the remainder of the cycle
606 remainder = self.time_to_dead - (time.time() - request_time)
606 remainder = self.time_to_dead - (time.time() - request_time)
607 if remainder > 0:
607 if remainder > 0:
608 time.sleep(remainder)
608 time.sleep(remainder)
609 continue
609 continue
610 else:
610 else:
611 # nothing was received within the time limit, signal heart failure
611 # nothing was received within the time limit, signal heart failure
612 self._beating = False
612 self._beating = False
613 since_last_heartbeat = time.time() - request_time
613 since_last_heartbeat = time.time() - request_time
614 self.call_handlers(since_last_heartbeat)
614 self.call_handlers(since_last_heartbeat)
615 # and close/reopen the socket, because the REQ/REP cycle has been broken
615 # and close/reopen the socket, because the REQ/REP cycle has been broken
616 self._create_socket()
616 self._create_socket()
617 continue
617 continue
618 try:
618 try:
619 self.socket.close()
619 self.socket.close()
620 except:
620 except:
621 pass
621 pass
622
622
623 def pause(self):
623 def pause(self):
624 """Pause the heartbeat."""
624 """Pause the heartbeat."""
625 self._pause = True
625 self._pause = True
626
626
627 def unpause(self):
627 def unpause(self):
628 """Unpause the heartbeat."""
628 """Unpause the heartbeat."""
629 self._pause = False
629 self._pause = False
630
630
631 def is_beating(self):
631 def is_beating(self):
632 """Is the heartbeat running and responsive (and not paused)."""
632 """Is the heartbeat running and responsive (and not paused)."""
633 if self.is_alive() and not self._pause and self._beating:
633 if self.is_alive() and not self._pause and self._beating:
634 return True
634 return True
635 else:
635 else:
636 return False
636 return False
637
637
638 def stop(self):
638 def stop(self):
639 """Stop the channel's event loop and join its thread."""
639 """Stop the channel's event loop and join its thread."""
640 self._running = False
640 self._running = False
641 super(HBChannel, self).stop()
641 super(HBChannel, self).stop()
642
642
643 def call_handlers(self, since_last_heartbeat):
643 def call_handlers(self, since_last_heartbeat):
644 """This method is called in the ioloop thread when a message arrives.
644 """This method is called in the ioloop thread when a message arrives.
645
645
646 Subclasses should override this method to handle incoming messages.
646 Subclasses should override this method to handle incoming messages.
647 It is important to remember that this method is called in the thread
647 It is important to remember that this method is called in the thread
648 so that some logic must be done to ensure that the application level
648 so that some logic must be done to ensure that the application level
649 handlers are called in the application thread.
649 handlers are called in the application thread.
650 """
650 """
651 raise NotImplementedError('call_handlers must be defined in a subclass.')
651 raise NotImplementedError('call_handlers must be defined in a subclass.')
652
652
653
653
654 #-----------------------------------------------------------------------------
654 #-----------------------------------------------------------------------------
655 # Main kernel manager class
655 # Main kernel manager class
656 #-----------------------------------------------------------------------------
656 #-----------------------------------------------------------------------------
657
657
658 class KernelManager(Configurable):
658 class KernelManager(Configurable):
659 """Manages a single kernel on this host along with its channels.
659 """Manages a single kernel on this host along with its channels.
660
660
661 There are four channels associated with each kernel:
661 There are four channels associated with each kernel:
662
662
663 * shell: for request/reply calls to the kernel.
663 * shell: for request/reply calls to the kernel.
664 * iopub: for the kernel to publish results to frontends.
664 * iopub: for the kernel to publish results to frontends.
665 * hb: for monitoring the kernel's heartbeat.
665 * hb: for monitoring the kernel's heartbeat.
666 * stdin: for frontends to reply to raw_input calls in the kernel.
666 * stdin: for frontends to reply to raw_input calls in the kernel.
667
667
668 The usage of the channels that this class manages is optional. It is
668 The usage of the channels that this class manages is optional. It is
669 entirely possible to connect to the kernels directly using ZeroMQ
669 entirely possible to connect to the kernels directly using ZeroMQ
670 sockets. These channels are useful primarily for talking to a kernel
670 sockets. These channels are useful primarily for talking to a kernel
671 whose :class:`KernelManager` is in the same process.
671 whose :class:`KernelManager` is in the same process.
672
672
673 This version manages kernels started using Popen.
673 This version manages kernels started using Popen.
674 """
674 """
675 # The PyZMQ Context to use for communication with the kernel.
675 # The PyZMQ Context to use for communication with the kernel.
676 context = Instance(zmq.Context)
676 context = Instance(zmq.Context)
677 def _context_default(self):
677 def _context_default(self):
678 return zmq.Context.instance()
678 return zmq.Context.instance()
679
679
680 # The Session to use for communication with the kernel.
680 # The Session to use for communication with the kernel.
681 session = Instance(Session)
681 session = Instance(Session)
682 def _session_default(self):
682 def _session_default(self):
683 return Session(config=self.config)
683 return Session(config=self.config)
684
684
685 # The kernel process with which the KernelManager is communicating.
685 # The kernel process with which the KernelManager is communicating.
686 # generally a Popen instance
686 # generally a Popen instance
687 kernel = Any()
687 kernel = Any()
688
688
689 kernel_cmd = List(Unicode, config=True,
689 kernel_cmd = List(Unicode, config=True,
690 help="""The Popen Command to launch the kernel.
690 help="""The Popen Command to launch the kernel.
691 Override this if you have a custom
691 Override this if you have a custom
692 """
692 """
693 )
693 )
694 def _kernel_cmd_changed(self, name, old, new):
694 def _kernel_cmd_changed(self, name, old, new):
695 print 'kernel cmd changed', new
696 self.ipython_kernel = False
695 self.ipython_kernel = False
697
696
698 ipython_kernel = Bool(True)
697 ipython_kernel = Bool(True)
699
698
700
699
701 # The addresses for the communication channels.
700 # The addresses for the communication channels.
702 connection_file = Unicode('')
701 connection_file = Unicode('')
703
702
704 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
703 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
705
704
706 ip = Unicode(LOCALHOST, config=True,
705 ip = Unicode(LOCALHOST, config=True,
707 help="""Set the kernel\'s IP address [default localhost].
706 help="""Set the kernel\'s IP address [default localhost].
708 If the IP address is something other than localhost, then
707 If the IP address is something other than localhost, then
709 Consoles on other machines will be able to connect
708 Consoles on other machines will be able to connect
710 to the Kernel, so be careful!"""
709 to the Kernel, so be careful!"""
711 )
710 )
712 def _ip_default(self):
711 def _ip_default(self):
713 if self.transport == 'ipc':
712 if self.transport == 'ipc':
714 if self.connection_file:
713 if self.connection_file:
715 return os.path.splitext(self.connection_file)[0] + '-ipc'
714 return os.path.splitext(self.connection_file)[0] + '-ipc'
716 else:
715 else:
717 return 'kernel-ipc'
716 return 'kernel-ipc'
718 else:
717 else:
719 return LOCALHOST
718 return LOCALHOST
720 def _ip_changed(self, name, old, new):
719 def _ip_changed(self, name, old, new):
721 if new == '*':
720 if new == '*':
722 self.ip = '0.0.0.0'
721 self.ip = '0.0.0.0'
723 shell_port = Integer(0)
722 shell_port = Integer(0)
724 iopub_port = Integer(0)
723 iopub_port = Integer(0)
725 stdin_port = Integer(0)
724 stdin_port = Integer(0)
726 hb_port = Integer(0)
725 hb_port = Integer(0)
727
726
728 # The classes to use for the various channels.
727 # The classes to use for the various channels.
729 shell_channel_class = Type(ShellChannel)
728 shell_channel_class = Type(ShellChannel)
730 iopub_channel_class = Type(IOPubChannel)
729 iopub_channel_class = Type(IOPubChannel)
731 stdin_channel_class = Type(StdInChannel)
730 stdin_channel_class = Type(StdInChannel)
732 hb_channel_class = Type(HBChannel)
731 hb_channel_class = Type(HBChannel)
733
732
734 # Protected traits.
733 # Protected traits.
735 _launch_args = Any
734 _launch_args = Any
736 _shell_channel = Any
735 _shell_channel = Any
737 _iopub_channel = Any
736 _iopub_channel = Any
738 _stdin_channel = Any
737 _stdin_channel = Any
739 _hb_channel = Any
738 _hb_channel = Any
740 _connection_file_written=Bool(False)
739 _connection_file_written=Bool(False)
741
740
742 def __del__(self):
741 def __del__(self):
743 self.cleanup_connection_file()
742 self.cleanup_connection_file()
744
743
745 #--------------------------------------------------------------------------
744 #--------------------------------------------------------------------------
746 # Channel management methods:
745 # Channel management methods:
747 #--------------------------------------------------------------------------
746 #--------------------------------------------------------------------------
748
747
749 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
748 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
750 """Starts the channels for this kernel.
749 """Starts the channels for this kernel.
751
750
752 This will create the channels if they do not exist and then start
751 This will create the channels if they do not exist and then start
753 them (their activity runs in a thread). If port numbers of 0 are
752 them (their activity runs in a thread). If port numbers of 0 are
754 being used (random ports) then you must first call
753 being used (random ports) then you must first call
755 :method:`start_kernel`. If the channels have been stopped and you
754 :method:`start_kernel`. If the channels have been stopped and you
756 call this, :class:`RuntimeError` will be raised.
755 call this, :class:`RuntimeError` will be raised.
757 """
756 """
758 if shell:
757 if shell:
759 self.shell_channel.start()
758 self.shell_channel.start()
760 if iopub:
759 if iopub:
761 self.iopub_channel.start()
760 self.iopub_channel.start()
762 if stdin:
761 if stdin:
763 self.stdin_channel.start()
762 self.stdin_channel.start()
764 self.shell_channel.allow_stdin = True
763 self.shell_channel.allow_stdin = True
765 else:
764 else:
766 self.shell_channel.allow_stdin = False
765 self.shell_channel.allow_stdin = False
767 if hb:
766 if hb:
768 self.hb_channel.start()
767 self.hb_channel.start()
769
768
770 def stop_channels(self):
769 def stop_channels(self):
771 """Stops all the running channels for this kernel.
770 """Stops all the running channels for this kernel.
772
771
773 This stops their event loops and joins their threads.
772 This stops their event loops and joins their threads.
774 """
773 """
775 if self.shell_channel.is_alive():
774 if self.shell_channel.is_alive():
776 self.shell_channel.stop()
775 self.shell_channel.stop()
777 if self.iopub_channel.is_alive():
776 if self.iopub_channel.is_alive():
778 self.iopub_channel.stop()
777 self.iopub_channel.stop()
779 if self.stdin_channel.is_alive():
778 if self.stdin_channel.is_alive():
780 self.stdin_channel.stop()
779 self.stdin_channel.stop()
781 if self.hb_channel.is_alive():
780 if self.hb_channel.is_alive():
782 self.hb_channel.stop()
781 self.hb_channel.stop()
783
782
784 @property
783 @property
785 def channels_running(self):
784 def channels_running(self):
786 """Are any of the channels created and running?"""
785 """Are any of the channels created and running?"""
787 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
786 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
788 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
789
788
790 def _make_url(self, port):
789 def _make_url(self, port):
791 """Make a zmq url with a port.
790 """Make a zmq url with a port.
792
791
793 There are two cases that this handles:
792 There are two cases that this handles:
794
793
795 * tcp: tcp://ip:port
794 * tcp: tcp://ip:port
796 * ipc: ipc://ip-port
795 * ipc: ipc://ip-port
797 """
796 """
798 if self.transport == 'tcp':
797 if self.transport == 'tcp':
799 return "tcp://%s:%i" % (self.ip, port)
798 return "tcp://%s:%i" % (self.ip, port)
800 else:
799 else:
801 return "%s://%s-%s" % (self.transport, self.ip, port)
800 return "%s://%s-%s" % (self.transport, self.ip, port)
802
801
803 @property
802 @property
804 def shell_channel(self):
803 def shell_channel(self):
805 """Get the shell channel object for this kernel."""
804 """Get the shell channel object for this kernel."""
806 if self._shell_channel is None:
805 if self._shell_channel is None:
807 self._shell_channel = self.shell_channel_class(
806 self._shell_channel = self.shell_channel_class(
808 self.context, self.session, self._make_url(self.shell_port)
807 self.context, self.session, self._make_url(self.shell_port)
809 )
808 )
810 return self._shell_channel
809 return self._shell_channel
811
810
812 @property
811 @property
813 def iopub_channel(self):
812 def iopub_channel(self):
814 """Get the iopub channel object for this kernel."""
813 """Get the iopub channel object for this kernel."""
815 if self._iopub_channel is None:
814 if self._iopub_channel is None:
816 self._iopub_channel = self.iopub_channel_class(
815 self._iopub_channel = self.iopub_channel_class(
817 self.context, self.session, self._make_url(self.iopub_port)
816 self.context, self.session, self._make_url(self.iopub_port)
818 )
817 )
819 return self._iopub_channel
818 return self._iopub_channel
820
819
821 @property
820 @property
822 def stdin_channel(self):
821 def stdin_channel(self):
823 """Get the stdin channel object for this kernel."""
822 """Get the stdin channel object for this kernel."""
824 if self._stdin_channel is None:
823 if self._stdin_channel is None:
825 self._stdin_channel = self.stdin_channel_class(
824 self._stdin_channel = self.stdin_channel_class(
826 self.context, self.session, self._make_url(self.stdin_port)
825 self.context, self.session, self._make_url(self.stdin_port)
827 )
826 )
828 return self._stdin_channel
827 return self._stdin_channel
829
828
830 @property
829 @property
831 def hb_channel(self):
830 def hb_channel(self):
832 """Get the hb channel object for this kernel."""
831 """Get the hb channel object for this kernel."""
833 if self._hb_channel is None:
832 if self._hb_channel is None:
834 self._hb_channel = self.hb_channel_class(
833 self._hb_channel = self.hb_channel_class(
835 self.context, self.session, self._make_url(self.hb_port)
834 self.context, self.session, self._make_url(self.hb_port)
836 )
835 )
837 return self._hb_channel
836 return self._hb_channel
838
837
839 #--------------------------------------------------------------------------
838 #--------------------------------------------------------------------------
840 # Connection and ipc file management
839 # Connection and ipc file management
841 #--------------------------------------------------------------------------
840 #--------------------------------------------------------------------------
842
841
843 def cleanup_connection_file(self):
842 def cleanup_connection_file(self):
844 """Cleanup connection file *if we wrote it*
843 """Cleanup connection file *if we wrote it*
845
844
846 Will not raise if the connection file was already removed somehow.
845 Will not raise if the connection file was already removed somehow.
847 """
846 """
848 if self._connection_file_written:
847 if self._connection_file_written:
849 # cleanup connection files on full shutdown of kernel we started
848 # cleanup connection files on full shutdown of kernel we started
850 self._connection_file_written = False
849 self._connection_file_written = False
851 try:
850 try:
852 os.remove(self.connection_file)
851 os.remove(self.connection_file)
853 except (IOError, OSError):
852 except (IOError, OSError):
854 pass
853 pass
855
854
856 def cleanup_ipc_files(self):
855 def cleanup_ipc_files(self):
857 """Cleanup ipc files if we wrote them."""
856 """Cleanup ipc files if we wrote them."""
858 if self.transport != 'ipc':
857 if self.transport != 'ipc':
859 return
858 return
860 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
859 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
861 ipcfile = "%s-%i" % (self.ip, port)
860 ipcfile = "%s-%i" % (self.ip, port)
862 try:
861 try:
863 os.remove(ipcfile)
862 os.remove(ipcfile)
864 except (IOError, OSError):
863 except (IOError, OSError):
865 pass
864 pass
866
865
867 def load_connection_file(self):
866 def load_connection_file(self):
868 """Load connection info from JSON dict in self.connection_file."""
867 """Load connection info from JSON dict in self.connection_file."""
869 with open(self.connection_file) as f:
868 with open(self.connection_file) as f:
870 cfg = json.loads(f.read())
869 cfg = json.loads(f.read())
871
870
872 from pprint import pprint
871 from pprint import pprint
873 pprint(cfg)
872 pprint(cfg)
874 self.transport = cfg.get('transport', 'tcp')
873 self.transport = cfg.get('transport', 'tcp')
875 self.ip = cfg['ip']
874 self.ip = cfg['ip']
876 self.shell_port = cfg['shell_port']
875 self.shell_port = cfg['shell_port']
877 self.stdin_port = cfg['stdin_port']
876 self.stdin_port = cfg['stdin_port']
878 self.iopub_port = cfg['iopub_port']
877 self.iopub_port = cfg['iopub_port']
879 self.hb_port = cfg['hb_port']
878 self.hb_port = cfg['hb_port']
880 self.session.key = str_to_bytes(cfg['key'])
879 self.session.key = str_to_bytes(cfg['key'])
881
880
882 def write_connection_file(self):
881 def write_connection_file(self):
883 """Write connection info to JSON dict in self.connection_file."""
882 """Write connection info to JSON dict in self.connection_file."""
884 if self._connection_file_written:
883 if self._connection_file_written:
885 return
884 return
886 self.connection_file,cfg = write_connection_file(self.connection_file,
885 self.connection_file,cfg = write_connection_file(self.connection_file,
887 transport=self.transport, ip=self.ip, key=self.session.key,
886 transport=self.transport, ip=self.ip, key=self.session.key,
888 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
887 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
889 shell_port=self.shell_port, hb_port=self.hb_port)
888 shell_port=self.shell_port, hb_port=self.hb_port)
890 # write_connection_file also sets default ports:
889 # write_connection_file also sets default ports:
891 self.shell_port = cfg['shell_port']
890 self.shell_port = cfg['shell_port']
892 self.stdin_port = cfg['stdin_port']
891 self.stdin_port = cfg['stdin_port']
893 self.iopub_port = cfg['iopub_port']
892 self.iopub_port = cfg['iopub_port']
894 self.hb_port = cfg['hb_port']
893 self.hb_port = cfg['hb_port']
895
894
896 self._connection_file_written = True
895 self._connection_file_written = True
897
896
898 #--------------------------------------------------------------------------
897 #--------------------------------------------------------------------------
899 # Kernel management
898 # Kernel management
900 #--------------------------------------------------------------------------
899 #--------------------------------------------------------------------------
901
900
902 def format_kernel_cmd(self, **kw):
901 def format_kernel_cmd(self, **kw):
903 """format templated args (e.g. {connection_file})"""
902 """format templated args (e.g. {connection_file})"""
904 if self.kernel_cmd:
903 if self.kernel_cmd:
905 cmd = self.kernel_cmd
904 cmd = self.kernel_cmd
906 else:
905 else:
907 cmd = make_ipkernel_cmd(
906 cmd = make_ipkernel_cmd(
908 'from IPython.kernel.zmq.kernelapp import main; main()',
907 'from IPython.kernel.zmq.kernelapp import main; main()',
909 **kw
908 **kw
910 )
909 )
911 ns = dict(connection_file=self.connection_file)
910 ns = dict(connection_file=self.connection_file)
912 ns.update(self._launch_args)
911 ns.update(self._launch_args)
913 return [ c.format(**ns) for c in cmd ]
912 return [ c.format(**ns) for c in cmd ]
914
913
915 def _launch_kernel(self, kernel_cmd, **kw):
914 def _launch_kernel(self, kernel_cmd, **kw):
916 """actually launch the kernel
915 """actually launch the kernel
917
916
918 override in a subclass to launch kernel subprocesses differently
917 override in a subclass to launch kernel subprocesses differently
919 """
918 """
920 return launch_kernel(kernel_cmd, **kw)
919 return launch_kernel(kernel_cmd, **kw)
921
920
922 def start_kernel(self, **kw):
921 def start_kernel(self, **kw):
923 """Starts a kernel on this host in a separate process.
922 """Starts a kernel on this host in a separate process.
924
923
925 If random ports (port=0) are being used, this method must be called
924 If random ports (port=0) are being used, this method must be called
926 before the channels are created.
925 before the channels are created.
927
926
928 Parameters:
927 Parameters:
929 -----------
928 -----------
930 **kw : optional
929 **kw : optional
931 keyword arguments that are passed down to build the kernel_cmd
930 keyword arguments that are passed down to build the kernel_cmd
932 and launching the kernel (e.g. Popen kwargs).
931 and launching the kernel (e.g. Popen kwargs).
933 """
932 """
934 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
933 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
935 raise RuntimeError("Can only launch a kernel on a local interface. "
934 raise RuntimeError("Can only launch a kernel on a local interface. "
936 "Make sure that the '*_address' attributes are "
935 "Make sure that the '*_address' attributes are "
937 "configured properly. "
936 "configured properly. "
938 "Currently valid addresses are: %s"%LOCAL_IPS
937 "Currently valid addresses are: %s"%LOCAL_IPS
939 )
938 )
940
939
941 # write connection file / get default ports
940 # write connection file / get default ports
942 self.write_connection_file()
941 self.write_connection_file()
943
942
944 # save kwargs for use in restart
943 # save kwargs for use in restart
945 self._launch_args = kw.copy()
944 self._launch_args = kw.copy()
946 # build the Popen cmd
945 # build the Popen cmd
947 kernel_cmd = self.format_kernel_cmd(**kw)
946 kernel_cmd = self.format_kernel_cmd(**kw)
948 # launch the kernel subprocess
947 # launch the kernel subprocess
949 self.kernel = self._launch_kernel(kernel_cmd,
948 self.kernel = self._launch_kernel(kernel_cmd,
950 ipython_kernel=self.ipython_kernel,
949 ipython_kernel=self.ipython_kernel,
951 **kw)
950 **kw)
952
951
953 def shutdown_kernel(self, now=False, restart=False):
952 def shutdown_kernel(self, now=False, restart=False):
954 """Attempts to the stop the kernel process cleanly.
953 """Attempts to the stop the kernel process cleanly.
955
954
956 This attempts to shutdown the kernels cleanly by:
955 This attempts to shutdown the kernels cleanly by:
957
956
958 1. Sending it a shutdown message over the shell channel.
957 1. Sending it a shutdown message over the shell channel.
959 2. If that fails, the kernel is shutdown forcibly by sending it
958 2. If that fails, the kernel is shutdown forcibly by sending it
960 a signal.
959 a signal.
961
960
962 Parameters:
961 Parameters:
963 -----------
962 -----------
964 now : bool
963 now : bool
965 Should the kernel be forcible killed *now*. This skips the
964 Should the kernel be forcible killed *now*. This skips the
966 first, nice shutdown attempt.
965 first, nice shutdown attempt.
967 restart: bool
966 restart: bool
968 Will this kernel be restarted after it is shutdown. When this
967 Will this kernel be restarted after it is shutdown. When this
969 is True, connection files will not be cleaned up.
968 is True, connection files will not be cleaned up.
970 """
969 """
971 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
970 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
972 if sys.platform == 'win32':
971 if sys.platform == 'win32':
973 self._kill_kernel()
972 self._kill_kernel()
974 return
973 return
975
974
976 # Pause the heart beat channel if it exists.
975 # Pause the heart beat channel if it exists.
977 if self._hb_channel is not None:
976 if self._hb_channel is not None:
978 self._hb_channel.pause()
977 self._hb_channel.pause()
979
978
980 if now:
979 if now:
981 if self.has_kernel:
980 if self.has_kernel:
982 self._kill_kernel()
981 self._kill_kernel()
983 else:
982 else:
984 # Don't send any additional kernel kill messages immediately, to give
983 # Don't send any additional kernel kill messages immediately, to give
985 # the kernel a chance to properly execute shutdown actions. Wait for at
984 # the kernel a chance to properly execute shutdown actions. Wait for at
986 # most 1s, checking every 0.1s.
985 # most 1s, checking every 0.1s.
987 self.shell_channel.shutdown(restart=restart)
986 self.shell_channel.shutdown(restart=restart)
988 for i in range(10):
987 for i in range(10):
989 if self.is_alive:
988 if self.is_alive:
990 time.sleep(0.1)
989 time.sleep(0.1)
991 else:
990 else:
992 break
991 break
993 else:
992 else:
994 # OK, we've waited long enough.
993 # OK, we've waited long enough.
995 if self.has_kernel:
994 if self.has_kernel:
996 self._kill_kernel()
995 self._kill_kernel()
997
996
998 if not restart:
997 if not restart:
999 self.cleanup_connection_file()
998 self.cleanup_connection_file()
1000 self.cleanup_ipc_files()
999 self.cleanup_ipc_files()
1001 else:
1000 else:
1002 self.cleanup_ipc_files()
1001 self.cleanup_ipc_files()
1003
1002
1004 def restart_kernel(self, now=False, **kw):
1003 def restart_kernel(self, now=False, **kw):
1005 """Restarts a kernel with the arguments that were used to launch it.
1004 """Restarts a kernel with the arguments that were used to launch it.
1006
1005
1007 If the old kernel was launched with random ports, the same ports will be
1006 If the old kernel was launched with random ports, the same ports will be
1008 used for the new kernel. The same connection file is used again.
1007 used for the new kernel. The same connection file is used again.
1009
1008
1010 Parameters
1009 Parameters
1011 ----------
1010 ----------
1012 now : bool, optional
1011 now : bool, optional
1013 If True, the kernel is forcefully restarted *immediately*, without
1012 If True, the kernel is forcefully restarted *immediately*, without
1014 having a chance to do any cleanup action. Otherwise the kernel is
1013 having a chance to do any cleanup action. Otherwise the kernel is
1015 given 1s to clean up before a forceful restart is issued.
1014 given 1s to clean up before a forceful restart is issued.
1016
1015
1017 In all cases the kernel is restarted, the only difference is whether
1016 In all cases the kernel is restarted, the only difference is whether
1018 it is given a chance to perform a clean shutdown or not.
1017 it is given a chance to perform a clean shutdown or not.
1019
1018
1020 **kw : optional
1019 **kw : optional
1021 Any options specified here will overwrite those used to launch the
1020 Any options specified here will overwrite those used to launch the
1022 kernel.
1021 kernel.
1023 """
1022 """
1024 if self._launch_args is None:
1023 if self._launch_args is None:
1025 raise RuntimeError("Cannot restart the kernel. "
1024 raise RuntimeError("Cannot restart the kernel. "
1026 "No previous call to 'start_kernel'.")
1025 "No previous call to 'start_kernel'.")
1027 else:
1026 else:
1028 # Stop currently running kernel.
1027 # Stop currently running kernel.
1029 self.shutdown_kernel(now=now, restart=True)
1028 self.shutdown_kernel(now=now, restart=True)
1030
1029
1031 # Start new kernel.
1030 # Start new kernel.
1032 self._launch_args.update(kw)
1031 self._launch_args.update(kw)
1033 self.start_kernel(**self._launch_args)
1032 self.start_kernel(**self._launch_args)
1034
1033
1035 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1034 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1036 # unless there is some delay here.
1035 # unless there is some delay here.
1037 if sys.platform == 'win32':
1036 if sys.platform == 'win32':
1038 time.sleep(0.2)
1037 time.sleep(0.2)
1039
1038
1040 @property
1039 @property
1041 def has_kernel(self):
1040 def has_kernel(self):
1042 """Has a kernel been started that we are managing."""
1041 """Has a kernel been started that we are managing."""
1043 return self.kernel is not None
1042 return self.kernel is not None
1044
1043
1045 def _kill_kernel(self):
1044 def _kill_kernel(self):
1046 """Kill the running kernel.
1045 """Kill the running kernel.
1047
1046
1048 This is a private method, callers should use shutdown_kernel(now=True).
1047 This is a private method, callers should use shutdown_kernel(now=True).
1049 """
1048 """
1050 if self.has_kernel:
1049 if self.has_kernel:
1051 # Pause the heart beat channel if it exists.
1050 # Pause the heart beat channel if it exists.
1052 if self._hb_channel is not None:
1051 if self._hb_channel is not None:
1053 self._hb_channel.pause()
1052 self._hb_channel.pause()
1054
1053
1055 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1054 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1056 # TerminateProcess() on Win32).
1055 # TerminateProcess() on Win32).
1057 try:
1056 try:
1058 self.kernel.kill()
1057 self.kernel.kill()
1059 except OSError as e:
1058 except OSError as e:
1060 # In Windows, we will get an Access Denied error if the process
1059 # In Windows, we will get an Access Denied error if the process
1061 # has already terminated. Ignore it.
1060 # has already terminated. Ignore it.
1062 if sys.platform == 'win32':
1061 if sys.platform == 'win32':
1063 if e.winerror != 5:
1062 if e.winerror != 5:
1064 raise
1063 raise
1065 # On Unix, we may get an ESRCH error if the process has already
1064 # On Unix, we may get an ESRCH error if the process has already
1066 # terminated. Ignore it.
1065 # terminated. Ignore it.
1067 else:
1066 else:
1068 from errno import ESRCH
1067 from errno import ESRCH
1069 if e.errno != ESRCH:
1068 if e.errno != ESRCH:
1070 raise
1069 raise
1071
1070
1072 # Block until the kernel terminates.
1071 # Block until the kernel terminates.
1073 self.kernel.wait()
1072 self.kernel.wait()
1074 self.kernel = None
1073 self.kernel = None
1075 else:
1074 else:
1076 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1075 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1077
1076
1078 def interrupt_kernel(self):
1077 def interrupt_kernel(self):
1079 """Interrupts the kernel by sending it a signal.
1078 """Interrupts the kernel by sending it a signal.
1080
1079
1081 Unlike ``signal_kernel``, this operation is well supported on all
1080 Unlike ``signal_kernel``, this operation is well supported on all
1082 platforms.
1081 platforms.
1083 """
1082 """
1084 if self.has_kernel:
1083 if self.has_kernel:
1085 if sys.platform == 'win32':
1084 if sys.platform == 'win32':
1086 from parentpoller import ParentPollerWindows as Poller
1085 from parentpoller import ParentPollerWindows as Poller
1087 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1086 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1088 else:
1087 else:
1089 self.kernel.send_signal(signal.SIGINT)
1088 self.kernel.send_signal(signal.SIGINT)
1090 else:
1089 else:
1091 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1090 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1092
1091
1093 def signal_kernel(self, signum):
1092 def signal_kernel(self, signum):
1094 """Sends a signal to the kernel.
1093 """Sends a signal to the kernel.
1095
1094
1096 Note that since only SIGTERM is supported on Windows, this function is
1095 Note that since only SIGTERM is supported on Windows, this function is
1097 only useful on Unix systems.
1096 only useful on Unix systems.
1098 """
1097 """
1099 if self.has_kernel:
1098 if self.has_kernel:
1100 self.kernel.send_signal(signum)
1099 self.kernel.send_signal(signum)
1101 else:
1100 else:
1102 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1101 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1103
1102
1104 @property
1103 @property
1105 def is_alive(self):
1104 def is_alive(self):
1106 """Is the kernel process still running?"""
1105 """Is the kernel process still running?"""
1107 if self.has_kernel:
1106 if self.has_kernel:
1108 if self.kernel.poll() is None:
1107 if self.kernel.poll() is None:
1109 return True
1108 return True
1110 else:
1109 else:
1111 return False
1110 return False
1112 elif self._hb_channel is not None:
1111 elif self._hb_channel is not None:
1113 # We didn't start the kernel with this KernelManager so we
1112 # We didn't start the kernel with this KernelManager so we
1114 # use the heartbeat.
1113 # use the heartbeat.
1115 return self._hb_channel.is_beating()
1114 return self._hb_channel.is_beating()
1116 else:
1115 else:
1117 # no heartbeat and not local, we can't tell if it's running,
1116 # no heartbeat and not local, we can't tell if it's running,
1118 # so naively return True
1117 # so naively return True
1119 return True
1118 return True
1120
1119
1121
1120
1122 #-----------------------------------------------------------------------------
1121 #-----------------------------------------------------------------------------
1123 # ABC Registration
1122 # ABC Registration
1124 #-----------------------------------------------------------------------------
1123 #-----------------------------------------------------------------------------
1125
1124
1126 ShellChannelABC.register(ShellChannel)
1125 ShellChannelABC.register(ShellChannel)
1127 IOPubChannelABC.register(IOPubChannel)
1126 IOPubChannelABC.register(IOPubChannel)
1128 HBChannelABC.register(HBChannel)
1127 HBChannelABC.register(HBChannel)
1129 StdInChannelABC.register(StdInChannel)
1128 StdInChannelABC.register(StdInChannel)
1130 KernelManagerABC.register(KernelManager)
1129 KernelManagerABC.register(KernelManager)
1131
1130
General Comments 0
You need to be logged in to leave comments. Login now