##// END OF EJS Templates
Putting connection file cleanup back in __del__.
Brian E. Granger -
Show More
@@ -1,1078 +1,1081 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 import json
21 import json
22 from subprocess import Popen
22 from subprocess import Popen
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 from threading import Thread
26 from threading import Thread
27 import time
27 import time
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.traitlets import (
39 from IPython.utils.traitlets import (
40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 )
41 )
42 from IPython.utils.py3compat import str_to_bytes
42 from IPython.utils.py3compat import str_to_bytes
43 from IPython.zmq.entry_point import write_connection_file
43 from IPython.zmq.entry_point import write_connection_file
44 from session import Session
44 from session import Session
45 from IPython.zmq.kernelmanagerabc import (
45 from IPython.zmq.kernelmanagerabc import (
46 ShellChannelABC, IOPubChannelABC,
46 ShellChannelABC, IOPubChannelABC,
47 HBChannelABC, StdInChannelABC,
47 HBChannelABC, StdInChannelABC,
48 KernelManagerABC
48 KernelManagerABC
49 )
49 )
50
50
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # Constants and exceptions
53 # Constants and exceptions
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 class InvalidPortNumber(Exception):
56 class InvalidPortNumber(Exception):
57 pass
57 pass
58
58
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60 # Utility functions
60 # Utility functions
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62
62
63 # some utilities to validate message structure, these might get moved elsewhere
63 # some utilities to validate message structure, these might get moved elsewhere
64 # if they prove to have more generic utility
64 # if they prove to have more generic utility
65
65
66 def validate_string_list(lst):
66 def validate_string_list(lst):
67 """Validate that the input is a list of strings.
67 """Validate that the input is a list of strings.
68
68
69 Raises ValueError if not."""
69 Raises ValueError if not."""
70 if not isinstance(lst, list):
70 if not isinstance(lst, list):
71 raise ValueError('input %r must be a list' % lst)
71 raise ValueError('input %r must be a list' % lst)
72 for x in lst:
72 for x in lst:
73 if not isinstance(x, basestring):
73 if not isinstance(x, basestring):
74 raise ValueError('element %r in list must be a string' % x)
74 raise ValueError('element %r in list must be a string' % x)
75
75
76
76
77 def validate_string_dict(dct):
77 def validate_string_dict(dct):
78 """Validate that the input is a dict with string keys and values.
78 """Validate that the input is a dict with string keys and values.
79
79
80 Raises ValueError if not."""
80 Raises ValueError if not."""
81 for k,v in dct.iteritems():
81 for k,v in dct.iteritems():
82 if not isinstance(k, basestring):
82 if not isinstance(k, basestring):
83 raise ValueError('key %r in dict must be a string' % k)
83 raise ValueError('key %r in dict must be a string' % k)
84 if not isinstance(v, basestring):
84 if not isinstance(v, basestring):
85 raise ValueError('value %r in dict must be a string' % v)
85 raise ValueError('value %r in dict must be a string' % v)
86
86
87
87
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89 # ZMQ Socket Channel classes
89 # ZMQ Socket Channel classes
90 #-----------------------------------------------------------------------------
90 #-----------------------------------------------------------------------------
91
91
92 class ZMQSocketChannel(Thread):
92 class ZMQSocketChannel(Thread):
93 """The base class for the channels that use ZMQ sockets."""
93 """The base class for the channels that use ZMQ sockets."""
94 context = None
94 context = None
95 session = None
95 session = None
96 socket = None
96 socket = None
97 ioloop = None
97 ioloop = None
98 stream = None
98 stream = None
99 _address = None
99 _address = None
100 _exiting = False
100 _exiting = False
101
101
102 def __init__(self, context, session, address):
102 def __init__(self, context, session, address):
103 """Create a channel.
103 """Create a channel.
104
104
105 Parameters
105 Parameters
106 ----------
106 ----------
107 context : :class:`zmq.Context`
107 context : :class:`zmq.Context`
108 The ZMQ context to use.
108 The ZMQ context to use.
109 session : :class:`session.Session`
109 session : :class:`session.Session`
110 The session to use.
110 The session to use.
111 address : zmq url
111 address : zmq url
112 Standard (ip, port) tuple that the kernel is listening on.
112 Standard (ip, port) tuple that the kernel is listening on.
113 """
113 """
114 super(ZMQSocketChannel, self).__init__()
114 super(ZMQSocketChannel, self).__init__()
115 self.daemon = True
115 self.daemon = True
116
116
117 self.context = context
117 self.context = context
118 self.session = session
118 self.session = session
119 if isinstance(address, tuple):
119 if isinstance(address, tuple):
120 if address[1] == 0:
120 if address[1] == 0:
121 message = 'The port number for a channel cannot be 0.'
121 message = 'The port number for a channel cannot be 0.'
122 raise InvalidPortNumber(message)
122 raise InvalidPortNumber(message)
123 address = "tcp://%s:%i" % address
123 address = "tcp://%s:%i" % address
124 self._address = address
124 self._address = address
125 atexit.register(self._notice_exit)
125 atexit.register(self._notice_exit)
126
126
127 def _notice_exit(self):
127 def _notice_exit(self):
128 self._exiting = True
128 self._exiting = True
129
129
130 def _run_loop(self):
130 def _run_loop(self):
131 """Run my loop, ignoring EINTR events in the poller"""
131 """Run my loop, ignoring EINTR events in the poller"""
132 while True:
132 while True:
133 try:
133 try:
134 self.ioloop.start()
134 self.ioloop.start()
135 except ZMQError as e:
135 except ZMQError as e:
136 if e.errno == errno.EINTR:
136 if e.errno == errno.EINTR:
137 continue
137 continue
138 else:
138 else:
139 raise
139 raise
140 except Exception:
140 except Exception:
141 if self._exiting:
141 if self._exiting:
142 break
142 break
143 else:
143 else:
144 raise
144 raise
145 else:
145 else:
146 break
146 break
147
147
148 def stop(self):
148 def stop(self):
149 """Stop the channel's event loop and join its thread.
149 """Stop the channel's event loop and join its thread.
150
150
151 This calls :method:`Thread.join` and returns when the thread
151 This calls :method:`Thread.join` and returns when the thread
152 terminates. :class:`RuntimeError` will be raised if
152 terminates. :class:`RuntimeError` will be raised if
153 :method:`self.start` is called again.
153 :method:`self.start` is called again.
154 """
154 """
155 self.join()
155 self.join()
156
156
157 @property
157 @property
158 def address(self):
158 def address(self):
159 """Get the channel's address as a zmq url string.
159 """Get the channel's address as a zmq url string.
160
160
161 These URLS have the form: 'tcp://127.0.0.1:5555'.
161 These URLS have the form: 'tcp://127.0.0.1:5555'.
162 """
162 """
163 return self._address
163 return self._address
164
164
165 def _queue_send(self, msg):
165 def _queue_send(self, msg):
166 """Queue a message to be sent from the IOLoop's thread.
166 """Queue a message to be sent from the IOLoop's thread.
167
167
168 Parameters
168 Parameters
169 ----------
169 ----------
170 msg : message to send
170 msg : message to send
171
171
172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
173 thread control of the action.
173 thread control of the action.
174 """
174 """
175 def thread_send():
175 def thread_send():
176 self.session.send(self.stream, msg)
176 self.session.send(self.stream, msg)
177 self.ioloop.add_callback(thread_send)
177 self.ioloop.add_callback(thread_send)
178
178
179 def _handle_recv(self, msg):
179 def _handle_recv(self, msg):
180 """Callback for stream.on_recv.
180 """Callback for stream.on_recv.
181
181
182 Unpacks message, and calls handlers with it.
182 Unpacks message, and calls handlers with it.
183 """
183 """
184 ident,smsg = self.session.feed_identities(msg)
184 ident,smsg = self.session.feed_identities(msg)
185 self.call_handlers(self.session.unserialize(smsg))
185 self.call_handlers(self.session.unserialize(smsg))
186
186
187
187
188
188
189 class ShellChannel(ZMQSocketChannel):
189 class ShellChannel(ZMQSocketChannel):
190 """The shell channel for issuing request/replies to the kernel."""
190 """The shell channel for issuing request/replies to the kernel."""
191
191
192 command_queue = None
192 command_queue = None
193 # flag for whether execute requests should be allowed to call raw_input:
193 # flag for whether execute requests should be allowed to call raw_input:
194 allow_stdin = True
194 allow_stdin = True
195
195
196 def __init__(self, context, session, address):
196 def __init__(self, context, session, address):
197 super(ShellChannel, self).__init__(context, session, address)
197 super(ShellChannel, self).__init__(context, session, address)
198 self.ioloop = ioloop.IOLoop()
198 self.ioloop = ioloop.IOLoop()
199
199
200 def run(self):
200 def run(self):
201 """The thread's main activity. Call start() instead."""
201 """The thread's main activity. Call start() instead."""
202 self.socket = self.context.socket(zmq.DEALER)
202 self.socket = self.context.socket(zmq.DEALER)
203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 self.socket.connect(self.address)
204 self.socket.connect(self.address)
205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 self.stream.on_recv(self._handle_recv)
206 self.stream.on_recv(self._handle_recv)
207 self._run_loop()
207 self._run_loop()
208 try:
208 try:
209 self.socket.close()
209 self.socket.close()
210 except:
210 except:
211 pass
211 pass
212
212
213 def stop(self):
213 def stop(self):
214 """Stop the channel's event loop and join its thread."""
214 """Stop the channel's event loop and join its thread."""
215 self.ioloop.stop()
215 self.ioloop.stop()
216 super(ShellChannel, self).stop()
216 super(ShellChannel, self).stop()
217
217
218 def call_handlers(self, msg):
218 def call_handlers(self, msg):
219 """This method is called in the ioloop thread when a message arrives.
219 """This method is called in the ioloop thread when a message arrives.
220
220
221 Subclasses should override this method to handle incoming messages.
221 Subclasses should override this method to handle incoming messages.
222 It is important to remember that this method is called in the thread
222 It is important to remember that this method is called in the thread
223 so that some logic must be done to ensure that the application leve
223 so that some logic must be done to ensure that the application leve
224 handlers are called in the application thread.
224 handlers are called in the application thread.
225 """
225 """
226 raise NotImplementedError('call_handlers must be defined in a subclass.')
226 raise NotImplementedError('call_handlers must be defined in a subclass.')
227
227
228 def execute(self, code, silent=False, store_history=True,
228 def execute(self, code, silent=False, store_history=True,
229 user_variables=None, user_expressions=None, allow_stdin=None):
229 user_variables=None, user_expressions=None, allow_stdin=None):
230 """Execute code in the kernel.
230 """Execute code in the kernel.
231
231
232 Parameters
232 Parameters
233 ----------
233 ----------
234 code : str
234 code : str
235 A string of Python code.
235 A string of Python code.
236
236
237 silent : bool, optional (default False)
237 silent : bool, optional (default False)
238 If set, the kernel will execute the code as quietly possible, and
238 If set, the kernel will execute the code as quietly possible, and
239 will force store_history to be False.
239 will force store_history to be False.
240
240
241 store_history : bool, optional (default True)
241 store_history : bool, optional (default True)
242 If set, the kernel will store command history. This is forced
242 If set, the kernel will store command history. This is forced
243 to be False if silent is True.
243 to be False if silent is True.
244
244
245 user_variables : list, optional
245 user_variables : list, optional
246 A list of variable names to pull from the user's namespace. They
246 A list of variable names to pull from the user's namespace. They
247 will come back as a dict with these names as keys and their
247 will come back as a dict with these names as keys and their
248 :func:`repr` as values.
248 :func:`repr` as values.
249
249
250 user_expressions : dict, optional
250 user_expressions : dict, optional
251 A dict mapping names to expressions to be evaluated in the user's
251 A dict mapping names to expressions to be evaluated in the user's
252 dict. The expression values are returned as strings formatted using
252 dict. The expression values are returned as strings formatted using
253 :func:`repr`.
253 :func:`repr`.
254
254
255 allow_stdin : bool, optional (default self.allow_stdin)
255 allow_stdin : bool, optional (default self.allow_stdin)
256 Flag for whether the kernel can send stdin requests to frontends.
256 Flag for whether the kernel can send stdin requests to frontends.
257
257
258 Some frontends (e.g. the Notebook) do not support stdin requests.
258 Some frontends (e.g. the Notebook) do not support stdin requests.
259 If raw_input is called from code executed from such a frontend, a
259 If raw_input is called from code executed from such a frontend, a
260 StdinNotImplementedError will be raised.
260 StdinNotImplementedError will be raised.
261
261
262 Returns
262 Returns
263 -------
263 -------
264 The msg_id of the message sent.
264 The msg_id of the message sent.
265 """
265 """
266 if user_variables is None:
266 if user_variables is None:
267 user_variables = []
267 user_variables = []
268 if user_expressions is None:
268 if user_expressions is None:
269 user_expressions = {}
269 user_expressions = {}
270 if allow_stdin is None:
270 if allow_stdin is None:
271 allow_stdin = self.allow_stdin
271 allow_stdin = self.allow_stdin
272
272
273
273
274 # Don't waste network traffic if inputs are invalid
274 # Don't waste network traffic if inputs are invalid
275 if not isinstance(code, basestring):
275 if not isinstance(code, basestring):
276 raise ValueError('code %r must be a string' % code)
276 raise ValueError('code %r must be a string' % code)
277 validate_string_list(user_variables)
277 validate_string_list(user_variables)
278 validate_string_dict(user_expressions)
278 validate_string_dict(user_expressions)
279
279
280 # Create class for content/msg creation. Related to, but possibly
280 # Create class for content/msg creation. Related to, but possibly
281 # not in Session.
281 # not in Session.
282 content = dict(code=code, silent=silent, store_history=store_history,
282 content = dict(code=code, silent=silent, store_history=store_history,
283 user_variables=user_variables,
283 user_variables=user_variables,
284 user_expressions=user_expressions,
284 user_expressions=user_expressions,
285 allow_stdin=allow_stdin,
285 allow_stdin=allow_stdin,
286 )
286 )
287 msg = self.session.msg('execute_request', content)
287 msg = self.session.msg('execute_request', content)
288 self._queue_send(msg)
288 self._queue_send(msg)
289 return msg['header']['msg_id']
289 return msg['header']['msg_id']
290
290
291 def complete(self, text, line, cursor_pos, block=None):
291 def complete(self, text, line, cursor_pos, block=None):
292 """Tab complete text in the kernel's namespace.
292 """Tab complete text in the kernel's namespace.
293
293
294 Parameters
294 Parameters
295 ----------
295 ----------
296 text : str
296 text : str
297 The text to complete.
297 The text to complete.
298 line : str
298 line : str
299 The full line of text that is the surrounding context for the
299 The full line of text that is the surrounding context for the
300 text to complete.
300 text to complete.
301 cursor_pos : int
301 cursor_pos : int
302 The position of the cursor in the line where the completion was
302 The position of the cursor in the line where the completion was
303 requested.
303 requested.
304 block : str, optional
304 block : str, optional
305 The full block of code in which the completion is being requested.
305 The full block of code in which the completion is being requested.
306
306
307 Returns
307 Returns
308 -------
308 -------
309 The msg_id of the message sent.
309 The msg_id of the message sent.
310 """
310 """
311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 msg = self.session.msg('complete_request', content)
312 msg = self.session.msg('complete_request', content)
313 self._queue_send(msg)
313 self._queue_send(msg)
314 return msg['header']['msg_id']
314 return msg['header']['msg_id']
315
315
316 def object_info(self, oname, detail_level=0):
316 def object_info(self, oname, detail_level=0):
317 """Get metadata information about an object in the kernel's namespace.
317 """Get metadata information about an object in the kernel's namespace.
318
318
319 Parameters
319 Parameters
320 ----------
320 ----------
321 oname : str
321 oname : str
322 A string specifying the object name.
322 A string specifying the object name.
323 detail_level : int, optional
323 detail_level : int, optional
324 The level of detail for the introspection (0-2)
324 The level of detail for the introspection (0-2)
325
325
326 Returns
326 Returns
327 -------
327 -------
328 The msg_id of the message sent.
328 The msg_id of the message sent.
329 """
329 """
330 content = dict(oname=oname, detail_level=detail_level)
330 content = dict(oname=oname, detail_level=detail_level)
331 msg = self.session.msg('object_info_request', content)
331 msg = self.session.msg('object_info_request', content)
332 self._queue_send(msg)
332 self._queue_send(msg)
333 return msg['header']['msg_id']
333 return msg['header']['msg_id']
334
334
335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 """Get entries from the kernel's history list.
336 """Get entries from the kernel's history list.
337
337
338 Parameters
338 Parameters
339 ----------
339 ----------
340 raw : bool
340 raw : bool
341 If True, return the raw input.
341 If True, return the raw input.
342 output : bool
342 output : bool
343 If True, then return the output as well.
343 If True, then return the output as well.
344 hist_access_type : str
344 hist_access_type : str
345 'range' (fill in session, start and stop params), 'tail' (fill in n)
345 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 or 'search' (fill in pattern param).
346 or 'search' (fill in pattern param).
347
347
348 session : int
348 session : int
349 For a range request, the session from which to get lines. Session
349 For a range request, the session from which to get lines. Session
350 numbers are positive integers; negative ones count back from the
350 numbers are positive integers; negative ones count back from the
351 current session.
351 current session.
352 start : int
352 start : int
353 The first line number of a history range.
353 The first line number of a history range.
354 stop : int
354 stop : int
355 The final (excluded) line number of a history range.
355 The final (excluded) line number of a history range.
356
356
357 n : int
357 n : int
358 The number of lines of history to get for a tail request.
358 The number of lines of history to get for a tail request.
359
359
360 pattern : str
360 pattern : str
361 The glob-syntax pattern for a search request.
361 The glob-syntax pattern for a search request.
362
362
363 Returns
363 Returns
364 -------
364 -------
365 The msg_id of the message sent.
365 The msg_id of the message sent.
366 """
366 """
367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 **kwargs)
368 **kwargs)
369 msg = self.session.msg('history_request', content)
369 msg = self.session.msg('history_request', content)
370 self._queue_send(msg)
370 self._queue_send(msg)
371 return msg['header']['msg_id']
371 return msg['header']['msg_id']
372
372
373 def kernel_info(self):
373 def kernel_info(self):
374 """Request kernel info."""
374 """Request kernel info."""
375 msg = self.session.msg('kernel_info_request')
375 msg = self.session.msg('kernel_info_request')
376 self._queue_send(msg)
376 self._queue_send(msg)
377 return msg['header']['msg_id']
377 return msg['header']['msg_id']
378
378
379 def shutdown(self, restart=False):
379 def shutdown(self, restart=False):
380 """Request an immediate kernel shutdown.
380 """Request an immediate kernel shutdown.
381
381
382 Upon receipt of the (empty) reply, client code can safely assume that
382 Upon receipt of the (empty) reply, client code can safely assume that
383 the kernel has shut down and it's safe to forcefully terminate it if
383 the kernel has shut down and it's safe to forcefully terminate it if
384 it's still alive.
384 it's still alive.
385
385
386 The kernel will send the reply via a function registered with Python's
386 The kernel will send the reply via a function registered with Python's
387 atexit module, ensuring it's truly done as the kernel is done with all
387 atexit module, ensuring it's truly done as the kernel is done with all
388 normal operation.
388 normal operation.
389 """
389 """
390 # Send quit message to kernel. Once we implement kernel-side setattr,
390 # Send quit message to kernel. Once we implement kernel-side setattr,
391 # this should probably be done that way, but for now this will do.
391 # this should probably be done that way, but for now this will do.
392 msg = self.session.msg('shutdown_request', {'restart':restart})
392 msg = self.session.msg('shutdown_request', {'restart':restart})
393 self._queue_send(msg)
393 self._queue_send(msg)
394 return msg['header']['msg_id']
394 return msg['header']['msg_id']
395
395
396
396
397
397
398 class IOPubChannel(ZMQSocketChannel):
398 class IOPubChannel(ZMQSocketChannel):
399 """The iopub channel which listens for messages that the kernel publishes.
399 """The iopub channel which listens for messages that the kernel publishes.
400
400
401 This channel is where all output is published to frontends.
401 This channel is where all output is published to frontends.
402 """
402 """
403
403
404 def __init__(self, context, session, address):
404 def __init__(self, context, session, address):
405 super(IOPubChannel, self).__init__(context, session, address)
405 super(IOPubChannel, self).__init__(context, session, address)
406 self.ioloop = ioloop.IOLoop()
406 self.ioloop = ioloop.IOLoop()
407
407
408 def run(self):
408 def run(self):
409 """The thread's main activity. Call start() instead."""
409 """The thread's main activity. Call start() instead."""
410 self.socket = self.context.socket(zmq.SUB)
410 self.socket = self.context.socket(zmq.SUB)
411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 self.socket.connect(self.address)
413 self.socket.connect(self.address)
414 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
414 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
415 self.stream.on_recv(self._handle_recv)
415 self.stream.on_recv(self._handle_recv)
416 self._run_loop()
416 self._run_loop()
417 try:
417 try:
418 self.socket.close()
418 self.socket.close()
419 except:
419 except:
420 pass
420 pass
421
421
422 def stop(self):
422 def stop(self):
423 """Stop the channel's event loop and join its thread."""
423 """Stop the channel's event loop and join its thread."""
424 self.ioloop.stop()
424 self.ioloop.stop()
425 super(IOPubChannel, self).stop()
425 super(IOPubChannel, self).stop()
426
426
427 def call_handlers(self, msg):
427 def call_handlers(self, msg):
428 """This method is called in the ioloop thread when a message arrives.
428 """This method is called in the ioloop thread when a message arrives.
429
429
430 Subclasses should override this method to handle incoming messages.
430 Subclasses should override this method to handle incoming messages.
431 It is important to remember that this method is called in the thread
431 It is important to remember that this method is called in the thread
432 so that some logic must be done to ensure that the application leve
432 so that some logic must be done to ensure that the application leve
433 handlers are called in the application thread.
433 handlers are called in the application thread.
434 """
434 """
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436
436
437 def flush(self, timeout=1.0):
437 def flush(self, timeout=1.0):
438 """Immediately processes all pending messages on the iopub channel.
438 """Immediately processes all pending messages on the iopub channel.
439
439
440 Callers should use this method to ensure that :method:`call_handlers`
440 Callers should use this method to ensure that :method:`call_handlers`
441 has been called for all messages that have been received on the
441 has been called for all messages that have been received on the
442 0MQ SUB socket of this channel.
442 0MQ SUB socket of this channel.
443
443
444 This method is thread safe.
444 This method is thread safe.
445
445
446 Parameters
446 Parameters
447 ----------
447 ----------
448 timeout : float, optional
448 timeout : float, optional
449 The maximum amount of time to spend flushing, in seconds. The
449 The maximum amount of time to spend flushing, in seconds. The
450 default is one second.
450 default is one second.
451 """
451 """
452 # We do the IOLoop callback process twice to ensure that the IOLoop
452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 # gets to perform at least one full poll.
453 # gets to perform at least one full poll.
454 stop_time = time.time() + timeout
454 stop_time = time.time() + timeout
455 for i in xrange(2):
455 for i in xrange(2):
456 self._flushed = False
456 self._flushed = False
457 self.ioloop.add_callback(self._flush)
457 self.ioloop.add_callback(self._flush)
458 while not self._flushed and time.time() < stop_time:
458 while not self._flushed and time.time() < stop_time:
459 time.sleep(0.01)
459 time.sleep(0.01)
460
460
461 def _flush(self):
461 def _flush(self):
462 """Callback for :method:`self.flush`."""
462 """Callback for :method:`self.flush`."""
463 self.stream.flush()
463 self.stream.flush()
464 self._flushed = True
464 self._flushed = True
465
465
466
466
467 class StdInChannel(ZMQSocketChannel):
467 class StdInChannel(ZMQSocketChannel):
468 """The stdin channel to handle raw_input requests that the kernel makes."""
468 """The stdin channel to handle raw_input requests that the kernel makes."""
469
469
470 msg_queue = None
470 msg_queue = None
471
471
472 def __init__(self, context, session, address):
472 def __init__(self, context, session, address):
473 super(StdInChannel, self).__init__(context, session, address)
473 super(StdInChannel, self).__init__(context, session, address)
474 self.ioloop = ioloop.IOLoop()
474 self.ioloop = ioloop.IOLoop()
475
475
476 def run(self):
476 def run(self):
477 """The thread's main activity. Call start() instead."""
477 """The thread's main activity. Call start() instead."""
478 self.socket = self.context.socket(zmq.DEALER)
478 self.socket = self.context.socket(zmq.DEALER)
479 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
479 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
480 self.socket.connect(self.address)
480 self.socket.connect(self.address)
481 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
481 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
482 self.stream.on_recv(self._handle_recv)
482 self.stream.on_recv(self._handle_recv)
483 self._run_loop()
483 self._run_loop()
484 try:
484 try:
485 self.socket.close()
485 self.socket.close()
486 except:
486 except:
487 pass
487 pass
488
488
489 def stop(self):
489 def stop(self):
490 """Stop the channel's event loop and join its thread."""
490 """Stop the channel's event loop and join its thread."""
491 self.ioloop.stop()
491 self.ioloop.stop()
492 super(StdInChannel, self).stop()
492 super(StdInChannel, self).stop()
493
493
494 def call_handlers(self, msg):
494 def call_handlers(self, msg):
495 """This method is called in the ioloop thread when a message arrives.
495 """This method is called in the ioloop thread when a message arrives.
496
496
497 Subclasses should override this method to handle incoming messages.
497 Subclasses should override this method to handle incoming messages.
498 It is important to remember that this method is called in the thread
498 It is important to remember that this method is called in the thread
499 so that some logic must be done to ensure that the application leve
499 so that some logic must be done to ensure that the application leve
500 handlers are called in the application thread.
500 handlers are called in the application thread.
501 """
501 """
502 raise NotImplementedError('call_handlers must be defined in a subclass.')
502 raise NotImplementedError('call_handlers must be defined in a subclass.')
503
503
504 def input(self, string):
504 def input(self, string):
505 """Send a string of raw input to the kernel."""
505 """Send a string of raw input to the kernel."""
506 content = dict(value=string)
506 content = dict(value=string)
507 msg = self.session.msg('input_reply', content)
507 msg = self.session.msg('input_reply', content)
508 self._queue_send(msg)
508 self._queue_send(msg)
509
509
510
510
511 class HBChannel(ZMQSocketChannel):
511 class HBChannel(ZMQSocketChannel):
512 """The heartbeat channel which monitors the kernel heartbeat.
512 """The heartbeat channel which monitors the kernel heartbeat.
513
513
514 Note that the heartbeat channel is paused by default. As long as you start
514 Note that the heartbeat channel is paused by default. As long as you start
515 this channel, the kernel manager will ensure that it is paused and un-paused
515 this channel, the kernel manager will ensure that it is paused and un-paused
516 as appropriate.
516 as appropriate.
517 """
517 """
518
518
519 time_to_dead = 3.0
519 time_to_dead = 3.0
520 socket = None
520 socket = None
521 poller = None
521 poller = None
522 _running = None
522 _running = None
523 _pause = None
523 _pause = None
524 _beating = None
524 _beating = None
525
525
526 def __init__(self, context, session, address):
526 def __init__(self, context, session, address):
527 super(HBChannel, self).__init__(context, session, address)
527 super(HBChannel, self).__init__(context, session, address)
528 self._running = False
528 self._running = False
529 self._pause =True
529 self._pause =True
530 self.poller = zmq.Poller()
530 self.poller = zmq.Poller()
531
531
532 def _create_socket(self):
532 def _create_socket(self):
533 if self.socket is not None:
533 if self.socket is not None:
534 # close previous socket, before opening a new one
534 # close previous socket, before opening a new one
535 self.poller.unregister(self.socket)
535 self.poller.unregister(self.socket)
536 self.socket.close()
536 self.socket.close()
537 self.socket = self.context.socket(zmq.REQ)
537 self.socket = self.context.socket(zmq.REQ)
538 self.socket.setsockopt(zmq.LINGER, 0)
538 self.socket.setsockopt(zmq.LINGER, 0)
539 self.socket.connect(self.address)
539 self.socket.connect(self.address)
540
540
541 self.poller.register(self.socket, zmq.POLLIN)
541 self.poller.register(self.socket, zmq.POLLIN)
542
542
543 def _poll(self, start_time):
543 def _poll(self, start_time):
544 """poll for heartbeat replies until we reach self.time_to_dead.
544 """poll for heartbeat replies until we reach self.time_to_dead.
545
545
546 Ignores interrupts, and returns the result of poll(), which
546 Ignores interrupts, and returns the result of poll(), which
547 will be an empty list if no messages arrived before the timeout,
547 will be an empty list if no messages arrived before the timeout,
548 or the event tuple if there is a message to receive.
548 or the event tuple if there is a message to receive.
549 """
549 """
550
550
551 until_dead = self.time_to_dead - (time.time() - start_time)
551 until_dead = self.time_to_dead - (time.time() - start_time)
552 # ensure poll at least once
552 # ensure poll at least once
553 until_dead = max(until_dead, 1e-3)
553 until_dead = max(until_dead, 1e-3)
554 events = []
554 events = []
555 while True:
555 while True:
556 try:
556 try:
557 events = self.poller.poll(1000 * until_dead)
557 events = self.poller.poll(1000 * until_dead)
558 except ZMQError as e:
558 except ZMQError as e:
559 if e.errno == errno.EINTR:
559 if e.errno == errno.EINTR:
560 # ignore interrupts during heartbeat
560 # ignore interrupts during heartbeat
561 # this may never actually happen
561 # this may never actually happen
562 until_dead = self.time_to_dead - (time.time() - start_time)
562 until_dead = self.time_to_dead - (time.time() - start_time)
563 until_dead = max(until_dead, 1e-3)
563 until_dead = max(until_dead, 1e-3)
564 pass
564 pass
565 else:
565 else:
566 raise
566 raise
567 except Exception:
567 except Exception:
568 if self._exiting:
568 if self._exiting:
569 break
569 break
570 else:
570 else:
571 raise
571 raise
572 else:
572 else:
573 break
573 break
574 return events
574 return events
575
575
576 def run(self):
576 def run(self):
577 """The thread's main activity. Call start() instead."""
577 """The thread's main activity. Call start() instead."""
578 self._create_socket()
578 self._create_socket()
579 self._running = True
579 self._running = True
580 self._beating = True
580 self._beating = True
581
581
582 while self._running:
582 while self._running:
583 if self._pause:
583 if self._pause:
584 # just sleep, and skip the rest of the loop
584 # just sleep, and skip the rest of the loop
585 time.sleep(self.time_to_dead)
585 time.sleep(self.time_to_dead)
586 continue
586 continue
587
587
588 since_last_heartbeat = 0.0
588 since_last_heartbeat = 0.0
589 # io.rprint('Ping from HB channel') # dbg
589 # io.rprint('Ping from HB channel') # dbg
590 # no need to catch EFSM here, because the previous event was
590 # no need to catch EFSM here, because the previous event was
591 # either a recv or connect, which cannot be followed by EFSM
591 # either a recv or connect, which cannot be followed by EFSM
592 self.socket.send(b'ping')
592 self.socket.send(b'ping')
593 request_time = time.time()
593 request_time = time.time()
594 ready = self._poll(request_time)
594 ready = self._poll(request_time)
595 if ready:
595 if ready:
596 self._beating = True
596 self._beating = True
597 # the poll above guarantees we have something to recv
597 # the poll above guarantees we have something to recv
598 self.socket.recv()
598 self.socket.recv()
599 # sleep the remainder of the cycle
599 # sleep the remainder of the cycle
600 remainder = self.time_to_dead - (time.time() - request_time)
600 remainder = self.time_to_dead - (time.time() - request_time)
601 if remainder > 0:
601 if remainder > 0:
602 time.sleep(remainder)
602 time.sleep(remainder)
603 continue
603 continue
604 else:
604 else:
605 # nothing was received within the time limit, signal heart failure
605 # nothing was received within the time limit, signal heart failure
606 self._beating = False
606 self._beating = False
607 since_last_heartbeat = time.time() - request_time
607 since_last_heartbeat = time.time() - request_time
608 self.call_handlers(since_last_heartbeat)
608 self.call_handlers(since_last_heartbeat)
609 # and close/reopen the socket, because the REQ/REP cycle has been broken
609 # and close/reopen the socket, because the REQ/REP cycle has been broken
610 self._create_socket()
610 self._create_socket()
611 continue
611 continue
612 try:
612 try:
613 self.socket.close()
613 self.socket.close()
614 except:
614 except:
615 pass
615 pass
616
616
617 def pause(self):
617 def pause(self):
618 """Pause the heartbeat."""
618 """Pause the heartbeat."""
619 self._pause = True
619 self._pause = True
620
620
621 def unpause(self):
621 def unpause(self):
622 """Unpause the heartbeat."""
622 """Unpause the heartbeat."""
623 self._pause = False
623 self._pause = False
624
624
625 def is_beating(self):
625 def is_beating(self):
626 """Is the heartbeat running and responsive (and not paused)."""
626 """Is the heartbeat running and responsive (and not paused)."""
627 if self.is_alive() and not self._pause and self._beating:
627 if self.is_alive() and not self._pause and self._beating:
628 return True
628 return True
629 else:
629 else:
630 return False
630 return False
631
631
632 def stop(self):
632 def stop(self):
633 """Stop the channel's event loop and join its thread."""
633 """Stop the channel's event loop and join its thread."""
634 self._running = False
634 self._running = False
635 super(HBChannel, self).stop()
635 super(HBChannel, self).stop()
636
636
637 def call_handlers(self, since_last_heartbeat):
637 def call_handlers(self, since_last_heartbeat):
638 """This method is called in the ioloop thread when a message arrives.
638 """This method is called in the ioloop thread when a message arrives.
639
639
640 Subclasses should override this method to handle incoming messages.
640 Subclasses should override this method to handle incoming messages.
641 It is important to remember that this method is called in the thread
641 It is important to remember that this method is called in the thread
642 so that some logic must be done to ensure that the application level
642 so that some logic must be done to ensure that the application level
643 handlers are called in the application thread.
643 handlers are called in the application thread.
644 """
644 """
645 raise NotImplementedError('call_handlers must be defined in a subclass.')
645 raise NotImplementedError('call_handlers must be defined in a subclass.')
646
646
647
647
648 #-----------------------------------------------------------------------------
648 #-----------------------------------------------------------------------------
649 # Main kernel manager class
649 # Main kernel manager class
650 #-----------------------------------------------------------------------------
650 #-----------------------------------------------------------------------------
651
651
652 class KernelManager(Configurable):
652 class KernelManager(Configurable):
653 """Manages a single kernel on this host along with its channels.
653 """Manages a single kernel on this host along with its channels.
654
654
655 There are four channels associated with each kernel:
655 There are four channels associated with each kernel:
656
656
657 * shell: for request/reply calls to the kernel.
657 * shell: for request/reply calls to the kernel.
658 * iopub: for the kernel to publish results to frontends.
658 * iopub: for the kernel to publish results to frontends.
659 * hb: for monitoring the kernel's heartbeat.
659 * hb: for monitoring the kernel's heartbeat.
660 * stdin: for frontends to reply to raw_input calls in the kernel.
660 * stdin: for frontends to reply to raw_input calls in the kernel.
661
661
662 The usage of the channels that this class manages is optional. It is
662 The usage of the channels that this class manages is optional. It is
663 entirely possible to connect to the kernels directly using ZeroMQ
663 entirely possible to connect to the kernels directly using ZeroMQ
664 sockets. These channels are useful primarily for talking to a kernel
664 sockets. These channels are useful primarily for talking to a kernel
665 whose :class:`KernelManager` is in the same process.
665 whose :class:`KernelManager` is in the same process.
666
666
667 This version manages kernels started using Popen.
667 This version manages kernels started using Popen.
668 """
668 """
669 # The PyZMQ Context to use for communication with the kernel.
669 # The PyZMQ Context to use for communication with the kernel.
670 context = Instance(zmq.Context)
670 context = Instance(zmq.Context)
671 def _context_default(self):
671 def _context_default(self):
672 return zmq.Context.instance()
672 return zmq.Context.instance()
673
673
674 # The Session to use for communication with the kernel.
674 # The Session to use for communication with the kernel.
675 session = Instance(Session)
675 session = Instance(Session)
676 def _session_default(self):
676 def _session_default(self):
677 return Session(config=self.config)
677 return Session(config=self.config)
678
678
679 # The kernel process with which the KernelManager is communicating.
679 # The kernel process with which the KernelManager is communicating.
680 kernel = Instance(Popen)
680 kernel = Instance(Popen)
681
681
682 # The addresses for the communication channels.
682 # The addresses for the communication channels.
683 connection_file = Unicode('')
683 connection_file = Unicode('')
684
684
685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
686
686
687 ip = Unicode(LOCALHOST, config=True)
687 ip = Unicode(LOCALHOST, config=True)
688 def _ip_changed(self, name, old, new):
688 def _ip_changed(self, name, old, new):
689 if new == '*':
689 if new == '*':
690 self.ip = '0.0.0.0'
690 self.ip = '0.0.0.0'
691 shell_port = Integer(0)
691 shell_port = Integer(0)
692 iopub_port = Integer(0)
692 iopub_port = Integer(0)
693 stdin_port = Integer(0)
693 stdin_port = Integer(0)
694 hb_port = Integer(0)
694 hb_port = Integer(0)
695
695
696 # The classes to use for the various channels.
696 # The classes to use for the various channels.
697 shell_channel_class = Type(ShellChannel)
697 shell_channel_class = Type(ShellChannel)
698 iopub_channel_class = Type(IOPubChannel)
698 iopub_channel_class = Type(IOPubChannel)
699 stdin_channel_class = Type(StdInChannel)
699 stdin_channel_class = Type(StdInChannel)
700 hb_channel_class = Type(HBChannel)
700 hb_channel_class = Type(HBChannel)
701
701
702 # Protected traits.
702 # Protected traits.
703 _launch_args = Any
703 _launch_args = Any
704 _shell_channel = Any
704 _shell_channel = Any
705 _iopub_channel = Any
705 _iopub_channel = Any
706 _stdin_channel = Any
706 _stdin_channel = Any
707 _hb_channel = Any
707 _hb_channel = Any
708 _connection_file_written=Bool(False)
708 _connection_file_written=Bool(False)
709
710 def __del__(self):
711 self.cleanup_connection_file()
709
712
710 #--------------------------------------------------------------------------
713 #--------------------------------------------------------------------------
711 # Channel management methods:
714 # Channel management methods:
712 #--------------------------------------------------------------------------
715 #--------------------------------------------------------------------------
713
716
714 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
717 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
715 """Starts the channels for this kernel.
718 """Starts the channels for this kernel.
716
719
717 This will create the channels if they do not exist and then start
720 This will create the channels if they do not exist and then start
718 them (their activity runs in a thread). If port numbers of 0 are
721 them (their activity runs in a thread). If port numbers of 0 are
719 being used (random ports) then you must first call
722 being used (random ports) then you must first call
720 :method:`start_kernel`. If the channels have been stopped and you
723 :method:`start_kernel`. If the channels have been stopped and you
721 call this, :class:`RuntimeError` will be raised.
724 call this, :class:`RuntimeError` will be raised.
722 """
725 """
723 if shell:
726 if shell:
724 self.shell_channel.start()
727 self.shell_channel.start()
725 if iopub:
728 if iopub:
726 self.iopub_channel.start()
729 self.iopub_channel.start()
727 if stdin:
730 if stdin:
728 self.stdin_channel.start()
731 self.stdin_channel.start()
729 self.shell_channel.allow_stdin = True
732 self.shell_channel.allow_stdin = True
730 else:
733 else:
731 self.shell_channel.allow_stdin = False
734 self.shell_channel.allow_stdin = False
732 if hb:
735 if hb:
733 self.hb_channel.start()
736 self.hb_channel.start()
734
737
735 def stop_channels(self):
738 def stop_channels(self):
736 """Stops all the running channels for this kernel.
739 """Stops all the running channels for this kernel.
737
740
738 This stops their event loops and joins their threads.
741 This stops their event loops and joins their threads.
739 """
742 """
740 if self.shell_channel.is_alive():
743 if self.shell_channel.is_alive():
741 self.shell_channel.stop()
744 self.shell_channel.stop()
742 if self.iopub_channel.is_alive():
745 if self.iopub_channel.is_alive():
743 self.iopub_channel.stop()
746 self.iopub_channel.stop()
744 if self.stdin_channel.is_alive():
747 if self.stdin_channel.is_alive():
745 self.stdin_channel.stop()
748 self.stdin_channel.stop()
746 if self.hb_channel.is_alive():
749 if self.hb_channel.is_alive():
747 self.hb_channel.stop()
750 self.hb_channel.stop()
748
751
749 @property
752 @property
750 def channels_running(self):
753 def channels_running(self):
751 """Are any of the channels created and running?"""
754 """Are any of the channels created and running?"""
752 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
755 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
753 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
756 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
754
757
755 def _make_url(self, port):
758 def _make_url(self, port):
756 """Make a zmq url with a port.
759 """Make a zmq url with a port.
757
760
758 There are two cases that this handles:
761 There are two cases that this handles:
759
762
760 * tcp: tcp://ip:port
763 * tcp: tcp://ip:port
761 * ipc: ipc://ip-port
764 * ipc: ipc://ip-port
762 """
765 """
763 if self.transport == 'tcp':
766 if self.transport == 'tcp':
764 return "tcp://%s:%i" % (self.ip, port)
767 return "tcp://%s:%i" % (self.ip, port)
765 else:
768 else:
766 return "%s://%s-%s" % (self.transport, self.ip, port)
769 return "%s://%s-%s" % (self.transport, self.ip, port)
767
770
768 @property
771 @property
769 def shell_channel(self):
772 def shell_channel(self):
770 """Get the shell channel object for this kernel."""
773 """Get the shell channel object for this kernel."""
771 if self._shell_channel is None:
774 if self._shell_channel is None:
772 self._shell_channel = self.shell_channel_class(
775 self._shell_channel = self.shell_channel_class(
773 self.context, self.session, self._make_url(self.shell_port)
776 self.context, self.session, self._make_url(self.shell_port)
774 )
777 )
775 return self._shell_channel
778 return self._shell_channel
776
779
777 @property
780 @property
778 def iopub_channel(self):
781 def iopub_channel(self):
779 """Get the iopub channel object for this kernel."""
782 """Get the iopub channel object for this kernel."""
780 if self._iopub_channel is None:
783 if self._iopub_channel is None:
781 self._iopub_channel = self.iopub_channel_class(
784 self._iopub_channel = self.iopub_channel_class(
782 self.context, self.session, self._make_url(self.iopub_port)
785 self.context, self.session, self._make_url(self.iopub_port)
783 )
786 )
784 return self._iopub_channel
787 return self._iopub_channel
785
788
786 @property
789 @property
787 def stdin_channel(self):
790 def stdin_channel(self):
788 """Get the stdin channel object for this kernel."""
791 """Get the stdin channel object for this kernel."""
789 if self._stdin_channel is None:
792 if self._stdin_channel is None:
790 self._stdin_channel = self.stdin_channel_class(
793 self._stdin_channel = self.stdin_channel_class(
791 self.context, self.session, self._make_url(self.stdin_port)
794 self.context, self.session, self._make_url(self.stdin_port)
792 )
795 )
793 return self._stdin_channel
796 return self._stdin_channel
794
797
795 @property
798 @property
796 def hb_channel(self):
799 def hb_channel(self):
797 """Get the hb channel object for this kernel."""
800 """Get the hb channel object for this kernel."""
798 if self._hb_channel is None:
801 if self._hb_channel is None:
799 self._hb_channel = self.hb_channel_class(
802 self._hb_channel = self.hb_channel_class(
800 self.context, self.session, self._make_url(self.hb_port)
803 self.context, self.session, self._make_url(self.hb_port)
801 )
804 )
802 return self._hb_channel
805 return self._hb_channel
803
806
804 #--------------------------------------------------------------------------
807 #--------------------------------------------------------------------------
805 # Connection and ipc file management
808 # Connection and ipc file management
806 #--------------------------------------------------------------------------
809 #--------------------------------------------------------------------------
807
810
808 def cleanup_connection_file(self):
811 def cleanup_connection_file(self):
809 """Cleanup connection file *if we wrote it*
812 """Cleanup connection file *if we wrote it*
810
813
811 Will not raise if the connection file was already removed somehow.
814 Will not raise if the connection file was already removed somehow.
812 """
815 """
813 if self._connection_file_written:
816 if self._connection_file_written:
814 # cleanup connection files on full shutdown of kernel we started
817 # cleanup connection files on full shutdown of kernel we started
815 self._connection_file_written = False
818 self._connection_file_written = False
816 try:
819 try:
817 os.remove(self.connection_file)
820 os.remove(self.connection_file)
818 except (IOError, OSError):
821 except (IOError, OSError):
819 pass
822 pass
820
823
821 def cleanup_ipc_files(self):
824 def cleanup_ipc_files(self):
822 """Cleanup ipc files if we wrote them."""
825 """Cleanup ipc files if we wrote them."""
823 if self.transport != 'ipc':
826 if self.transport != 'ipc':
824 return
827 return
825 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
828 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
826 ipcfile = "%s-%i" % (self.ip, port)
829 ipcfile = "%s-%i" % (self.ip, port)
827 try:
830 try:
828 os.remove(ipcfile)
831 os.remove(ipcfile)
829 except (IOError, OSError):
832 except (IOError, OSError):
830 pass
833 pass
831
834
832 def load_connection_file(self):
835 def load_connection_file(self):
833 """Load connection info from JSON dict in self.connection_file."""
836 """Load connection info from JSON dict in self.connection_file."""
834 with open(self.connection_file) as f:
837 with open(self.connection_file) as f:
835 cfg = json.loads(f.read())
838 cfg = json.loads(f.read())
836
839
837 from pprint import pprint
840 from pprint import pprint
838 pprint(cfg)
841 pprint(cfg)
839 self.transport = cfg.get('transport', 'tcp')
842 self.transport = cfg.get('transport', 'tcp')
840 self.ip = cfg['ip']
843 self.ip = cfg['ip']
841 self.shell_port = cfg['shell_port']
844 self.shell_port = cfg['shell_port']
842 self.stdin_port = cfg['stdin_port']
845 self.stdin_port = cfg['stdin_port']
843 self.iopub_port = cfg['iopub_port']
846 self.iopub_port = cfg['iopub_port']
844 self.hb_port = cfg['hb_port']
847 self.hb_port = cfg['hb_port']
845 self.session.key = str_to_bytes(cfg['key'])
848 self.session.key = str_to_bytes(cfg['key'])
846
849
847 def write_connection_file(self):
850 def write_connection_file(self):
848 """Write connection info to JSON dict in self.connection_file."""
851 """Write connection info to JSON dict in self.connection_file."""
849 if self._connection_file_written:
852 if self._connection_file_written:
850 return
853 return
851 self.connection_file,cfg = write_connection_file(self.connection_file,
854 self.connection_file,cfg = write_connection_file(self.connection_file,
852 transport=self.transport, ip=self.ip, key=self.session.key,
855 transport=self.transport, ip=self.ip, key=self.session.key,
853 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
856 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
854 shell_port=self.shell_port, hb_port=self.hb_port)
857 shell_port=self.shell_port, hb_port=self.hb_port)
855 # write_connection_file also sets default ports:
858 # write_connection_file also sets default ports:
856 self.shell_port = cfg['shell_port']
859 self.shell_port = cfg['shell_port']
857 self.stdin_port = cfg['stdin_port']
860 self.stdin_port = cfg['stdin_port']
858 self.iopub_port = cfg['iopub_port']
861 self.iopub_port = cfg['iopub_port']
859 self.hb_port = cfg['hb_port']
862 self.hb_port = cfg['hb_port']
860
863
861 self._connection_file_written = True
864 self._connection_file_written = True
862
865
863 #--------------------------------------------------------------------------
866 #--------------------------------------------------------------------------
864 # Kernel management
867 # Kernel management
865 #--------------------------------------------------------------------------
868 #--------------------------------------------------------------------------
866
869
867 def start_kernel(self, **kw):
870 def start_kernel(self, **kw):
868 """Starts a kernel on this host in a separate process.
871 """Starts a kernel on this host in a separate process.
869
872
870 If random ports (port=0) are being used, this method must be called
873 If random ports (port=0) are being used, this method must be called
871 before the channels are created.
874 before the channels are created.
872
875
873 Parameters:
876 Parameters:
874 -----------
877 -----------
875 launcher : callable, optional (default None)
878 launcher : callable, optional (default None)
876 A custom function for launching the kernel process (generally a
879 A custom function for launching the kernel process (generally a
877 wrapper around ``entry_point.base_launch_kernel``). In most cases,
880 wrapper around ``entry_point.base_launch_kernel``). In most cases,
878 it should not be necessary to use this parameter.
881 it should not be necessary to use this parameter.
879
882
880 **kw : optional
883 **kw : optional
881 keyword arguments that are passed down into the launcher
884 keyword arguments that are passed down into the launcher
882 callable.
885 callable.
883 """
886 """
884 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
887 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
885 raise RuntimeError("Can only launch a kernel on a local interface. "
888 raise RuntimeError("Can only launch a kernel on a local interface. "
886 "Make sure that the '*_address' attributes are "
889 "Make sure that the '*_address' attributes are "
887 "configured properly. "
890 "configured properly. "
888 "Currently valid addresses are: %s"%LOCAL_IPS
891 "Currently valid addresses are: %s"%LOCAL_IPS
889 )
892 )
890
893
891 # write connection file / get default ports
894 # write connection file / get default ports
892 self.write_connection_file()
895 self.write_connection_file()
893
896
894 self._launch_args = kw.copy()
897 self._launch_args = kw.copy()
895 launch_kernel = kw.pop('launcher', None)
898 launch_kernel = kw.pop('launcher', None)
896 if launch_kernel is None:
899 if launch_kernel is None:
897 from ipkernel import launch_kernel
900 from ipkernel import launch_kernel
898 self.kernel = launch_kernel(fname=self.connection_file, **kw)
901 self.kernel = launch_kernel(fname=self.connection_file, **kw)
899
902
900 def shutdown_kernel(self, now=False, restart=False):
903 def shutdown_kernel(self, now=False, restart=False):
901 """Attempts to the stop the kernel process cleanly.
904 """Attempts to the stop the kernel process cleanly.
902
905
903 This attempts to shutdown the kernels cleanly by:
906 This attempts to shutdown the kernels cleanly by:
904
907
905 1. Sending it a shutdown message over the shell channel.
908 1. Sending it a shutdown message over the shell channel.
906 2. If that fails, the kernel is shutdown forcibly by sending it
909 2. If that fails, the kernel is shutdown forcibly by sending it
907 a signal.
910 a signal.
908
911
909 Parameters:
912 Parameters:
910 -----------
913 -----------
911 now : bool
914 now : bool
912 Should the kernel be forcible killed *now*. This skips the
915 Should the kernel be forcible killed *now*. This skips the
913 first, nice shutdown attempt.
916 first, nice shutdown attempt.
914 restart: bool
917 restart: bool
915 Will this kernel be restarted after it is shutdown. When this
918 Will this kernel be restarted after it is shutdown. When this
916 is True, connection files will not be cleaned up.
919 is True, connection files will not be cleaned up.
917 """
920 """
918 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
921 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
919 if sys.platform == 'win32':
922 if sys.platform == 'win32':
920 self._kill_kernel()
923 self._kill_kernel()
921 return
924 return
922
925
923 # Pause the heart beat channel if it exists.
926 # Pause the heart beat channel if it exists.
924 if self._hb_channel is not None:
927 if self._hb_channel is not None:
925 self._hb_channel.pause()
928 self._hb_channel.pause()
926
929
927 if now:
930 if now:
928 if self.has_kernel:
931 if self.has_kernel:
929 self._kill_kernel()
932 self._kill_kernel()
930 else:
933 else:
931 # Don't send any additional kernel kill messages immediately, to give
934 # Don't send any additional kernel kill messages immediately, to give
932 # the kernel a chance to properly execute shutdown actions. Wait for at
935 # the kernel a chance to properly execute shutdown actions. Wait for at
933 # most 1s, checking every 0.1s.
936 # most 1s, checking every 0.1s.
934 self.shell_channel.shutdown(restart=restart)
937 self.shell_channel.shutdown(restart=restart)
935 for i in range(10):
938 for i in range(10):
936 if self.is_alive:
939 if self.is_alive:
937 time.sleep(0.1)
940 time.sleep(0.1)
938 else:
941 else:
939 break
942 break
940 else:
943 else:
941 # OK, we've waited long enough.
944 # OK, we've waited long enough.
942 if self.has_kernel:
945 if self.has_kernel:
943 self._kill_kernel()
946 self._kill_kernel()
944
947
945 if not restart:
948 if not restart:
946 self.cleanup_connection_file()
949 self.cleanup_connection_file()
947 self.cleanup_ipc_files()
950 self.cleanup_ipc_files()
948 else:
951 else:
949 self.cleanup_ipc_files()
952 self.cleanup_ipc_files()
950
953
951 def restart_kernel(self, now=False, **kw):
954 def restart_kernel(self, now=False, **kw):
952 """Restarts a kernel with the arguments that were used to launch it.
955 """Restarts a kernel with the arguments that were used to launch it.
953
956
954 If the old kernel was launched with random ports, the same ports will be
957 If the old kernel was launched with random ports, the same ports will be
955 used for the new kernel. The same connection file is used again.
958 used for the new kernel. The same connection file is used again.
956
959
957 Parameters
960 Parameters
958 ----------
961 ----------
959 now : bool, optional
962 now : bool, optional
960 If True, the kernel is forcefully restarted *immediately*, without
963 If True, the kernel is forcefully restarted *immediately*, without
961 having a chance to do any cleanup action. Otherwise the kernel is
964 having a chance to do any cleanup action. Otherwise the kernel is
962 given 1s to clean up before a forceful restart is issued.
965 given 1s to clean up before a forceful restart is issued.
963
966
964 In all cases the kernel is restarted, the only difference is whether
967 In all cases the kernel is restarted, the only difference is whether
965 it is given a chance to perform a clean shutdown or not.
968 it is given a chance to perform a clean shutdown or not.
966
969
967 **kw : optional
970 **kw : optional
968 Any options specified here will overwrite those used to launch the
971 Any options specified here will overwrite those used to launch the
969 kernel.
972 kernel.
970 """
973 """
971 if self._launch_args is None:
974 if self._launch_args is None:
972 raise RuntimeError("Cannot restart the kernel. "
975 raise RuntimeError("Cannot restart the kernel. "
973 "No previous call to 'start_kernel'.")
976 "No previous call to 'start_kernel'.")
974 else:
977 else:
975 # Stop currently running kernel.
978 # Stop currently running kernel.
976 self.shutdown_kernel(now=now, restart=True)
979 self.shutdown_kernel(now=now, restart=True)
977
980
978 # Start new kernel.
981 # Start new kernel.
979 self._launch_args.update(kw)
982 self._launch_args.update(kw)
980 self.start_kernel(**self._launch_args)
983 self.start_kernel(**self._launch_args)
981
984
982 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
985 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
983 # unless there is some delay here.
986 # unless there is some delay here.
984 if sys.platform == 'win32':
987 if sys.platform == 'win32':
985 time.sleep(0.2)
988 time.sleep(0.2)
986
989
987 @property
990 @property
988 def has_kernel(self):
991 def has_kernel(self):
989 """Has a kernel been started that we are managing."""
992 """Has a kernel been started that we are managing."""
990 return self.kernel is not None
993 return self.kernel is not None
991
994
992 def _kill_kernel(self):
995 def _kill_kernel(self):
993 """Kill the running kernel.
996 """Kill the running kernel.
994
997
995 This is a private method, callers should use shutdown_kernel(now=True).
998 This is a private method, callers should use shutdown_kernel(now=True).
996 """
999 """
997 if self.has_kernel:
1000 if self.has_kernel:
998 # Pause the heart beat channel if it exists.
1001 # Pause the heart beat channel if it exists.
999 if self._hb_channel is not None:
1002 if self._hb_channel is not None:
1000 self._hb_channel.pause()
1003 self._hb_channel.pause()
1001
1004
1002 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1005 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1003 # TerminateProcess() on Win32).
1006 # TerminateProcess() on Win32).
1004 try:
1007 try:
1005 self.kernel.kill()
1008 self.kernel.kill()
1006 except OSError as e:
1009 except OSError as e:
1007 # In Windows, we will get an Access Denied error if the process
1010 # In Windows, we will get an Access Denied error if the process
1008 # has already terminated. Ignore it.
1011 # has already terminated. Ignore it.
1009 if sys.platform == 'win32':
1012 if sys.platform == 'win32':
1010 if e.winerror != 5:
1013 if e.winerror != 5:
1011 raise
1014 raise
1012 # On Unix, we may get an ESRCH error if the process has already
1015 # On Unix, we may get an ESRCH error if the process has already
1013 # terminated. Ignore it.
1016 # terminated. Ignore it.
1014 else:
1017 else:
1015 from errno import ESRCH
1018 from errno import ESRCH
1016 if e.errno != ESRCH:
1019 if e.errno != ESRCH:
1017 raise
1020 raise
1018
1021
1019 # Block until the kernel terminates.
1022 # Block until the kernel terminates.
1020 self.kernel.wait()
1023 self.kernel.wait()
1021 self.kernel = None
1024 self.kernel = None
1022 else:
1025 else:
1023 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1026 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1024
1027
1025 def interrupt_kernel(self):
1028 def interrupt_kernel(self):
1026 """Interrupts the kernel by sending it a signal.
1029 """Interrupts the kernel by sending it a signal.
1027
1030
1028 Unlike ``signal_kernel``, this operation is well supported on all
1031 Unlike ``signal_kernel``, this operation is well supported on all
1029 platforms.
1032 platforms.
1030 """
1033 """
1031 if self.has_kernel:
1034 if self.has_kernel:
1032 if sys.platform == 'win32':
1035 if sys.platform == 'win32':
1033 from parentpoller import ParentPollerWindows as Poller
1036 from parentpoller import ParentPollerWindows as Poller
1034 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1037 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1035 else:
1038 else:
1036 self.kernel.send_signal(signal.SIGINT)
1039 self.kernel.send_signal(signal.SIGINT)
1037 else:
1040 else:
1038 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1041 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1039
1042
1040 def signal_kernel(self, signum):
1043 def signal_kernel(self, signum):
1041 """Sends a signal to the kernel.
1044 """Sends a signal to the kernel.
1042
1045
1043 Note that since only SIGTERM is supported on Windows, this function is
1046 Note that since only SIGTERM is supported on Windows, this function is
1044 only useful on Unix systems.
1047 only useful on Unix systems.
1045 """
1048 """
1046 if self.has_kernel:
1049 if self.has_kernel:
1047 self.kernel.send_signal(signum)
1050 self.kernel.send_signal(signum)
1048 else:
1051 else:
1049 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1052 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1050
1053
1051 @property
1054 @property
1052 def is_alive(self):
1055 def is_alive(self):
1053 """Is the kernel process still running?"""
1056 """Is the kernel process still running?"""
1054 if self.has_kernel:
1057 if self.has_kernel:
1055 if self.kernel.poll() is None:
1058 if self.kernel.poll() is None:
1056 return True
1059 return True
1057 else:
1060 else:
1058 return False
1061 return False
1059 elif self._hb_channel is not None:
1062 elif self._hb_channel is not None:
1060 # We didn't start the kernel with this KernelManager so we
1063 # We didn't start the kernel with this KernelManager so we
1061 # use the heartbeat.
1064 # use the heartbeat.
1062 return self._hb_channel.is_beating()
1065 return self._hb_channel.is_beating()
1063 else:
1066 else:
1064 # no heartbeat and not local, we can't tell if it's running,
1067 # no heartbeat and not local, we can't tell if it's running,
1065 # so naively return True
1068 # so naively return True
1066 return True
1069 return True
1067
1070
1068
1071
1069 #-----------------------------------------------------------------------------
1072 #-----------------------------------------------------------------------------
1070 # ABC Registration
1073 # ABC Registration
1071 #-----------------------------------------------------------------------------
1074 #-----------------------------------------------------------------------------
1072
1075
1073 ShellChannelABC.register(ShellChannel)
1076 ShellChannelABC.register(ShellChannel)
1074 IOPubChannelABC.register(IOPubChannel)
1077 IOPubChannelABC.register(IOPubChannel)
1075 HBChannelABC.register(HBChannel)
1078 HBChannelABC.register(HBChannel)
1076 StdInChannelABC.register(StdInChannel)
1079 StdInChannelABC.register(StdInChannel)
1077 KernelManagerABC.register(KernelManager)
1080 KernelManagerABC.register(KernelManager)
1078
1081
General Comments 0
You need to be logged in to leave comments. Login now