##// END OF EJS Templates
allow custom kernel_cmd in KernelManager
MinRK -
Show More
@@ -1,1094 +1,1126 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 import json
21 import json
22 from subprocess import Popen
22 from subprocess import Popen
23 import os
23 import os
24 import signal
24 import signal
25 import sys
25 import sys
26 from threading import Thread
26 from threading import Thread
27 import time
27 import time
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
34 from zmq.eventloop import ioloop, zmqstream
35
35
36 # Local imports.
36 # Local imports.
37 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 from IPython.utils.traitlets import (
39 from IPython.utils.traitlets import (
40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
40 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
41 )
41 )
42 from IPython.utils.py3compat import str_to_bytes
42 from IPython.utils.py3compat import str_to_bytes
43 from IPython.zmq.entry_point import write_connection_file
43 from IPython.zmq.entry_point import (
44 write_connection_file,
45 make_kernel_cmd,
46 launch_kernel,
47 )
44 from session import Session
48 from session import Session
45 from IPython.zmq.kernelmanagerabc import (
49 from IPython.zmq.kernelmanagerabc import (
46 ShellChannelABC, IOPubChannelABC,
50 ShellChannelABC, IOPubChannelABC,
47 HBChannelABC, StdInChannelABC,
51 HBChannelABC, StdInChannelABC,
48 KernelManagerABC
52 KernelManagerABC
49 )
53 )
50
54
51
55
52 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
53 # Constants and exceptions
57 # Constants and exceptions
54 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
55
59
56 class InvalidPortNumber(Exception):
60 class InvalidPortNumber(Exception):
57 pass
61 pass
58
62
59 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
60 # Utility functions
64 # Utility functions
61 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
62
66
63 # some utilities to validate message structure, these might get moved elsewhere
67 # some utilities to validate message structure, these might get moved elsewhere
64 # if they prove to have more generic utility
68 # if they prove to have more generic utility
65
69
66 def validate_string_list(lst):
70 def validate_string_list(lst):
67 """Validate that the input is a list of strings.
71 """Validate that the input is a list of strings.
68
72
69 Raises ValueError if not."""
73 Raises ValueError if not."""
70 if not isinstance(lst, list):
74 if not isinstance(lst, list):
71 raise ValueError('input %r must be a list' % lst)
75 raise ValueError('input %r must be a list' % lst)
72 for x in lst:
76 for x in lst:
73 if not isinstance(x, basestring):
77 if not isinstance(x, basestring):
74 raise ValueError('element %r in list must be a string' % x)
78 raise ValueError('element %r in list must be a string' % x)
75
79
76
80
77 def validate_string_dict(dct):
81 def validate_string_dict(dct):
78 """Validate that the input is a dict with string keys and values.
82 """Validate that the input is a dict with string keys and values.
79
83
80 Raises ValueError if not."""
84 Raises ValueError if not."""
81 for k,v in dct.iteritems():
85 for k,v in dct.iteritems():
82 if not isinstance(k, basestring):
86 if not isinstance(k, basestring):
83 raise ValueError('key %r in dict must be a string' % k)
87 raise ValueError('key %r in dict must be a string' % k)
84 if not isinstance(v, basestring):
88 if not isinstance(v, basestring):
85 raise ValueError('value %r in dict must be a string' % v)
89 raise ValueError('value %r in dict must be a string' % v)
86
90
87
91
88 #-----------------------------------------------------------------------------
92 #-----------------------------------------------------------------------------
89 # ZMQ Socket Channel classes
93 # ZMQ Socket Channel classes
90 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
91
95
92 class ZMQSocketChannel(Thread):
96 class ZMQSocketChannel(Thread):
93 """The base class for the channels that use ZMQ sockets."""
97 """The base class for the channels that use ZMQ sockets."""
94 context = None
98 context = None
95 session = None
99 session = None
96 socket = None
100 socket = None
97 ioloop = None
101 ioloop = None
98 stream = None
102 stream = None
99 _address = None
103 _address = None
100 _exiting = False
104 _exiting = False
101
105
102 def __init__(self, context, session, address):
106 def __init__(self, context, session, address):
103 """Create a channel.
107 """Create a channel.
104
108
105 Parameters
109 Parameters
106 ----------
110 ----------
107 context : :class:`zmq.Context`
111 context : :class:`zmq.Context`
108 The ZMQ context to use.
112 The ZMQ context to use.
109 session : :class:`session.Session`
113 session : :class:`session.Session`
110 The session to use.
114 The session to use.
111 address : zmq url
115 address : zmq url
112 Standard (ip, port) tuple that the kernel is listening on.
116 Standard (ip, port) tuple that the kernel is listening on.
113 """
117 """
114 super(ZMQSocketChannel, self).__init__()
118 super(ZMQSocketChannel, self).__init__()
115 self.daemon = True
119 self.daemon = True
116
120
117 self.context = context
121 self.context = context
118 self.session = session
122 self.session = session
119 if isinstance(address, tuple):
123 if isinstance(address, tuple):
120 if address[1] == 0:
124 if address[1] == 0:
121 message = 'The port number for a channel cannot be 0.'
125 message = 'The port number for a channel cannot be 0.'
122 raise InvalidPortNumber(message)
126 raise InvalidPortNumber(message)
123 address = "tcp://%s:%i" % address
127 address = "tcp://%s:%i" % address
124 self._address = address
128 self._address = address
125 atexit.register(self._notice_exit)
129 atexit.register(self._notice_exit)
126
130
127 def _notice_exit(self):
131 def _notice_exit(self):
128 self._exiting = True
132 self._exiting = True
129
133
130 def _run_loop(self):
134 def _run_loop(self):
131 """Run my loop, ignoring EINTR events in the poller"""
135 """Run my loop, ignoring EINTR events in the poller"""
132 while True:
136 while True:
133 try:
137 try:
134 self.ioloop.start()
138 self.ioloop.start()
135 except ZMQError as e:
139 except ZMQError as e:
136 if e.errno == errno.EINTR:
140 if e.errno == errno.EINTR:
137 continue
141 continue
138 else:
142 else:
139 raise
143 raise
140 except Exception:
144 except Exception:
141 if self._exiting:
145 if self._exiting:
142 break
146 break
143 else:
147 else:
144 raise
148 raise
145 else:
149 else:
146 break
150 break
147
151
148 def stop(self):
152 def stop(self):
149 """Stop the channel's event loop and join its thread.
153 """Stop the channel's event loop and join its thread.
150
154
151 This calls :method:`Thread.join` and returns when the thread
155 This calls :method:`Thread.join` and returns when the thread
152 terminates. :class:`RuntimeError` will be raised if
156 terminates. :class:`RuntimeError` will be raised if
153 :method:`self.start` is called again.
157 :method:`self.start` is called again.
154 """
158 """
155 self.join()
159 self.join()
156
160
157 @property
161 @property
158 def address(self):
162 def address(self):
159 """Get the channel's address as a zmq url string.
163 """Get the channel's address as a zmq url string.
160
164
161 These URLS have the form: 'tcp://127.0.0.1:5555'.
165 These URLS have the form: 'tcp://127.0.0.1:5555'.
162 """
166 """
163 return self._address
167 return self._address
164
168
165 def _queue_send(self, msg):
169 def _queue_send(self, msg):
166 """Queue a message to be sent from the IOLoop's thread.
170 """Queue a message to be sent from the IOLoop's thread.
167
171
168 Parameters
172 Parameters
169 ----------
173 ----------
170 msg : message to send
174 msg : message to send
171
175
172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
176 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
173 thread control of the action.
177 thread control of the action.
174 """
178 """
175 def thread_send():
179 def thread_send():
176 self.session.send(self.stream, msg)
180 self.session.send(self.stream, msg)
177 self.ioloop.add_callback(thread_send)
181 self.ioloop.add_callback(thread_send)
178
182
179 def _handle_recv(self, msg):
183 def _handle_recv(self, msg):
180 """Callback for stream.on_recv.
184 """Callback for stream.on_recv.
181
185
182 Unpacks message, and calls handlers with it.
186 Unpacks message, and calls handlers with it.
183 """
187 """
184 ident,smsg = self.session.feed_identities(msg)
188 ident,smsg = self.session.feed_identities(msg)
185 self.call_handlers(self.session.unserialize(smsg))
189 self.call_handlers(self.session.unserialize(smsg))
186
190
187
191
188
192
189 class ShellChannel(ZMQSocketChannel):
193 class ShellChannel(ZMQSocketChannel):
190 """The shell channel for issuing request/replies to the kernel."""
194 """The shell channel for issuing request/replies to the kernel."""
191
195
192 command_queue = None
196 command_queue = None
193 # flag for whether execute requests should be allowed to call raw_input:
197 # flag for whether execute requests should be allowed to call raw_input:
194 allow_stdin = True
198 allow_stdin = True
195
199
196 def __init__(self, context, session, address):
200 def __init__(self, context, session, address):
197 super(ShellChannel, self).__init__(context, session, address)
201 super(ShellChannel, self).__init__(context, session, address)
198 self.ioloop = ioloop.IOLoop()
202 self.ioloop = ioloop.IOLoop()
199
203
200 def run(self):
204 def run(self):
201 """The thread's main activity. Call start() instead."""
205 """The thread's main activity. Call start() instead."""
202 self.socket = self.context.socket(zmq.DEALER)
206 self.socket = self.context.socket(zmq.DEALER)
203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
207 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 self.socket.connect(self.address)
208 self.socket.connect(self.address)
205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
209 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 self.stream.on_recv(self._handle_recv)
210 self.stream.on_recv(self._handle_recv)
207 self._run_loop()
211 self._run_loop()
208 try:
212 try:
209 self.socket.close()
213 self.socket.close()
210 except:
214 except:
211 pass
215 pass
212
216
213 def stop(self):
217 def stop(self):
214 """Stop the channel's event loop and join its thread."""
218 """Stop the channel's event loop and join its thread."""
215 self.ioloop.stop()
219 self.ioloop.stop()
216 super(ShellChannel, self).stop()
220 super(ShellChannel, self).stop()
217
221
218 def call_handlers(self, msg):
222 def call_handlers(self, msg):
219 """This method is called in the ioloop thread when a message arrives.
223 """This method is called in the ioloop thread when a message arrives.
220
224
221 Subclasses should override this method to handle incoming messages.
225 Subclasses should override this method to handle incoming messages.
222 It is important to remember that this method is called in the thread
226 It is important to remember that this method is called in the thread
223 so that some logic must be done to ensure that the application leve
227 so that some logic must be done to ensure that the application leve
224 handlers are called in the application thread.
228 handlers are called in the application thread.
225 """
229 """
226 raise NotImplementedError('call_handlers must be defined in a subclass.')
230 raise NotImplementedError('call_handlers must be defined in a subclass.')
227
231
228 def execute(self, code, silent=False, store_history=True,
232 def execute(self, code, silent=False, store_history=True,
229 user_variables=None, user_expressions=None, allow_stdin=None):
233 user_variables=None, user_expressions=None, allow_stdin=None):
230 """Execute code in the kernel.
234 """Execute code in the kernel.
231
235
232 Parameters
236 Parameters
233 ----------
237 ----------
234 code : str
238 code : str
235 A string of Python code.
239 A string of Python code.
236
240
237 silent : bool, optional (default False)
241 silent : bool, optional (default False)
238 If set, the kernel will execute the code as quietly possible, and
242 If set, the kernel will execute the code as quietly possible, and
239 will force store_history to be False.
243 will force store_history to be False.
240
244
241 store_history : bool, optional (default True)
245 store_history : bool, optional (default True)
242 If set, the kernel will store command history. This is forced
246 If set, the kernel will store command history. This is forced
243 to be False if silent is True.
247 to be False if silent is True.
244
248
245 user_variables : list, optional
249 user_variables : list, optional
246 A list of variable names to pull from the user's namespace. They
250 A list of variable names to pull from the user's namespace. They
247 will come back as a dict with these names as keys and their
251 will come back as a dict with these names as keys and their
248 :func:`repr` as values.
252 :func:`repr` as values.
249
253
250 user_expressions : dict, optional
254 user_expressions : dict, optional
251 A dict mapping names to expressions to be evaluated in the user's
255 A dict mapping names to expressions to be evaluated in the user's
252 dict. The expression values are returned as strings formatted using
256 dict. The expression values are returned as strings formatted using
253 :func:`repr`.
257 :func:`repr`.
254
258
255 allow_stdin : bool, optional (default self.allow_stdin)
259 allow_stdin : bool, optional (default self.allow_stdin)
256 Flag for whether the kernel can send stdin requests to frontends.
260 Flag for whether the kernel can send stdin requests to frontends.
257
261
258 Some frontends (e.g. the Notebook) do not support stdin requests.
262 Some frontends (e.g. the Notebook) do not support stdin requests.
259 If raw_input is called from code executed from such a frontend, a
263 If raw_input is called from code executed from such a frontend, a
260 StdinNotImplementedError will be raised.
264 StdinNotImplementedError will be raised.
261
265
262 Returns
266 Returns
263 -------
267 -------
264 The msg_id of the message sent.
268 The msg_id of the message sent.
265 """
269 """
266 if user_variables is None:
270 if user_variables is None:
267 user_variables = []
271 user_variables = []
268 if user_expressions is None:
272 if user_expressions is None:
269 user_expressions = {}
273 user_expressions = {}
270 if allow_stdin is None:
274 if allow_stdin is None:
271 allow_stdin = self.allow_stdin
275 allow_stdin = self.allow_stdin
272
276
273
277
274 # Don't waste network traffic if inputs are invalid
278 # Don't waste network traffic if inputs are invalid
275 if not isinstance(code, basestring):
279 if not isinstance(code, basestring):
276 raise ValueError('code %r must be a string' % code)
280 raise ValueError('code %r must be a string' % code)
277 validate_string_list(user_variables)
281 validate_string_list(user_variables)
278 validate_string_dict(user_expressions)
282 validate_string_dict(user_expressions)
279
283
280 # Create class for content/msg creation. Related to, but possibly
284 # Create class for content/msg creation. Related to, but possibly
281 # not in Session.
285 # not in Session.
282 content = dict(code=code, silent=silent, store_history=store_history,
286 content = dict(code=code, silent=silent, store_history=store_history,
283 user_variables=user_variables,
287 user_variables=user_variables,
284 user_expressions=user_expressions,
288 user_expressions=user_expressions,
285 allow_stdin=allow_stdin,
289 allow_stdin=allow_stdin,
286 )
290 )
287 msg = self.session.msg('execute_request', content)
291 msg = self.session.msg('execute_request', content)
288 self._queue_send(msg)
292 self._queue_send(msg)
289 return msg['header']['msg_id']
293 return msg['header']['msg_id']
290
294
291 def complete(self, text, line, cursor_pos, block=None):
295 def complete(self, text, line, cursor_pos, block=None):
292 """Tab complete text in the kernel's namespace.
296 """Tab complete text in the kernel's namespace.
293
297
294 Parameters
298 Parameters
295 ----------
299 ----------
296 text : str
300 text : str
297 The text to complete.
301 The text to complete.
298 line : str
302 line : str
299 The full line of text that is the surrounding context for the
303 The full line of text that is the surrounding context for the
300 text to complete.
304 text to complete.
301 cursor_pos : int
305 cursor_pos : int
302 The position of the cursor in the line where the completion was
306 The position of the cursor in the line where the completion was
303 requested.
307 requested.
304 block : str, optional
308 block : str, optional
305 The full block of code in which the completion is being requested.
309 The full block of code in which the completion is being requested.
306
310
307 Returns
311 Returns
308 -------
312 -------
309 The msg_id of the message sent.
313 The msg_id of the message sent.
310 """
314 """
311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
315 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 msg = self.session.msg('complete_request', content)
316 msg = self.session.msg('complete_request', content)
313 self._queue_send(msg)
317 self._queue_send(msg)
314 return msg['header']['msg_id']
318 return msg['header']['msg_id']
315
319
316 def object_info(self, oname, detail_level=0):
320 def object_info(self, oname, detail_level=0):
317 """Get metadata information about an object in the kernel's namespace.
321 """Get metadata information about an object in the kernel's namespace.
318
322
319 Parameters
323 Parameters
320 ----------
324 ----------
321 oname : str
325 oname : str
322 A string specifying the object name.
326 A string specifying the object name.
323 detail_level : int, optional
327 detail_level : int, optional
324 The level of detail for the introspection (0-2)
328 The level of detail for the introspection (0-2)
325
329
326 Returns
330 Returns
327 -------
331 -------
328 The msg_id of the message sent.
332 The msg_id of the message sent.
329 """
333 """
330 content = dict(oname=oname, detail_level=detail_level)
334 content = dict(oname=oname, detail_level=detail_level)
331 msg = self.session.msg('object_info_request', content)
335 msg = self.session.msg('object_info_request', content)
332 self._queue_send(msg)
336 self._queue_send(msg)
333 return msg['header']['msg_id']
337 return msg['header']['msg_id']
334
338
335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
339 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 """Get entries from the kernel's history list.
340 """Get entries from the kernel's history list.
337
341
338 Parameters
342 Parameters
339 ----------
343 ----------
340 raw : bool
344 raw : bool
341 If True, return the raw input.
345 If True, return the raw input.
342 output : bool
346 output : bool
343 If True, then return the output as well.
347 If True, then return the output as well.
344 hist_access_type : str
348 hist_access_type : str
345 'range' (fill in session, start and stop params), 'tail' (fill in n)
349 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 or 'search' (fill in pattern param).
350 or 'search' (fill in pattern param).
347
351
348 session : int
352 session : int
349 For a range request, the session from which to get lines. Session
353 For a range request, the session from which to get lines. Session
350 numbers are positive integers; negative ones count back from the
354 numbers are positive integers; negative ones count back from the
351 current session.
355 current session.
352 start : int
356 start : int
353 The first line number of a history range.
357 The first line number of a history range.
354 stop : int
358 stop : int
355 The final (excluded) line number of a history range.
359 The final (excluded) line number of a history range.
356
360
357 n : int
361 n : int
358 The number of lines of history to get for a tail request.
362 The number of lines of history to get for a tail request.
359
363
360 pattern : str
364 pattern : str
361 The glob-syntax pattern for a search request.
365 The glob-syntax pattern for a search request.
362
366
363 Returns
367 Returns
364 -------
368 -------
365 The msg_id of the message sent.
369 The msg_id of the message sent.
366 """
370 """
367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
371 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 **kwargs)
372 **kwargs)
369 msg = self.session.msg('history_request', content)
373 msg = self.session.msg('history_request', content)
370 self._queue_send(msg)
374 self._queue_send(msg)
371 return msg['header']['msg_id']
375 return msg['header']['msg_id']
372
376
373 def kernel_info(self):
377 def kernel_info(self):
374 """Request kernel info."""
378 """Request kernel info."""
375 msg = self.session.msg('kernel_info_request')
379 msg = self.session.msg('kernel_info_request')
376 self._queue_send(msg)
380 self._queue_send(msg)
377 return msg['header']['msg_id']
381 return msg['header']['msg_id']
378
382
379 def shutdown(self, restart=False):
383 def shutdown(self, restart=False):
380 """Request an immediate kernel shutdown.
384 """Request an immediate kernel shutdown.
381
385
382 Upon receipt of the (empty) reply, client code can safely assume that
386 Upon receipt of the (empty) reply, client code can safely assume that
383 the kernel has shut down and it's safe to forcefully terminate it if
387 the kernel has shut down and it's safe to forcefully terminate it if
384 it's still alive.
388 it's still alive.
385
389
386 The kernel will send the reply via a function registered with Python's
390 The kernel will send the reply via a function registered with Python's
387 atexit module, ensuring it's truly done as the kernel is done with all
391 atexit module, ensuring it's truly done as the kernel is done with all
388 normal operation.
392 normal operation.
389 """
393 """
390 # Send quit message to kernel. Once we implement kernel-side setattr,
394 # Send quit message to kernel. Once we implement kernel-side setattr,
391 # this should probably be done that way, but for now this will do.
395 # this should probably be done that way, but for now this will do.
392 msg = self.session.msg('shutdown_request', {'restart':restart})
396 msg = self.session.msg('shutdown_request', {'restart':restart})
393 self._queue_send(msg)
397 self._queue_send(msg)
394 return msg['header']['msg_id']
398 return msg['header']['msg_id']
395
399
396
400
397
401
398 class IOPubChannel(ZMQSocketChannel):
402 class IOPubChannel(ZMQSocketChannel):
399 """The iopub channel which listens for messages that the kernel publishes.
403 """The iopub channel which listens for messages that the kernel publishes.
400
404
401 This channel is where all output is published to frontends.
405 This channel is where all output is published to frontends.
402 """
406 """
403
407
404 def __init__(self, context, session, address):
408 def __init__(self, context, session, address):
405 super(IOPubChannel, self).__init__(context, session, address)
409 super(IOPubChannel, self).__init__(context, session, address)
406 self.ioloop = ioloop.IOLoop()
410 self.ioloop = ioloop.IOLoop()
407
411
408 def run(self):
412 def run(self):
409 """The thread's main activity. Call start() instead."""
413 """The thread's main activity. Call start() instead."""
410 self.socket = self.context.socket(zmq.SUB)
414 self.socket = self.context.socket(zmq.SUB)
411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
415 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
416 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 self.socket.connect(self.address)
417 self.socket.connect(self.address)
414 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
418 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
415 self.stream.on_recv(self._handle_recv)
419 self.stream.on_recv(self._handle_recv)
416 self._run_loop()
420 self._run_loop()
417 try:
421 try:
418 self.socket.close()
422 self.socket.close()
419 except:
423 except:
420 pass
424 pass
421
425
422 def stop(self):
426 def stop(self):
423 """Stop the channel's event loop and join its thread."""
427 """Stop the channel's event loop and join its thread."""
424 self.ioloop.stop()
428 self.ioloop.stop()
425 super(IOPubChannel, self).stop()
429 super(IOPubChannel, self).stop()
426
430
427 def call_handlers(self, msg):
431 def call_handlers(self, msg):
428 """This method is called in the ioloop thread when a message arrives.
432 """This method is called in the ioloop thread when a message arrives.
429
433
430 Subclasses should override this method to handle incoming messages.
434 Subclasses should override this method to handle incoming messages.
431 It is important to remember that this method is called in the thread
435 It is important to remember that this method is called in the thread
432 so that some logic must be done to ensure that the application leve
436 so that some logic must be done to ensure that the application leve
433 handlers are called in the application thread.
437 handlers are called in the application thread.
434 """
438 """
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
439 raise NotImplementedError('call_handlers must be defined in a subclass.')
436
440
437 def flush(self, timeout=1.0):
441 def flush(self, timeout=1.0):
438 """Immediately processes all pending messages on the iopub channel.
442 """Immediately processes all pending messages on the iopub channel.
439
443
440 Callers should use this method to ensure that :method:`call_handlers`
444 Callers should use this method to ensure that :method:`call_handlers`
441 has been called for all messages that have been received on the
445 has been called for all messages that have been received on the
442 0MQ SUB socket of this channel.
446 0MQ SUB socket of this channel.
443
447
444 This method is thread safe.
448 This method is thread safe.
445
449
446 Parameters
450 Parameters
447 ----------
451 ----------
448 timeout : float, optional
452 timeout : float, optional
449 The maximum amount of time to spend flushing, in seconds. The
453 The maximum amount of time to spend flushing, in seconds. The
450 default is one second.
454 default is one second.
451 """
455 """
452 # We do the IOLoop callback process twice to ensure that the IOLoop
456 # We do the IOLoop callback process twice to ensure that the IOLoop
453 # gets to perform at least one full poll.
457 # gets to perform at least one full poll.
454 stop_time = time.time() + timeout
458 stop_time = time.time() + timeout
455 for i in xrange(2):
459 for i in xrange(2):
456 self._flushed = False
460 self._flushed = False
457 self.ioloop.add_callback(self._flush)
461 self.ioloop.add_callback(self._flush)
458 while not self._flushed and time.time() < stop_time:
462 while not self._flushed and time.time() < stop_time:
459 time.sleep(0.01)
463 time.sleep(0.01)
460
464
461 def _flush(self):
465 def _flush(self):
462 """Callback for :method:`self.flush`."""
466 """Callback for :method:`self.flush`."""
463 self.stream.flush()
467 self.stream.flush()
464 self._flushed = True
468 self._flushed = True
465
469
466
470
467 class StdInChannel(ZMQSocketChannel):
471 class StdInChannel(ZMQSocketChannel):
468 """The stdin channel to handle raw_input requests that the kernel makes."""
472 """The stdin channel to handle raw_input requests that the kernel makes."""
469
473
470 msg_queue = None
474 msg_queue = None
471
475
472 def __init__(self, context, session, address):
476 def __init__(self, context, session, address):
473 super(StdInChannel, self).__init__(context, session, address)
477 super(StdInChannel, self).__init__(context, session, address)
474 self.ioloop = ioloop.IOLoop()
478 self.ioloop = ioloop.IOLoop()
475
479
476 def run(self):
480 def run(self):
477 """The thread's main activity. Call start() instead."""
481 """The thread's main activity. Call start() instead."""
478 self.socket = self.context.socket(zmq.DEALER)
482 self.socket = self.context.socket(zmq.DEALER)
479 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
483 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
480 self.socket.connect(self.address)
484 self.socket.connect(self.address)
481 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
485 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
482 self.stream.on_recv(self._handle_recv)
486 self.stream.on_recv(self._handle_recv)
483 self._run_loop()
487 self._run_loop()
484 try:
488 try:
485 self.socket.close()
489 self.socket.close()
486 except:
490 except:
487 pass
491 pass
488
492
489 def stop(self):
493 def stop(self):
490 """Stop the channel's event loop and join its thread."""
494 """Stop the channel's event loop and join its thread."""
491 self.ioloop.stop()
495 self.ioloop.stop()
492 super(StdInChannel, self).stop()
496 super(StdInChannel, self).stop()
493
497
494 def call_handlers(self, msg):
498 def call_handlers(self, msg):
495 """This method is called in the ioloop thread when a message arrives.
499 """This method is called in the ioloop thread when a message arrives.
496
500
497 Subclasses should override this method to handle incoming messages.
501 Subclasses should override this method to handle incoming messages.
498 It is important to remember that this method is called in the thread
502 It is important to remember that this method is called in the thread
499 so that some logic must be done to ensure that the application leve
503 so that some logic must be done to ensure that the application leve
500 handlers are called in the application thread.
504 handlers are called in the application thread.
501 """
505 """
502 raise NotImplementedError('call_handlers must be defined in a subclass.')
506 raise NotImplementedError('call_handlers must be defined in a subclass.')
503
507
504 def input(self, string):
508 def input(self, string):
505 """Send a string of raw input to the kernel."""
509 """Send a string of raw input to the kernel."""
506 content = dict(value=string)
510 content = dict(value=string)
507 msg = self.session.msg('input_reply', content)
511 msg = self.session.msg('input_reply', content)
508 self._queue_send(msg)
512 self._queue_send(msg)
509
513
510
514
511 class HBChannel(ZMQSocketChannel):
515 class HBChannel(ZMQSocketChannel):
512 """The heartbeat channel which monitors the kernel heartbeat.
516 """The heartbeat channel which monitors the kernel heartbeat.
513
517
514 Note that the heartbeat channel is paused by default. As long as you start
518 Note that the heartbeat channel is paused by default. As long as you start
515 this channel, the kernel manager will ensure that it is paused and un-paused
519 this channel, the kernel manager will ensure that it is paused and un-paused
516 as appropriate.
520 as appropriate.
517 """
521 """
518
522
519 time_to_dead = 3.0
523 time_to_dead = 3.0
520 socket = None
524 socket = None
521 poller = None
525 poller = None
522 _running = None
526 _running = None
523 _pause = None
527 _pause = None
524 _beating = None
528 _beating = None
525
529
526 def __init__(self, context, session, address):
530 def __init__(self, context, session, address):
527 super(HBChannel, self).__init__(context, session, address)
531 super(HBChannel, self).__init__(context, session, address)
528 self._running = False
532 self._running = False
529 self._pause =True
533 self._pause =True
530 self.poller = zmq.Poller()
534 self.poller = zmq.Poller()
531
535
532 def _create_socket(self):
536 def _create_socket(self):
533 if self.socket is not None:
537 if self.socket is not None:
534 # close previous socket, before opening a new one
538 # close previous socket, before opening a new one
535 self.poller.unregister(self.socket)
539 self.poller.unregister(self.socket)
536 self.socket.close()
540 self.socket.close()
537 self.socket = self.context.socket(zmq.REQ)
541 self.socket = self.context.socket(zmq.REQ)
538 self.socket.setsockopt(zmq.LINGER, 0)
542 self.socket.setsockopt(zmq.LINGER, 0)
539 self.socket.connect(self.address)
543 self.socket.connect(self.address)
540
544
541 self.poller.register(self.socket, zmq.POLLIN)
545 self.poller.register(self.socket, zmq.POLLIN)
542
546
543 def _poll(self, start_time):
547 def _poll(self, start_time):
544 """poll for heartbeat replies until we reach self.time_to_dead.
548 """poll for heartbeat replies until we reach self.time_to_dead.
545
549
546 Ignores interrupts, and returns the result of poll(), which
550 Ignores interrupts, and returns the result of poll(), which
547 will be an empty list if no messages arrived before the timeout,
551 will be an empty list if no messages arrived before the timeout,
548 or the event tuple if there is a message to receive.
552 or the event tuple if there is a message to receive.
549 """
553 """
550
554
551 until_dead = self.time_to_dead - (time.time() - start_time)
555 until_dead = self.time_to_dead - (time.time() - start_time)
552 # ensure poll at least once
556 # ensure poll at least once
553 until_dead = max(until_dead, 1e-3)
557 until_dead = max(until_dead, 1e-3)
554 events = []
558 events = []
555 while True:
559 while True:
556 try:
560 try:
557 events = self.poller.poll(1000 * until_dead)
561 events = self.poller.poll(1000 * until_dead)
558 except ZMQError as e:
562 except ZMQError as e:
559 if e.errno == errno.EINTR:
563 if e.errno == errno.EINTR:
560 # ignore interrupts during heartbeat
564 # ignore interrupts during heartbeat
561 # this may never actually happen
565 # this may never actually happen
562 until_dead = self.time_to_dead - (time.time() - start_time)
566 until_dead = self.time_to_dead - (time.time() - start_time)
563 until_dead = max(until_dead, 1e-3)
567 until_dead = max(until_dead, 1e-3)
564 pass
568 pass
565 else:
569 else:
566 raise
570 raise
567 except Exception:
571 except Exception:
568 if self._exiting:
572 if self._exiting:
569 break
573 break
570 else:
574 else:
571 raise
575 raise
572 else:
576 else:
573 break
577 break
574 return events
578 return events
575
579
576 def run(self):
580 def run(self):
577 """The thread's main activity. Call start() instead."""
581 """The thread's main activity. Call start() instead."""
578 self._create_socket()
582 self._create_socket()
579 self._running = True
583 self._running = True
580 self._beating = True
584 self._beating = True
581
585
582 while self._running:
586 while self._running:
583 if self._pause:
587 if self._pause:
584 # just sleep, and skip the rest of the loop
588 # just sleep, and skip the rest of the loop
585 time.sleep(self.time_to_dead)
589 time.sleep(self.time_to_dead)
586 continue
590 continue
587
591
588 since_last_heartbeat = 0.0
592 since_last_heartbeat = 0.0
589 # io.rprint('Ping from HB channel') # dbg
593 # io.rprint('Ping from HB channel') # dbg
590 # no need to catch EFSM here, because the previous event was
594 # no need to catch EFSM here, because the previous event was
591 # either a recv or connect, which cannot be followed by EFSM
595 # either a recv or connect, which cannot be followed by EFSM
592 self.socket.send(b'ping')
596 self.socket.send(b'ping')
593 request_time = time.time()
597 request_time = time.time()
594 ready = self._poll(request_time)
598 ready = self._poll(request_time)
595 if ready:
599 if ready:
596 self._beating = True
600 self._beating = True
597 # the poll above guarantees we have something to recv
601 # the poll above guarantees we have something to recv
598 self.socket.recv()
602 self.socket.recv()
599 # sleep the remainder of the cycle
603 # sleep the remainder of the cycle
600 remainder = self.time_to_dead - (time.time() - request_time)
604 remainder = self.time_to_dead - (time.time() - request_time)
601 if remainder > 0:
605 if remainder > 0:
602 time.sleep(remainder)
606 time.sleep(remainder)
603 continue
607 continue
604 else:
608 else:
605 # nothing was received within the time limit, signal heart failure
609 # nothing was received within the time limit, signal heart failure
606 self._beating = False
610 self._beating = False
607 since_last_heartbeat = time.time() - request_time
611 since_last_heartbeat = time.time() - request_time
608 self.call_handlers(since_last_heartbeat)
612 self.call_handlers(since_last_heartbeat)
609 # and close/reopen the socket, because the REQ/REP cycle has been broken
613 # and close/reopen the socket, because the REQ/REP cycle has been broken
610 self._create_socket()
614 self._create_socket()
611 continue
615 continue
612 try:
616 try:
613 self.socket.close()
617 self.socket.close()
614 except:
618 except:
615 pass
619 pass
616
620
617 def pause(self):
621 def pause(self):
618 """Pause the heartbeat."""
622 """Pause the heartbeat."""
619 self._pause = True
623 self._pause = True
620
624
621 def unpause(self):
625 def unpause(self):
622 """Unpause the heartbeat."""
626 """Unpause the heartbeat."""
623 self._pause = False
627 self._pause = False
624
628
625 def is_beating(self):
629 def is_beating(self):
626 """Is the heartbeat running and responsive (and not paused)."""
630 """Is the heartbeat running and responsive (and not paused)."""
627 if self.is_alive() and not self._pause and self._beating:
631 if self.is_alive() and not self._pause and self._beating:
628 return True
632 return True
629 else:
633 else:
630 return False
634 return False
631
635
632 def stop(self):
636 def stop(self):
633 """Stop the channel's event loop and join its thread."""
637 """Stop the channel's event loop and join its thread."""
634 self._running = False
638 self._running = False
635 super(HBChannel, self).stop()
639 super(HBChannel, self).stop()
636
640
637 def call_handlers(self, since_last_heartbeat):
641 def call_handlers(self, since_last_heartbeat):
638 """This method is called in the ioloop thread when a message arrives.
642 """This method is called in the ioloop thread when a message arrives.
639
643
640 Subclasses should override this method to handle incoming messages.
644 Subclasses should override this method to handle incoming messages.
641 It is important to remember that this method is called in the thread
645 It is important to remember that this method is called in the thread
642 so that some logic must be done to ensure that the application level
646 so that some logic must be done to ensure that the application level
643 handlers are called in the application thread.
647 handlers are called in the application thread.
644 """
648 """
645 raise NotImplementedError('call_handlers must be defined in a subclass.')
649 raise NotImplementedError('call_handlers must be defined in a subclass.')
646
650
647
651
648 #-----------------------------------------------------------------------------
652 #-----------------------------------------------------------------------------
649 # Main kernel manager class
653 # Main kernel manager class
650 #-----------------------------------------------------------------------------
654 #-----------------------------------------------------------------------------
651
655
652 class KernelManager(Configurable):
656 class KernelManager(Configurable):
653 """Manages a single kernel on this host along with its channels.
657 """Manages a single kernel on this host along with its channels.
654
658
655 There are four channels associated with each kernel:
659 There are four channels associated with each kernel:
656
660
657 * shell: for request/reply calls to the kernel.
661 * shell: for request/reply calls to the kernel.
658 * iopub: for the kernel to publish results to frontends.
662 * iopub: for the kernel to publish results to frontends.
659 * hb: for monitoring the kernel's heartbeat.
663 * hb: for monitoring the kernel's heartbeat.
660 * stdin: for frontends to reply to raw_input calls in the kernel.
664 * stdin: for frontends to reply to raw_input calls in the kernel.
661
665
662 The usage of the channels that this class manages is optional. It is
666 The usage of the channels that this class manages is optional. It is
663 entirely possible to connect to the kernels directly using ZeroMQ
667 entirely possible to connect to the kernels directly using ZeroMQ
664 sockets. These channels are useful primarily for talking to a kernel
668 sockets. These channels are useful primarily for talking to a kernel
665 whose :class:`KernelManager` is in the same process.
669 whose :class:`KernelManager` is in the same process.
666
670
667 This version manages kernels started using Popen.
671 This version manages kernels started using Popen.
668 """
672 """
669 # The PyZMQ Context to use for communication with the kernel.
673 # The PyZMQ Context to use for communication with the kernel.
670 context = Instance(zmq.Context)
674 context = Instance(zmq.Context)
671 def _context_default(self):
675 def _context_default(self):
672 return zmq.Context.instance()
676 return zmq.Context.instance()
673
677
674 # The Session to use for communication with the kernel.
678 # The Session to use for communication with the kernel.
675 session = Instance(Session)
679 session = Instance(Session)
676 def _session_default(self):
680 def _session_default(self):
677 return Session(config=self.config)
681 return Session(config=self.config)
678
682
679 # The kernel process with which the KernelManager is communicating.
683 # The kernel process with which the KernelManager is communicating.
680 kernel = Instance(Popen)
684 # generally a Popen instance
685 kernel = Any()
686
687 kernel_cmd = List(Unicode, config=True,
688 help="""The Popen Command to launch the kernel.
689 Override this if you have a custom
690 """
691 )
692 def _kernel_cmd_changed(self, name, old, new):
693 print 'kernel cmd changed', new
694 self.ipython_kernel = False
695
696 ipython_kernel = Bool(True)
697
681
698
682 # The addresses for the communication channels.
699 # The addresses for the communication channels.
683 connection_file = Unicode('')
700 connection_file = Unicode('')
684
701
685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
702 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
686
703
687 ip = Unicode(LOCALHOST, config=True,
704 ip = Unicode(LOCALHOST, config=True,
688 help="""Set the kernel\'s IP address [default localhost].
705 help="""Set the kernel\'s IP address [default localhost].
689 If the IP address is something other than localhost, then
706 If the IP address is something other than localhost, then
690 Consoles on other machines will be able to connect
707 Consoles on other machines will be able to connect
691 to the Kernel, so be careful!"""
708 to the Kernel, so be careful!"""
692 )
709 )
693 def _ip_default(self):
710 def _ip_default(self):
694 if self.transport == 'ipc':
711 if self.transport == 'ipc':
695 if self.connection_file:
712 if self.connection_file:
696 return os.path.splitext(self.connection_file)[0] + '-ipc'
713 return os.path.splitext(self.connection_file)[0] + '-ipc'
697 else:
714 else:
698 return 'kernel-ipc'
715 return 'kernel-ipc'
699 else:
716 else:
700 return LOCALHOST
717 return LOCALHOST
701 def _ip_changed(self, name, old, new):
718 def _ip_changed(self, name, old, new):
702 if new == '*':
719 if new == '*':
703 self.ip = '0.0.0.0'
720 self.ip = '0.0.0.0'
704 shell_port = Integer(0)
721 shell_port = Integer(0)
705 iopub_port = Integer(0)
722 iopub_port = Integer(0)
706 stdin_port = Integer(0)
723 stdin_port = Integer(0)
707 hb_port = Integer(0)
724 hb_port = Integer(0)
708
725
709 # The classes to use for the various channels.
726 # The classes to use for the various channels.
710 shell_channel_class = Type(ShellChannel)
727 shell_channel_class = Type(ShellChannel)
711 iopub_channel_class = Type(IOPubChannel)
728 iopub_channel_class = Type(IOPubChannel)
712 stdin_channel_class = Type(StdInChannel)
729 stdin_channel_class = Type(StdInChannel)
713 hb_channel_class = Type(HBChannel)
730 hb_channel_class = Type(HBChannel)
714
731
715 # Protected traits.
732 # Protected traits.
716 _launch_args = Any
733 _launch_args = Any
717 _shell_channel = Any
734 _shell_channel = Any
718 _iopub_channel = Any
735 _iopub_channel = Any
719 _stdin_channel = Any
736 _stdin_channel = Any
720 _hb_channel = Any
737 _hb_channel = Any
721 _connection_file_written=Bool(False)
738 _connection_file_written=Bool(False)
722
739
723 def __del__(self):
740 def __del__(self):
724 self.cleanup_connection_file()
741 self.cleanup_connection_file()
725
742
726 #--------------------------------------------------------------------------
743 #--------------------------------------------------------------------------
727 # Channel management methods:
744 # Channel management methods:
728 #--------------------------------------------------------------------------
745 #--------------------------------------------------------------------------
729
746
730 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
747 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
731 """Starts the channels for this kernel.
748 """Starts the channels for this kernel.
732
749
733 This will create the channels if they do not exist and then start
750 This will create the channels if they do not exist and then start
734 them (their activity runs in a thread). If port numbers of 0 are
751 them (their activity runs in a thread). If port numbers of 0 are
735 being used (random ports) then you must first call
752 being used (random ports) then you must first call
736 :method:`start_kernel`. If the channels have been stopped and you
753 :method:`start_kernel`. If the channels have been stopped and you
737 call this, :class:`RuntimeError` will be raised.
754 call this, :class:`RuntimeError` will be raised.
738 """
755 """
739 if shell:
756 if shell:
740 self.shell_channel.start()
757 self.shell_channel.start()
741 if iopub:
758 if iopub:
742 self.iopub_channel.start()
759 self.iopub_channel.start()
743 if stdin:
760 if stdin:
744 self.stdin_channel.start()
761 self.stdin_channel.start()
745 self.shell_channel.allow_stdin = True
762 self.shell_channel.allow_stdin = True
746 else:
763 else:
747 self.shell_channel.allow_stdin = False
764 self.shell_channel.allow_stdin = False
748 if hb:
765 if hb:
749 self.hb_channel.start()
766 self.hb_channel.start()
750
767
751 def stop_channels(self):
768 def stop_channels(self):
752 """Stops all the running channels for this kernel.
769 """Stops all the running channels for this kernel.
753
770
754 This stops their event loops and joins their threads.
771 This stops their event loops and joins their threads.
755 """
772 """
756 if self.shell_channel.is_alive():
773 if self.shell_channel.is_alive():
757 self.shell_channel.stop()
774 self.shell_channel.stop()
758 if self.iopub_channel.is_alive():
775 if self.iopub_channel.is_alive():
759 self.iopub_channel.stop()
776 self.iopub_channel.stop()
760 if self.stdin_channel.is_alive():
777 if self.stdin_channel.is_alive():
761 self.stdin_channel.stop()
778 self.stdin_channel.stop()
762 if self.hb_channel.is_alive():
779 if self.hb_channel.is_alive():
763 self.hb_channel.stop()
780 self.hb_channel.stop()
764
781
765 @property
782 @property
766 def channels_running(self):
783 def channels_running(self):
767 """Are any of the channels created and running?"""
784 """Are any of the channels created and running?"""
768 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
785 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
769 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
786 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
770
787
771 def _make_url(self, port):
788 def _make_url(self, port):
772 """Make a zmq url with a port.
789 """Make a zmq url with a port.
773
790
774 There are two cases that this handles:
791 There are two cases that this handles:
775
792
776 * tcp: tcp://ip:port
793 * tcp: tcp://ip:port
777 * ipc: ipc://ip-port
794 * ipc: ipc://ip-port
778 """
795 """
779 if self.transport == 'tcp':
796 if self.transport == 'tcp':
780 return "tcp://%s:%i" % (self.ip, port)
797 return "tcp://%s:%i" % (self.ip, port)
781 else:
798 else:
782 return "%s://%s-%s" % (self.transport, self.ip, port)
799 return "%s://%s-%s" % (self.transport, self.ip, port)
783
800
784 @property
801 @property
785 def shell_channel(self):
802 def shell_channel(self):
786 """Get the shell channel object for this kernel."""
803 """Get the shell channel object for this kernel."""
787 if self._shell_channel is None:
804 if self._shell_channel is None:
788 self._shell_channel = self.shell_channel_class(
805 self._shell_channel = self.shell_channel_class(
789 self.context, self.session, self._make_url(self.shell_port)
806 self.context, self.session, self._make_url(self.shell_port)
790 )
807 )
791 return self._shell_channel
808 return self._shell_channel
792
809
793 @property
810 @property
794 def iopub_channel(self):
811 def iopub_channel(self):
795 """Get the iopub channel object for this kernel."""
812 """Get the iopub channel object for this kernel."""
796 if self._iopub_channel is None:
813 if self._iopub_channel is None:
797 self._iopub_channel = self.iopub_channel_class(
814 self._iopub_channel = self.iopub_channel_class(
798 self.context, self.session, self._make_url(self.iopub_port)
815 self.context, self.session, self._make_url(self.iopub_port)
799 )
816 )
800 return self._iopub_channel
817 return self._iopub_channel
801
818
802 @property
819 @property
803 def stdin_channel(self):
820 def stdin_channel(self):
804 """Get the stdin channel object for this kernel."""
821 """Get the stdin channel object for this kernel."""
805 if self._stdin_channel is None:
822 if self._stdin_channel is None:
806 self._stdin_channel = self.stdin_channel_class(
823 self._stdin_channel = self.stdin_channel_class(
807 self.context, self.session, self._make_url(self.stdin_port)
824 self.context, self.session, self._make_url(self.stdin_port)
808 )
825 )
809 return self._stdin_channel
826 return self._stdin_channel
810
827
811 @property
828 @property
812 def hb_channel(self):
829 def hb_channel(self):
813 """Get the hb channel object for this kernel."""
830 """Get the hb channel object for this kernel."""
814 if self._hb_channel is None:
831 if self._hb_channel is None:
815 self._hb_channel = self.hb_channel_class(
832 self._hb_channel = self.hb_channel_class(
816 self.context, self.session, self._make_url(self.hb_port)
833 self.context, self.session, self._make_url(self.hb_port)
817 )
834 )
818 return self._hb_channel
835 return self._hb_channel
819
836
820 #--------------------------------------------------------------------------
837 #--------------------------------------------------------------------------
821 # Connection and ipc file management
838 # Connection and ipc file management
822 #--------------------------------------------------------------------------
839 #--------------------------------------------------------------------------
823
840
824 def cleanup_connection_file(self):
841 def cleanup_connection_file(self):
825 """Cleanup connection file *if we wrote it*
842 """Cleanup connection file *if we wrote it*
826
843
827 Will not raise if the connection file was already removed somehow.
844 Will not raise if the connection file was already removed somehow.
828 """
845 """
829 if self._connection_file_written:
846 if self._connection_file_written:
830 # cleanup connection files on full shutdown of kernel we started
847 # cleanup connection files on full shutdown of kernel we started
831 self._connection_file_written = False
848 self._connection_file_written = False
832 try:
849 try:
833 os.remove(self.connection_file)
850 os.remove(self.connection_file)
834 except (IOError, OSError):
851 except (IOError, OSError):
835 pass
852 pass
836
853
837 def cleanup_ipc_files(self):
854 def cleanup_ipc_files(self):
838 """Cleanup ipc files if we wrote them."""
855 """Cleanup ipc files if we wrote them."""
839 if self.transport != 'ipc':
856 if self.transport != 'ipc':
840 return
857 return
841 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
858 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
842 ipcfile = "%s-%i" % (self.ip, port)
859 ipcfile = "%s-%i" % (self.ip, port)
843 try:
860 try:
844 os.remove(ipcfile)
861 os.remove(ipcfile)
845 except (IOError, OSError):
862 except (IOError, OSError):
846 pass
863 pass
847
864
848 def load_connection_file(self):
865 def load_connection_file(self):
849 """Load connection info from JSON dict in self.connection_file."""
866 """Load connection info from JSON dict in self.connection_file."""
850 with open(self.connection_file) as f:
867 with open(self.connection_file) as f:
851 cfg = json.loads(f.read())
868 cfg = json.loads(f.read())
852
869
853 from pprint import pprint
870 from pprint import pprint
854 pprint(cfg)
871 pprint(cfg)
855 self.transport = cfg.get('transport', 'tcp')
872 self.transport = cfg.get('transport', 'tcp')
856 self.ip = cfg['ip']
873 self.ip = cfg['ip']
857 self.shell_port = cfg['shell_port']
874 self.shell_port = cfg['shell_port']
858 self.stdin_port = cfg['stdin_port']
875 self.stdin_port = cfg['stdin_port']
859 self.iopub_port = cfg['iopub_port']
876 self.iopub_port = cfg['iopub_port']
860 self.hb_port = cfg['hb_port']
877 self.hb_port = cfg['hb_port']
861 self.session.key = str_to_bytes(cfg['key'])
878 self.session.key = str_to_bytes(cfg['key'])
862
879
863 def write_connection_file(self):
880 def write_connection_file(self):
864 """Write connection info to JSON dict in self.connection_file."""
881 """Write connection info to JSON dict in self.connection_file."""
865 if self._connection_file_written:
882 if self._connection_file_written:
866 return
883 return
867 self.connection_file,cfg = write_connection_file(self.connection_file,
884 self.connection_file,cfg = write_connection_file(self.connection_file,
868 transport=self.transport, ip=self.ip, key=self.session.key,
885 transport=self.transport, ip=self.ip, key=self.session.key,
869 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
886 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
870 shell_port=self.shell_port, hb_port=self.hb_port)
887 shell_port=self.shell_port, hb_port=self.hb_port)
871 # write_connection_file also sets default ports:
888 # write_connection_file also sets default ports:
872 self.shell_port = cfg['shell_port']
889 self.shell_port = cfg['shell_port']
873 self.stdin_port = cfg['stdin_port']
890 self.stdin_port = cfg['stdin_port']
874 self.iopub_port = cfg['iopub_port']
891 self.iopub_port = cfg['iopub_port']
875 self.hb_port = cfg['hb_port']
892 self.hb_port = cfg['hb_port']
876
893
877 self._connection_file_written = True
894 self._connection_file_written = True
878
895
879 #--------------------------------------------------------------------------
896 #--------------------------------------------------------------------------
880 # Kernel management
897 # Kernel management
881 #--------------------------------------------------------------------------
898 #--------------------------------------------------------------------------
899
900 def format_kernel_cmd(self, **kw):
901 """format templated args (e.g. {connection_file})"""
902 if self.kernel_cmd:
903 cmd = self.kernel_cmd
904 else:
905 cmd = make_kernel_cmd(
906 'from IPython.zmq.ipkernel import main; main()',
907 **kw
908 )
909 ns = dict(connection_file=self.connection_file)
910 ns.update(self._launch_args)
911 return [ c.format(**ns) for c in cmd ]
882
912
883 def start_kernel(self, **kw):
913 def start_kernel(self, **kw):
884 """Starts a kernel on this host in a separate process.
914 """Starts a kernel on this host in a separate process.
885
915
886 If random ports (port=0) are being used, this method must be called
916 If random ports (port=0) are being used, this method must be called
887 before the channels are created.
917 before the channels are created.
888
918
889 Parameters:
919 Parameters:
890 -----------
920 -----------
891 launcher : callable, optional (default None)
921 launcher : callable, optional (default None)
892 A custom function for launching the kernel process (generally a
922 A custom function for launching the kernel process (generally a
893 wrapper around ``entry_point.base_launch_kernel``). In most cases,
923 wrapper around ``entry_point.base_launch_kernel``). In most cases,
894 it should not be necessary to use this parameter.
924 it should not be necessary to use this parameter.
895
925
896 **kw : optional
926 **kw : optional
897 keyword arguments that are passed down into the launcher
927 keyword arguments that are passed down into the launcher
898 callable.
928 callable.
899 """
929 """
900 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
930 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
901 raise RuntimeError("Can only launch a kernel on a local interface. "
931 raise RuntimeError("Can only launch a kernel on a local interface. "
902 "Make sure that the '*_address' attributes are "
932 "Make sure that the '*_address' attributes are "
903 "configured properly. "
933 "configured properly. "
904 "Currently valid addresses are: %s"%LOCAL_IPS
934 "Currently valid addresses are: %s"%LOCAL_IPS
905 )
935 )
906
936
907 # write connection file / get default ports
937 # write connection file / get default ports
908 self.write_connection_file()
938 self.write_connection_file()
909
939
940 # save kwargs for use in restart
910 self._launch_args = kw.copy()
941 self._launch_args = kw.copy()
911 launch_kernel = kw.pop('launcher', None)
942 # build the Popen cmd
912 if launch_kernel is None:
943 kernel_cmd = self.format_kernel_cmd(**kw)
913 from ipkernel import launch_kernel
944 print kernel_cmd
914 self.kernel = launch_kernel(fname=self.connection_file, **kw)
945 # launch the kernel subprocess
946 self.kernel = launch_kernel(kernel_cmd, ipython_kernel=self.ipython_kernel, **kw)
915
947
916 def shutdown_kernel(self, now=False, restart=False):
948 def shutdown_kernel(self, now=False, restart=False):
917 """Attempts to the stop the kernel process cleanly.
949 """Attempts to the stop the kernel process cleanly.
918
950
919 This attempts to shutdown the kernels cleanly by:
951 This attempts to shutdown the kernels cleanly by:
920
952
921 1. Sending it a shutdown message over the shell channel.
953 1. Sending it a shutdown message over the shell channel.
922 2. If that fails, the kernel is shutdown forcibly by sending it
954 2. If that fails, the kernel is shutdown forcibly by sending it
923 a signal.
955 a signal.
924
956
925 Parameters:
957 Parameters:
926 -----------
958 -----------
927 now : bool
959 now : bool
928 Should the kernel be forcible killed *now*. This skips the
960 Should the kernel be forcible killed *now*. This skips the
929 first, nice shutdown attempt.
961 first, nice shutdown attempt.
930 restart: bool
962 restart: bool
931 Will this kernel be restarted after it is shutdown. When this
963 Will this kernel be restarted after it is shutdown. When this
932 is True, connection files will not be cleaned up.
964 is True, connection files will not be cleaned up.
933 """
965 """
934 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
966 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
935 if sys.platform == 'win32':
967 if sys.platform == 'win32':
936 self._kill_kernel()
968 self._kill_kernel()
937 return
969 return
938
970
939 # Pause the heart beat channel if it exists.
971 # Pause the heart beat channel if it exists.
940 if self._hb_channel is not None:
972 if self._hb_channel is not None:
941 self._hb_channel.pause()
973 self._hb_channel.pause()
942
974
943 if now:
975 if now:
944 if self.has_kernel:
976 if self.has_kernel:
945 self._kill_kernel()
977 self._kill_kernel()
946 else:
978 else:
947 # Don't send any additional kernel kill messages immediately, to give
979 # Don't send any additional kernel kill messages immediately, to give
948 # the kernel a chance to properly execute shutdown actions. Wait for at
980 # the kernel a chance to properly execute shutdown actions. Wait for at
949 # most 1s, checking every 0.1s.
981 # most 1s, checking every 0.1s.
950 self.shell_channel.shutdown(restart=restart)
982 self.shell_channel.shutdown(restart=restart)
951 for i in range(10):
983 for i in range(10):
952 if self.is_alive:
984 if self.is_alive:
953 time.sleep(0.1)
985 time.sleep(0.1)
954 else:
986 else:
955 break
987 break
956 else:
988 else:
957 # OK, we've waited long enough.
989 # OK, we've waited long enough.
958 if self.has_kernel:
990 if self.has_kernel:
959 self._kill_kernel()
991 self._kill_kernel()
960
992
961 if not restart:
993 if not restart:
962 self.cleanup_connection_file()
994 self.cleanup_connection_file()
963 self.cleanup_ipc_files()
995 self.cleanup_ipc_files()
964 else:
996 else:
965 self.cleanup_ipc_files()
997 self.cleanup_ipc_files()
966
998
967 def restart_kernel(self, now=False, **kw):
999 def restart_kernel(self, now=False, **kw):
968 """Restarts a kernel with the arguments that were used to launch it.
1000 """Restarts a kernel with the arguments that were used to launch it.
969
1001
970 If the old kernel was launched with random ports, the same ports will be
1002 If the old kernel was launched with random ports, the same ports will be
971 used for the new kernel. The same connection file is used again.
1003 used for the new kernel. The same connection file is used again.
972
1004
973 Parameters
1005 Parameters
974 ----------
1006 ----------
975 now : bool, optional
1007 now : bool, optional
976 If True, the kernel is forcefully restarted *immediately*, without
1008 If True, the kernel is forcefully restarted *immediately*, without
977 having a chance to do any cleanup action. Otherwise the kernel is
1009 having a chance to do any cleanup action. Otherwise the kernel is
978 given 1s to clean up before a forceful restart is issued.
1010 given 1s to clean up before a forceful restart is issued.
979
1011
980 In all cases the kernel is restarted, the only difference is whether
1012 In all cases the kernel is restarted, the only difference is whether
981 it is given a chance to perform a clean shutdown or not.
1013 it is given a chance to perform a clean shutdown or not.
982
1014
983 **kw : optional
1015 **kw : optional
984 Any options specified here will overwrite those used to launch the
1016 Any options specified here will overwrite those used to launch the
985 kernel.
1017 kernel.
986 """
1018 """
987 if self._launch_args is None:
1019 if self._launch_args is None:
988 raise RuntimeError("Cannot restart the kernel. "
1020 raise RuntimeError("Cannot restart the kernel. "
989 "No previous call to 'start_kernel'.")
1021 "No previous call to 'start_kernel'.")
990 else:
1022 else:
991 # Stop currently running kernel.
1023 # Stop currently running kernel.
992 self.shutdown_kernel(now=now, restart=True)
1024 self.shutdown_kernel(now=now, restart=True)
993
1025
994 # Start new kernel.
1026 # Start new kernel.
995 self._launch_args.update(kw)
1027 self._launch_args.update(kw)
996 self.start_kernel(**self._launch_args)
1028 self.start_kernel(**self._launch_args)
997
1029
998 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1030 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
999 # unless there is some delay here.
1031 # unless there is some delay here.
1000 if sys.platform == 'win32':
1032 if sys.platform == 'win32':
1001 time.sleep(0.2)
1033 time.sleep(0.2)
1002
1034
1003 @property
1035 @property
1004 def has_kernel(self):
1036 def has_kernel(self):
1005 """Has a kernel been started that we are managing."""
1037 """Has a kernel been started that we are managing."""
1006 return self.kernel is not None
1038 return self.kernel is not None
1007
1039
1008 def _kill_kernel(self):
1040 def _kill_kernel(self):
1009 """Kill the running kernel.
1041 """Kill the running kernel.
1010
1042
1011 This is a private method, callers should use shutdown_kernel(now=True).
1043 This is a private method, callers should use shutdown_kernel(now=True).
1012 """
1044 """
1013 if self.has_kernel:
1045 if self.has_kernel:
1014 # Pause the heart beat channel if it exists.
1046 # Pause the heart beat channel if it exists.
1015 if self._hb_channel is not None:
1047 if self._hb_channel is not None:
1016 self._hb_channel.pause()
1048 self._hb_channel.pause()
1017
1049
1018 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1050 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1019 # TerminateProcess() on Win32).
1051 # TerminateProcess() on Win32).
1020 try:
1052 try:
1021 self.kernel.kill()
1053 self.kernel.kill()
1022 except OSError as e:
1054 except OSError as e:
1023 # In Windows, we will get an Access Denied error if the process
1055 # In Windows, we will get an Access Denied error if the process
1024 # has already terminated. Ignore it.
1056 # has already terminated. Ignore it.
1025 if sys.platform == 'win32':
1057 if sys.platform == 'win32':
1026 if e.winerror != 5:
1058 if e.winerror != 5:
1027 raise
1059 raise
1028 # On Unix, we may get an ESRCH error if the process has already
1060 # On Unix, we may get an ESRCH error if the process has already
1029 # terminated. Ignore it.
1061 # terminated. Ignore it.
1030 else:
1062 else:
1031 from errno import ESRCH
1063 from errno import ESRCH
1032 if e.errno != ESRCH:
1064 if e.errno != ESRCH:
1033 raise
1065 raise
1034
1066
1035 # Block until the kernel terminates.
1067 # Block until the kernel terminates.
1036 self.kernel.wait()
1068 self.kernel.wait()
1037 self.kernel = None
1069 self.kernel = None
1038 else:
1070 else:
1039 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1071 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1040
1072
1041 def interrupt_kernel(self):
1073 def interrupt_kernel(self):
1042 """Interrupts the kernel by sending it a signal.
1074 """Interrupts the kernel by sending it a signal.
1043
1075
1044 Unlike ``signal_kernel``, this operation is well supported on all
1076 Unlike ``signal_kernel``, this operation is well supported on all
1045 platforms.
1077 platforms.
1046 """
1078 """
1047 if self.has_kernel:
1079 if self.has_kernel:
1048 if sys.platform == 'win32':
1080 if sys.platform == 'win32':
1049 from parentpoller import ParentPollerWindows as Poller
1081 from parentpoller import ParentPollerWindows as Poller
1050 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1082 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1051 else:
1083 else:
1052 self.kernel.send_signal(signal.SIGINT)
1084 self.kernel.send_signal(signal.SIGINT)
1053 else:
1085 else:
1054 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1086 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1055
1087
1056 def signal_kernel(self, signum):
1088 def signal_kernel(self, signum):
1057 """Sends a signal to the kernel.
1089 """Sends a signal to the kernel.
1058
1090
1059 Note that since only SIGTERM is supported on Windows, this function is
1091 Note that since only SIGTERM is supported on Windows, this function is
1060 only useful on Unix systems.
1092 only useful on Unix systems.
1061 """
1093 """
1062 if self.has_kernel:
1094 if self.has_kernel:
1063 self.kernel.send_signal(signum)
1095 self.kernel.send_signal(signum)
1064 else:
1096 else:
1065 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1097 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1066
1098
1067 @property
1099 @property
1068 def is_alive(self):
1100 def is_alive(self):
1069 """Is the kernel process still running?"""
1101 """Is the kernel process still running?"""
1070 if self.has_kernel:
1102 if self.has_kernel:
1071 if self.kernel.poll() is None:
1103 if self.kernel.poll() is None:
1072 return True
1104 return True
1073 else:
1105 else:
1074 return False
1106 return False
1075 elif self._hb_channel is not None:
1107 elif self._hb_channel is not None:
1076 # We didn't start the kernel with this KernelManager so we
1108 # We didn't start the kernel with this KernelManager so we
1077 # use the heartbeat.
1109 # use the heartbeat.
1078 return self._hb_channel.is_beating()
1110 return self._hb_channel.is_beating()
1079 else:
1111 else:
1080 # no heartbeat and not local, we can't tell if it's running,
1112 # no heartbeat and not local, we can't tell if it's running,
1081 # so naively return True
1113 # so naively return True
1082 return True
1114 return True
1083
1115
1084
1116
1085 #-----------------------------------------------------------------------------
1117 #-----------------------------------------------------------------------------
1086 # ABC Registration
1118 # ABC Registration
1087 #-----------------------------------------------------------------------------
1119 #-----------------------------------------------------------------------------
1088
1120
1089 ShellChannelABC.register(ShellChannel)
1121 ShellChannelABC.register(ShellChannel)
1090 IOPubChannelABC.register(IOPubChannel)
1122 IOPubChannelABC.register(IOPubChannel)
1091 HBChannelABC.register(HBChannel)
1123 HBChannelABC.register(HBChannel)
1092 StdInChannelABC.register(StdInChannel)
1124 StdInChannelABC.register(StdInChannel)
1093 KernelManagerABC.register(KernelManager)
1125 KernelManagerABC.register(KernelManager)
1094
1126
General Comments 0
You need to be logged in to leave comments. Login now