##// END OF EJS Templates
split KernelManager into KernelManager + KernelClient
MinRK -
Show More
@@ -0,0 +1,33
1 """Implements a fully blocking kernel client.
2
3 Useful for test suites and blocking terminal interfaces.
4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
15
16 from IPython.utils.traitlets import Type
17 from IPython.kernel.client import KernelClient
18 from .channels import (
19 BlockingIOPubChannel, BlockingHBChannel,
20 BlockingShellChannel, BlockingStdInChannel
21 )
22
23 #-----------------------------------------------------------------------------
24 # Blocking kernel manager
25 #-----------------------------------------------------------------------------
26
27 class BlockingKernelClient(KernelClient):
28
29 # The classes to use for the various channels
30 shell_channel_class = Type(BlockingShellChannel)
31 iopub_channel_class = Type(BlockingIOPubChannel)
32 stdin_channel_class = Type(BlockingStdInChannel)
33 hb_channel_class = Type(BlockingHBChannel)
@@ -0,0 +1,193
1 """Abstract base classes for kernel client channels"""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Standard library imports
15 import abc
16
17 #-----------------------------------------------------------------------------
18 # Channels
19 #-----------------------------------------------------------------------------
20
21
22 class ChannelABC(object):
23 """A base class for all channel ABCs."""
24
25 __metaclass__ = abc.ABCMeta
26
27 @abc.abstractmethod
28 def start(self):
29 pass
30
31 @abc.abstractmethod
32 def stop(self):
33 pass
34
35 @abc.abstractmethod
36 def is_alive(self):
37 pass
38
39
40 class ShellChannelABC(ChannelABC):
41 """ShellChannel ABC.
42
43 The docstrings for this class can be found in the base implementation:
44
45 `IPython.kernel.channels.ShellChannel`
46 """
47
48 @abc.abstractproperty
49 def allow_stdin(self):
50 pass
51
52 @abc.abstractmethod
53 def execute(self, code, silent=False, store_history=True,
54 user_variables=None, user_expressions=None, allow_stdin=None):
55 pass
56
57 @abc.abstractmethod
58 def complete(self, text, line, cursor_pos, block=None):
59 pass
60
61 @abc.abstractmethod
62 def object_info(self, oname, detail_level=0):
63 pass
64
65 @abc.abstractmethod
66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 pass
68
69 @abc.abstractmethod
70 def kernel_info(self):
71 pass
72
73 @abc.abstractmethod
74 def shutdown(self, restart=False):
75 pass
76
77
78 class IOPubChannelABC(ChannelABC):
79 """IOPubChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.channels.IOPubChannel`
84 """
85
86 @abc.abstractmethod
87 def flush(self, timeout=1.0):
88 pass
89
90
91 class StdInChannelABC(ChannelABC):
92 """StdInChannel ABC.
93
94 The docstrings for this class can be found in the base implementation:
95
96 `IPython.kernel.channels.StdInChannel`
97 """
98
99 @abc.abstractmethod
100 def input(self, string):
101 pass
102
103
104 class HBChannelABC(ChannelABC):
105 """HBChannel ABC.
106
107 The docstrings for this class can be found in the base implementation:
108
109 `IPython.kernel.channels.HBChannel`
110 """
111
112 @abc.abstractproperty
113 def time_to_dead(self):
114 pass
115
116 @abc.abstractmethod
117 def pause(self):
118 pass
119
120 @abc.abstractmethod
121 def unpause(self):
122 pass
123
124 @abc.abstractmethod
125 def is_beating(self):
126 pass
127
128
129 #-----------------------------------------------------------------------------
130 # Main kernel manager class
131 #-----------------------------------------------------------------------------
132
133 class KernelClientABC(object):
134 """KernelManager ABC.
135
136 The docstrings for this class can be found in the base implementation:
137
138 `IPython.kernel.channels.KernelClient`
139 """
140
141 __metaclass__ = abc.ABCMeta
142
143 @abc.abstractproperty
144 def kernel(self):
145 pass
146
147 @abc.abstractproperty
148 def shell_channel_class(self):
149 pass
150
151 @abc.abstractproperty
152 def iopub_channel_class(self):
153 pass
154
155 @abc.abstractproperty
156 def hb_channel_class(self):
157 pass
158
159 @abc.abstractproperty
160 def stdin_channel_class(self):
161 pass
162
163 #--------------------------------------------------------------------------
164 # Channel management methods
165 #--------------------------------------------------------------------------
166
167 @abc.abstractmethod
168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 pass
170
171 @abc.abstractmethod
172 def stop_channels(self):
173 pass
174
175 @abc.abstractproperty
176 def channels_running(self):
177 pass
178
179 @abc.abstractproperty
180 def shell_channel(self):
181 pass
182
183 @abc.abstractproperty
184 def iopub_channel(self):
185 pass
186
187 @abc.abstractproperty
188 def stdin_channel(self):
189 pass
190
191 @abc.abstractproperty
192 def hb_channel(self):
193 pass
This diff has been collapsed as it changes many lines, (638 lines changed) Show them Hide them
@@ -0,0 +1,638
1 """Base classes to manage a Client's interaction with a running kernel
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 from __future__ import absolute_import
16
17 # Standard library imports
18 import atexit
19 import errno
20 from threading import Thread
21 import time
22
23 import zmq
24 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
25 # during garbage collection of threads at exit:
26 from zmq import ZMQError
27 from zmq.eventloop import ioloop, zmqstream
28
29 # Local imports
30 from .channelabc import (
31 ShellChannelABC, IOPubChannelABC,
32 HBChannelABC, StdInChannelABC,
33 )
34
35 #-----------------------------------------------------------------------------
36 # Constants and exceptions
37 #-----------------------------------------------------------------------------
38
39 class InvalidPortNumber(Exception):
40 pass
41
42 #-----------------------------------------------------------------------------
43 # Utility functions
44 #-----------------------------------------------------------------------------
45
46 # some utilities to validate message structure, these might get moved elsewhere
47 # if they prove to have more generic utility
48
49 def validate_string_list(lst):
50 """Validate that the input is a list of strings.
51
52 Raises ValueError if not."""
53 if not isinstance(lst, list):
54 raise ValueError('input %r must be a list' % lst)
55 for x in lst:
56 if not isinstance(x, basestring):
57 raise ValueError('element %r in list must be a string' % x)
58
59
60 def validate_string_dict(dct):
61 """Validate that the input is a dict with string keys and values.
62
63 Raises ValueError if not."""
64 for k,v in dct.iteritems():
65 if not isinstance(k, basestring):
66 raise ValueError('key %r in dict must be a string' % k)
67 if not isinstance(v, basestring):
68 raise ValueError('value %r in dict must be a string' % v)
69
70
71 #-----------------------------------------------------------------------------
72 # ZMQ Socket Channel classes
73 #-----------------------------------------------------------------------------
74
75 class ZMQSocketChannel(Thread):
76 """The base class for the channels that use ZMQ sockets."""
77 context = None
78 session = None
79 socket = None
80 ioloop = None
81 stream = None
82 _address = None
83 _exiting = False
84
85 def __init__(self, context, session, address):
86 """Create a channel.
87
88 Parameters
89 ----------
90 context : :class:`zmq.Context`
91 The ZMQ context to use.
92 session : :class:`session.Session`
93 The session to use.
94 address : zmq url
95 Standard (ip, port) tuple that the kernel is listening on.
96 """
97 super(ZMQSocketChannel, self).__init__()
98 self.daemon = True
99
100 self.context = context
101 self.session = session
102 if isinstance(address, tuple):
103 if address[1] == 0:
104 message = 'The port number for a channel cannot be 0.'
105 raise InvalidPortNumber(message)
106 address = "tcp://%s:%i" % address
107 self._address = address
108 atexit.register(self._notice_exit)
109
110 def _notice_exit(self):
111 self._exiting = True
112
113 def _run_loop(self):
114 """Run my loop, ignoring EINTR events in the poller"""
115 while True:
116 try:
117 self.ioloop.start()
118 except ZMQError as e:
119 if e.errno == errno.EINTR:
120 continue
121 else:
122 raise
123 except Exception:
124 if self._exiting:
125 break
126 else:
127 raise
128 else:
129 break
130
131 def stop(self):
132 """Stop the channel's event loop and join its thread.
133
134 This calls :method:`Thread.join` and returns when the thread
135 terminates. :class:`RuntimeError` will be raised if
136 :method:`self.start` is called again.
137 """
138 self.join()
139
140 @property
141 def address(self):
142 """Get the channel's address as a zmq url string.
143
144 These URLS have the form: 'tcp://127.0.0.1:5555'.
145 """
146 return self._address
147
148 def _queue_send(self, msg):
149 """Queue a message to be sent from the IOLoop's thread.
150
151 Parameters
152 ----------
153 msg : message to send
154
155 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
156 thread control of the action.
157 """
158 def thread_send():
159 self.session.send(self.stream, msg)
160 self.ioloop.add_callback(thread_send)
161
162 def _handle_recv(self, msg):
163 """Callback for stream.on_recv.
164
165 Unpacks message, and calls handlers with it.
166 """
167 ident,smsg = self.session.feed_identities(msg)
168 self.call_handlers(self.session.unserialize(smsg))
169
170
171
172 class ShellChannel(ZMQSocketChannel):
173 """The shell channel for issuing request/replies to the kernel."""
174
175 command_queue = None
176 # flag for whether execute requests should be allowed to call raw_input:
177 allow_stdin = True
178
179 def __init__(self, context, session, address):
180 super(ShellChannel, self).__init__(context, session, address)
181 self.ioloop = ioloop.IOLoop()
182
183 def run(self):
184 """The thread's main activity. Call start() instead."""
185 self.socket = self.context.socket(zmq.DEALER)
186 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
187 self.socket.connect(self.address)
188 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
189 self.stream.on_recv(self._handle_recv)
190 self._run_loop()
191 try:
192 self.socket.close()
193 except:
194 pass
195
196 def stop(self):
197 """Stop the channel's event loop and join its thread."""
198 self.ioloop.stop()
199 super(ShellChannel, self).stop()
200
201 def call_handlers(self, msg):
202 """This method is called in the ioloop thread when a message arrives.
203
204 Subclasses should override this method to handle incoming messages.
205 It is important to remember that this method is called in the thread
206 so that some logic must be done to ensure that the application leve
207 handlers are called in the application thread.
208 """
209 raise NotImplementedError('call_handlers must be defined in a subclass.')
210
211 def execute(self, code, silent=False, store_history=True,
212 user_variables=None, user_expressions=None, allow_stdin=None):
213 """Execute code in the kernel.
214
215 Parameters
216 ----------
217 code : str
218 A string of Python code.
219
220 silent : bool, optional (default False)
221 If set, the kernel will execute the code as quietly possible, and
222 will force store_history to be False.
223
224 store_history : bool, optional (default True)
225 If set, the kernel will store command history. This is forced
226 to be False if silent is True.
227
228 user_variables : list, optional
229 A list of variable names to pull from the user's namespace. They
230 will come back as a dict with these names as keys and their
231 :func:`repr` as values.
232
233 user_expressions : dict, optional
234 A dict mapping names to expressions to be evaluated in the user's
235 dict. The expression values are returned as strings formatted using
236 :func:`repr`.
237
238 allow_stdin : bool, optional (default self.allow_stdin)
239 Flag for whether the kernel can send stdin requests to frontends.
240
241 Some frontends (e.g. the Notebook) do not support stdin requests.
242 If raw_input is called from code executed from such a frontend, a
243 StdinNotImplementedError will be raised.
244
245 Returns
246 -------
247 The msg_id of the message sent.
248 """
249 if user_variables is None:
250 user_variables = []
251 if user_expressions is None:
252 user_expressions = {}
253 if allow_stdin is None:
254 allow_stdin = self.allow_stdin
255
256
257 # Don't waste network traffic if inputs are invalid
258 if not isinstance(code, basestring):
259 raise ValueError('code %r must be a string' % code)
260 validate_string_list(user_variables)
261 validate_string_dict(user_expressions)
262
263 # Create class for content/msg creation. Related to, but possibly
264 # not in Session.
265 content = dict(code=code, silent=silent, store_history=store_history,
266 user_variables=user_variables,
267 user_expressions=user_expressions,
268 allow_stdin=allow_stdin,
269 )
270 msg = self.session.msg('execute_request', content)
271 self._queue_send(msg)
272 return msg['header']['msg_id']
273
274 def complete(self, text, line, cursor_pos, block=None):
275 """Tab complete text in the kernel's namespace.
276
277 Parameters
278 ----------
279 text : str
280 The text to complete.
281 line : str
282 The full line of text that is the surrounding context for the
283 text to complete.
284 cursor_pos : int
285 The position of the cursor in the line where the completion was
286 requested.
287 block : str, optional
288 The full block of code in which the completion is being requested.
289
290 Returns
291 -------
292 The msg_id of the message sent.
293 """
294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
295 msg = self.session.msg('complete_request', content)
296 self._queue_send(msg)
297 return msg['header']['msg_id']
298
299 def object_info(self, oname, detail_level=0):
300 """Get metadata information about an object in the kernel's namespace.
301
302 Parameters
303 ----------
304 oname : str
305 A string specifying the object name.
306 detail_level : int, optional
307 The level of detail for the introspection (0-2)
308
309 Returns
310 -------
311 The msg_id of the message sent.
312 """
313 content = dict(oname=oname, detail_level=detail_level)
314 msg = self.session.msg('object_info_request', content)
315 self._queue_send(msg)
316 return msg['header']['msg_id']
317
318 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
319 """Get entries from the kernel's history list.
320
321 Parameters
322 ----------
323 raw : bool
324 If True, return the raw input.
325 output : bool
326 If True, then return the output as well.
327 hist_access_type : str
328 'range' (fill in session, start and stop params), 'tail' (fill in n)
329 or 'search' (fill in pattern param).
330
331 session : int
332 For a range request, the session from which to get lines. Session
333 numbers are positive integers; negative ones count back from the
334 current session.
335 start : int
336 The first line number of a history range.
337 stop : int
338 The final (excluded) line number of a history range.
339
340 n : int
341 The number of lines of history to get for a tail request.
342
343 pattern : str
344 The glob-syntax pattern for a search request.
345
346 Returns
347 -------
348 The msg_id of the message sent.
349 """
350 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
351 **kwargs)
352 msg = self.session.msg('history_request', content)
353 self._queue_send(msg)
354 return msg['header']['msg_id']
355
356 def kernel_info(self):
357 """Request kernel info."""
358 msg = self.session.msg('kernel_info_request')
359 self._queue_send(msg)
360 return msg['header']['msg_id']
361
362 def shutdown(self, restart=False):
363 """Request an immediate kernel shutdown.
364
365 Upon receipt of the (empty) reply, client code can safely assume that
366 the kernel has shut down and it's safe to forcefully terminate it if
367 it's still alive.
368
369 The kernel will send the reply via a function registered with Python's
370 atexit module, ensuring it's truly done as the kernel is done with all
371 normal operation.
372 """
373 # Send quit message to kernel. Once we implement kernel-side setattr,
374 # this should probably be done that way, but for now this will do.
375 msg = self.session.msg('shutdown_request', {'restart':restart})
376 self._queue_send(msg)
377 return msg['header']['msg_id']
378
379
380
381 class IOPubChannel(ZMQSocketChannel):
382 """The iopub channel which listens for messages that the kernel publishes.
383
384 This channel is where all output is published to frontends.
385 """
386
387 def __init__(self, context, session, address):
388 super(IOPubChannel, self).__init__(context, session, address)
389 self.ioloop = ioloop.IOLoop()
390
391 def run(self):
392 """The thread's main activity. Call start() instead."""
393 self.socket = self.context.socket(zmq.SUB)
394 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
395 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
396 self.socket.connect(self.address)
397 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
398 self.stream.on_recv(self._handle_recv)
399 self._run_loop()
400 try:
401 self.socket.close()
402 except:
403 pass
404
405 def stop(self):
406 """Stop the channel's event loop and join its thread."""
407 self.ioloop.stop()
408 super(IOPubChannel, self).stop()
409
410 def call_handlers(self, msg):
411 """This method is called in the ioloop thread when a message arrives.
412
413 Subclasses should override this method to handle incoming messages.
414 It is important to remember that this method is called in the thread
415 so that some logic must be done to ensure that the application leve
416 handlers are called in the application thread.
417 """
418 raise NotImplementedError('call_handlers must be defined in a subclass.')
419
420 def flush(self, timeout=1.0):
421 """Immediately processes all pending messages on the iopub channel.
422
423 Callers should use this method to ensure that :method:`call_handlers`
424 has been called for all messages that have been received on the
425 0MQ SUB socket of this channel.
426
427 This method is thread safe.
428
429 Parameters
430 ----------
431 timeout : float, optional
432 The maximum amount of time to spend flushing, in seconds. The
433 default is one second.
434 """
435 # We do the IOLoop callback process twice to ensure that the IOLoop
436 # gets to perform at least one full poll.
437 stop_time = time.time() + timeout
438 for i in xrange(2):
439 self._flushed = False
440 self.ioloop.add_callback(self._flush)
441 while not self._flushed and time.time() < stop_time:
442 time.sleep(0.01)
443
444 def _flush(self):
445 """Callback for :method:`self.flush`."""
446 self.stream.flush()
447 self._flushed = True
448
449
450 class StdInChannel(ZMQSocketChannel):
451 """The stdin channel to handle raw_input requests that the kernel makes."""
452
453 msg_queue = None
454
455 def __init__(self, context, session, address):
456 super(StdInChannel, self).__init__(context, session, address)
457 self.ioloop = ioloop.IOLoop()
458
459 def run(self):
460 """The thread's main activity. Call start() instead."""
461 self.socket = self.context.socket(zmq.DEALER)
462 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
463 self.socket.connect(self.address)
464 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
465 self.stream.on_recv(self._handle_recv)
466 self._run_loop()
467 try:
468 self.socket.close()
469 except:
470 pass
471
472 def stop(self):
473 """Stop the channel's event loop and join its thread."""
474 self.ioloop.stop()
475 super(StdInChannel, self).stop()
476
477 def call_handlers(self, msg):
478 """This method is called in the ioloop thread when a message arrives.
479
480 Subclasses should override this method to handle incoming messages.
481 It is important to remember that this method is called in the thread
482 so that some logic must be done to ensure that the application leve
483 handlers are called in the application thread.
484 """
485 raise NotImplementedError('call_handlers must be defined in a subclass.')
486
487 def input(self, string):
488 """Send a string of raw input to the kernel."""
489 content = dict(value=string)
490 msg = self.session.msg('input_reply', content)
491 self._queue_send(msg)
492
493
494 class HBChannel(ZMQSocketChannel):
495 """The heartbeat channel which monitors the kernel heartbeat.
496
497 Note that the heartbeat channel is paused by default. As long as you start
498 this channel, the kernel manager will ensure that it is paused and un-paused
499 as appropriate.
500 """
501
502 time_to_dead = 3.0
503 socket = None
504 poller = None
505 _running = None
506 _pause = None
507 _beating = None
508
509 def __init__(self, context, session, address):
510 super(HBChannel, self).__init__(context, session, address)
511 self._running = False
512 self._pause =True
513 self.poller = zmq.Poller()
514
515 def _create_socket(self):
516 if self.socket is not None:
517 # close previous socket, before opening a new one
518 self.poller.unregister(self.socket)
519 self.socket.close()
520 self.socket = self.context.socket(zmq.REQ)
521 self.socket.setsockopt(zmq.LINGER, 0)
522 self.socket.connect(self.address)
523
524 self.poller.register(self.socket, zmq.POLLIN)
525
526 def _poll(self, start_time):
527 """poll for heartbeat replies until we reach self.time_to_dead.
528
529 Ignores interrupts, and returns the result of poll(), which
530 will be an empty list if no messages arrived before the timeout,
531 or the event tuple if there is a message to receive.
532 """
533
534 until_dead = self.time_to_dead - (time.time() - start_time)
535 # ensure poll at least once
536 until_dead = max(until_dead, 1e-3)
537 events = []
538 while True:
539 try:
540 events = self.poller.poll(1000 * until_dead)
541 except ZMQError as e:
542 if e.errno == errno.EINTR:
543 # ignore interrupts during heartbeat
544 # this may never actually happen
545 until_dead = self.time_to_dead - (time.time() - start_time)
546 until_dead = max(until_dead, 1e-3)
547 pass
548 else:
549 raise
550 except Exception:
551 if self._exiting:
552 break
553 else:
554 raise
555 else:
556 break
557 return events
558
559 def run(self):
560 """The thread's main activity. Call start() instead."""
561 self._create_socket()
562 self._running = True
563 self._beating = True
564
565 while self._running:
566 if self._pause:
567 # just sleep, and skip the rest of the loop
568 time.sleep(self.time_to_dead)
569 continue
570
571 since_last_heartbeat = 0.0
572 # io.rprint('Ping from HB channel') # dbg
573 # no need to catch EFSM here, because the previous event was
574 # either a recv or connect, which cannot be followed by EFSM
575 self.socket.send(b'ping')
576 request_time = time.time()
577 ready = self._poll(request_time)
578 if ready:
579 self._beating = True
580 # the poll above guarantees we have something to recv
581 self.socket.recv()
582 # sleep the remainder of the cycle
583 remainder = self.time_to_dead - (time.time() - request_time)
584 if remainder > 0:
585 time.sleep(remainder)
586 continue
587 else:
588 # nothing was received within the time limit, signal heart failure
589 self._beating = False
590 since_last_heartbeat = time.time() - request_time
591 self.call_handlers(since_last_heartbeat)
592 # and close/reopen the socket, because the REQ/REP cycle has been broken
593 self._create_socket()
594 continue
595 try:
596 self.socket.close()
597 except:
598 pass
599
600 def pause(self):
601 """Pause the heartbeat."""
602 self._pause = True
603
604 def unpause(self):
605 """Unpause the heartbeat."""
606 self._pause = False
607
608 def is_beating(self):
609 """Is the heartbeat running and responsive (and not paused)."""
610 if self.is_alive() and not self._pause and self._beating:
611 return True
612 else:
613 return False
614
615 def stop(self):
616 """Stop the channel's event loop and join its thread."""
617 self._running = False
618 super(HBChannel, self).stop()
619
620 def call_handlers(self, since_last_heartbeat):
621 """This method is called in the ioloop thread when a message arrives.
622
623 Subclasses should override this method to handle incoming messages.
624 It is important to remember that this method is called in the thread
625 so that some logic must be done to ensure that the application level
626 handlers are called in the application thread.
627 """
628 raise NotImplementedError('call_handlers must be defined in a subclass.')
629
630
631 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
632 # ABC Registration
633 #-----------------------------------------------------------------------------
634
635 ShellChannelABC.register(ShellChannel)
636 IOPubChannelABC.register(IOPubChannel)
637 HBChannelABC.register(HBChannel)
638 StdInChannelABC.register(StdInChannel)
@@ -0,0 +1,182
1 """Base class to manage the interaction with a running kernel
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 from __future__ import absolute_import
16
17 import zmq
18
19 # Local imports
20 from IPython.config.configurable import LoggingConfigurable
21 from IPython.utils.traitlets import (
22 Any, Instance, Type,
23 )
24
25 from .zmq.session import Session
26 from .channels import (
27 ShellChannel, IOPubChannel,
28 HBChannel, StdInChannel,
29 )
30 from .clientabc import KernelClientABC
31 from .connect import ConnectionFileMixin
32
33
34 #-----------------------------------------------------------------------------
35 # Main kernel client class
36 #-----------------------------------------------------------------------------
37
38 class KernelClient(LoggingConfigurable, ConnectionFileMixin):
39 """Communicates with a single kernel on any host via zmq channels.
40
41 There are four channels associated with each kernel:
42
43 * shell: for request/reply calls to the kernel.
44 * iopub: for the kernel to publish results to frontends.
45 * hb: for monitoring the kernel's heartbeat.
46 * stdin: for frontends to reply to raw_input calls in the kernel.
47
48 """
49
50 # The PyZMQ Context to use for communication with the kernel.
51 context = Instance(zmq.Context)
52 def _context_default(self):
53 return zmq.Context.instance()
54
55 # The Session to use for communication with the kernel.
56 session = Instance(Session)
57 def _session_default(self):
58 return Session(config=self.config)
59
60 # The classes to use for the various channels
61 shell_channel_class = Type(ShellChannel)
62 iopub_channel_class = Type(IOPubChannel)
63 stdin_channel_class = Type(StdInChannel)
64 hb_channel_class = Type(HBChannel)
65
66 # Protected traits
67 _shell_channel = Any
68 _iopub_channel = Any
69 _stdin_channel = Any
70 _hb_channel = Any
71
72 #--------------------------------------------------------------------------
73 # Channel management methods
74 #--------------------------------------------------------------------------
75
76 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
77 """Starts the channels for this kernel.
78
79 This will create the channels if they do not exist and then start
80 them (their activity runs in a thread). If port numbers of 0 are
81 being used (random ports) then you must first call
82 :method:`start_kernel`. If the channels have been stopped and you
83 call this, :class:`RuntimeError` will be raised.
84 """
85 if shell:
86 self.shell_channel.start()
87 if iopub:
88 self.iopub_channel.start()
89 if stdin:
90 self.stdin_channel.start()
91 self.shell_channel.allow_stdin = True
92 else:
93 self.shell_channel.allow_stdin = False
94 if hb:
95 self.hb_channel.start()
96
97 def stop_channels(self):
98 """Stops all the running channels for this kernel.
99
100 This stops their event loops and joins their threads.
101 """
102 if self.shell_channel.is_alive():
103 self.shell_channel.stop()
104 if self.iopub_channel.is_alive():
105 self.iopub_channel.stop()
106 if self.stdin_channel.is_alive():
107 self.stdin_channel.stop()
108 if self.hb_channel.is_alive():
109 self.hb_channel.stop()
110
111 @property
112 def channels_running(self):
113 """Are any of the channels created and running?"""
114 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
115 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
116
117 def _make_url(self, port):
118 """Make a zmq url with a port.
119
120 There are two cases that this handles:
121
122 * tcp: tcp://ip:port
123 * ipc: ipc://ip-port
124 """
125 if self.transport == 'tcp':
126 return "tcp://%s:%i" % (self.ip, port)
127 else:
128 return "%s://%s-%s" % (self.transport, self.ip, port)
129
130 @property
131 def shell_channel(self):
132 """Get the shell channel object for this kernel."""
133 if self._shell_channel is None:
134 self._shell_channel = self.shell_channel_class(
135 self.context, self.session, self._make_url(self.shell_port)
136 )
137 return self._shell_channel
138
139 @property
140 def iopub_channel(self):
141 """Get the iopub channel object for this kernel."""
142 if self._iopub_channel is None:
143 self._iopub_channel = self.iopub_channel_class(
144 self.context, self.session, self._make_url(self.iopub_port)
145 )
146 return self._iopub_channel
147
148 @property
149 def stdin_channel(self):
150 """Get the stdin channel object for this kernel."""
151 if self._stdin_channel is None:
152 self._stdin_channel = self.stdin_channel_class(
153 self.context, self.session, self._make_url(self.stdin_port)
154 )
155 return self._stdin_channel
156
157 @property
158 def hb_channel(self):
159 """Get the hb channel object for this kernel."""
160 if self._hb_channel is None:
161 self._hb_channel = self.hb_channel_class(
162 self.context, self.session, self._make_url(self.hb_port)
163 )
164 return self._hb_channel
165
166 def is_alive(self):
167 """Is the kernel process still running?"""
168 if self._hb_channel is not None:
169 # We didn't start the kernel with this KernelManager so we
170 # use the heartbeat.
171 return self._hb_channel.is_beating()
172 else:
173 # no heartbeat and not local, we can't tell if it's running,
174 # so naively return True
175 return True
176
177
178 #-----------------------------------------------------------------------------
179 # ABC Registration
180 #-----------------------------------------------------------------------------
181
182 KernelClientABC.register(KernelClient)
@@ -0,0 +1,193
1 """Abstract base classes for kernel clients and channels"""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Standard library imports
15 import abc
16
17 #-----------------------------------------------------------------------------
18 # Channels
19 #-----------------------------------------------------------------------------
20
21
22 class ChannelABC(object):
23 """A base class for all channel ABCs."""
24
25 __metaclass__ = abc.ABCMeta
26
27 @abc.abstractmethod
28 def start(self):
29 pass
30
31 @abc.abstractmethod
32 def stop(self):
33 pass
34
35 @abc.abstractmethod
36 def is_alive(self):
37 pass
38
39
40 class ShellChannelABC(ChannelABC):
41 """ShellChannel ABC.
42
43 The docstrings for this class can be found in the base implementation:
44
45 `IPython.kernel.kernelmanager.ShellChannel`
46 """
47
48 @abc.abstractproperty
49 def allow_stdin(self):
50 pass
51
52 @abc.abstractmethod
53 def execute(self, code, silent=False, store_history=True,
54 user_variables=None, user_expressions=None, allow_stdin=None):
55 pass
56
57 @abc.abstractmethod
58 def complete(self, text, line, cursor_pos, block=None):
59 pass
60
61 @abc.abstractmethod
62 def object_info(self, oname, detail_level=0):
63 pass
64
65 @abc.abstractmethod
66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 pass
68
69 @abc.abstractmethod
70 def kernel_info(self):
71 pass
72
73 @abc.abstractmethod
74 def shutdown(self, restart=False):
75 pass
76
77
78 class IOPubChannelABC(ChannelABC):
79 """IOPubChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.kernelmanager.IOPubChannel`
84 """
85
86 @abc.abstractmethod
87 def flush(self, timeout=1.0):
88 pass
89
90
91 class StdInChannelABC(ChannelABC):
92 """StdInChannel ABC.
93
94 The docstrings for this class can be found in the base implementation:
95
96 `IPython.kernel.kernelmanager.StdInChannel`
97 """
98
99 @abc.abstractmethod
100 def input(self, string):
101 pass
102
103
104 class HBChannelABC(ChannelABC):
105 """HBChannel ABC.
106
107 The docstrings for this class can be found in the base implementation:
108
109 `IPython.kernel.kernelmanager.HBChannel`
110 """
111
112 @abc.abstractproperty
113 def time_to_dead(self):
114 pass
115
116 @abc.abstractmethod
117 def pause(self):
118 pass
119
120 @abc.abstractmethod
121 def unpause(self):
122 pass
123
124 @abc.abstractmethod
125 def is_beating(self):
126 pass
127
128
129 #-----------------------------------------------------------------------------
130 # Main kernel manager class
131 #-----------------------------------------------------------------------------
132
133 class KernelClientABC(object):
134 """KernelManager ABC.
135
136 The docstrings for this class can be found in the base implementation:
137
138 `IPython.kernel.kernelmanager.KernelClient`
139 """
140
141 __metaclass__ = abc.ABCMeta
142
143 @abc.abstractproperty
144 def kernel(self):
145 pass
146
147 @abc.abstractproperty
148 def shell_channel_class(self):
149 pass
150
151 @abc.abstractproperty
152 def iopub_channel_class(self):
153 pass
154
155 @abc.abstractproperty
156 def hb_channel_class(self):
157 pass
158
159 @abc.abstractproperty
160 def stdin_channel_class(self):
161 pass
162
163 #--------------------------------------------------------------------------
164 # Channel management methods
165 #--------------------------------------------------------------------------
166
167 @abc.abstractmethod
168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 pass
170
171 @abc.abstractmethod
172 def stop_channels(self):
173 pass
174
175 @abc.abstractproperty
176 def channels_running(self):
177 pass
178
179 @abc.abstractproperty
180 def shell_channel(self):
181 pass
182
183 @abc.abstractproperty
184 def iopub_channel(self):
185 pass
186
187 @abc.abstractproperty
188 def stdin_channel(self):
189 pass
190
191 @abc.abstractproperty
192 def hb_channel(self):
193 pass
@@ -1,10 +1,10
1 """IPython kernels and associated utilities"""
1 """IPython kernels and associated utilities"""
2
2
3 # just for friendlier zmq version check
3 # just for friendlier zmq version check
4 from . import zmq
4 from . import zmq
5
5
6 from .connect import *
6 from .connect import *
7 from .launcher import *
7 from .launcher import *
8 from .manager import KernelManager
8 from .manager import KernelManager
9 from .blocking import BlockingKernelManager
9 from .blocking import BlockingKernelClient
10 from .multikernelmanager import MultiKernelManager
10 from .multikernelmanager import MultiKernelManager
@@ -1,1 +1,1
1 from .manager import BlockingKernelManager No newline at end of file
1 from .client import BlockingKernelClient No newline at end of file
@@ -1,89 +1,79
1 """ Implements a fully blocking kernel manager.
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2012 The IPython Development Team
6 # Copyright (C) 2013 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 import Queue
16 import Queue
17
17
18 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import IOPubChannel, HBChannel, \
19 from IPython.kernel.manager import KernelManager, IOPubChannel, HBChannel, \
20 ShellChannel, StdInChannel
19 ShellChannel, StdInChannel
21
20
22 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
23 # Blocking kernel manager
22 # Blocking kernel manager
24 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
25
24
26
25
27 class BlockingChannelMixin(object):
26 class BlockingChannelMixin(object):
28
27
29 def __init__(self, *args, **kwds):
28 def __init__(self, *args, **kwds):
30 super(BlockingChannelMixin, self).__init__(*args, **kwds)
29 super(BlockingChannelMixin, self).__init__(*args, **kwds)
31 self._in_queue = Queue.Queue()
30 self._in_queue = Queue.Queue()
32
31
33 def call_handlers(self, msg):
32 def call_handlers(self, msg):
34 self._in_queue.put(msg)
33 self._in_queue.put(msg)
35
34
36 def get_msg(self, block=True, timeout=None):
35 def get_msg(self, block=True, timeout=None):
37 """ Gets a message if there is one that is ready. """
36 """ Gets a message if there is one that is ready. """
38 if timeout is None:
37 if timeout is None:
39 # Queue.get(timeout=None) has stupid uninteruptible
38 # Queue.get(timeout=None) has stupid uninteruptible
40 # behavior, so wait for a week instead
39 # behavior, so wait for a week instead
41 timeout = 604800
40 timeout = 604800
42 return self._in_queue.get(block, timeout)
41 return self._in_queue.get(block, timeout)
43
42
44 def get_msgs(self):
43 def get_msgs(self):
45 """ Get all messages that are currently ready. """
44 """ Get all messages that are currently ready. """
46 msgs = []
45 msgs = []
47 while True:
46 while True:
48 try:
47 try:
49 msgs.append(self.get_msg(block=False))
48 msgs.append(self.get_msg(block=False))
50 except Queue.Empty:
49 except Queue.Empty:
51 break
50 break
52 return msgs
51 return msgs
53
52
54 def msg_ready(self):
53 def msg_ready(self):
55 """ Is there a message that has been received? """
54 """ Is there a message that has been received? """
56 return not self._in_queue.empty()
55 return not self._in_queue.empty()
57
56
58
57
59 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
58 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
60 pass
59 pass
61
60
62
61
63 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
62 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
64 pass
63 pass
65
64
66
65
67 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
66 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
68 pass
67 pass
69
68
70
69
71 class BlockingHBChannel(HBChannel):
70 class BlockingHBChannel(HBChannel):
72
71
73 # This kernel needs quicker monitoring, shorten to 1 sec.
72 # This kernel needs quicker monitoring, shorten to 1 sec.
74 # less than 0.5s is unreliable, and will get occasional
73 # less than 0.5s is unreliable, and will get occasional
75 # false reports of missed beats.
74 # false reports of missed beats.
76 time_to_dead = 1.
75 time_to_dead = 1.
77
76
78 def call_handlers(self, since_last_heartbeat):
77 def call_handlers(self, since_last_heartbeat):
79 """ Pause beating on missed heartbeat. """
78 """ Pause beating on missed heartbeat. """
80 pass
79 pass
81
82
83 class BlockingKernelManager(KernelManager):
84
85 # The classes to use for the various channels.
86 shell_channel_class = Type(BlockingShellChannel)
87 iopub_channel_class = Type(BlockingIOPubChannel)
88 stdin_channel_class = Type(BlockingStdInChannel)
89 hb_channel_class = Type(BlockingHBChannel)
@@ -1,347 +1,452
1 """Utilities for connecting to kernels
1 """Utilities for connecting to kernels
2
2
3 Authors:
3 Authors:
4
4
5 * Min Ragan-Kelley
5 * Min Ragan-Kelley
6
6
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2013 The IPython Development Team
10 # Copyright (C) 2013 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 import glob
20 import glob
21 import json
21 import json
22 import os
22 import os
23 import socket
23 import socket
24 import sys
24 import sys
25 from getpass import getpass
25 from getpass import getpass
26 from subprocess import Popen, PIPE
26 from subprocess import Popen, PIPE
27 import tempfile
27 import tempfile
28
28
29 # external imports
29 # external imports
30 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
31
31
32 # IPython imports
32 # IPython imports
33 # from IPython.config import Configurable
33 from IPython.core.profiledir import ProfileDir
34 from IPython.core.profiledir import ProfileDir
34 from IPython.utils.localinterfaces import LOCALHOST
35 from IPython.utils.localinterfaces import LOCALHOST
35 from IPython.utils.path import filefind, get_ipython_dir
36 from IPython.utils.path import filefind, get_ipython_dir
36 from IPython.utils.py3compat import str_to_bytes, bytes_to_str
37 from IPython.utils.py3compat import str_to_bytes, bytes_to_str
38 from IPython.utils.traitlets import (
39 Bool, Integer, Unicode, CaselessStrEnum,
40 HasTraits,
41 )
37
42
38
43
39 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
40 # Working with Connection Files
45 # Working with Connection Files
41 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
42
47
43 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
48 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
44 ip=LOCALHOST, key=b'', transport='tcp'):
49 ip=LOCALHOST, key=b'', transport='tcp'):
45 """Generates a JSON config file, including the selection of random ports.
50 """Generates a JSON config file, including the selection of random ports.
46
51
47 Parameters
52 Parameters
48 ----------
53 ----------
49
54
50 fname : unicode
55 fname : unicode
51 The path to the file to write
56 The path to the file to write
52
57
53 shell_port : int, optional
58 shell_port : int, optional
54 The port to use for ROUTER channel.
59 The port to use for ROUTER channel.
55
60
56 iopub_port : int, optional
61 iopub_port : int, optional
57 The port to use for the SUB channel.
62 The port to use for the SUB channel.
58
63
59 stdin_port : int, optional
64 stdin_port : int, optional
60 The port to use for the REQ (raw input) channel.
65 The port to use for the REQ (raw input) channel.
61
66
62 hb_port : int, optional
67 hb_port : int, optional
63 The port to use for the hearbeat REP channel.
68 The port to use for the hearbeat REP channel.
64
69
65 ip : str, optional
70 ip : str, optional
66 The ip address the kernel will bind to.
71 The ip address the kernel will bind to.
67
72
68 key : str, optional
73 key : str, optional
69 The Session key used for HMAC authentication.
74 The Session key used for HMAC authentication.
70
75
71 """
76 """
72 # default to temporary connector file
77 # default to temporary connector file
73 if not fname:
78 if not fname:
74 fname = tempfile.mktemp('.json')
79 fname = tempfile.mktemp('.json')
75
80
76 # Find open ports as necessary.
81 # Find open ports as necessary.
77
82
78 ports = []
83 ports = []
79 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
84 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
80 int(stdin_port <= 0) + int(hb_port <= 0)
85 int(stdin_port <= 0) + int(hb_port <= 0)
81 if transport == 'tcp':
86 if transport == 'tcp':
82 for i in range(ports_needed):
87 for i in range(ports_needed):
83 sock = socket.socket()
88 sock = socket.socket()
84 sock.bind(('', 0))
89 sock.bind(('', 0))
85 ports.append(sock)
90 ports.append(sock)
86 for i, sock in enumerate(ports):
91 for i, sock in enumerate(ports):
87 port = sock.getsockname()[1]
92 port = sock.getsockname()[1]
88 sock.close()
93 sock.close()
89 ports[i] = port
94 ports[i] = port
90 else:
95 else:
91 N = 1
96 N = 1
92 for i in range(ports_needed):
97 for i in range(ports_needed):
93 while os.path.exists("%s-%s" % (ip, str(N))):
98 while os.path.exists("%s-%s" % (ip, str(N))):
94 N += 1
99 N += 1
95 ports.append(N)
100 ports.append(N)
96 N += 1
101 N += 1
97 if shell_port <= 0:
102 if shell_port <= 0:
98 shell_port = ports.pop(0)
103 shell_port = ports.pop(0)
99 if iopub_port <= 0:
104 if iopub_port <= 0:
100 iopub_port = ports.pop(0)
105 iopub_port = ports.pop(0)
101 if stdin_port <= 0:
106 if stdin_port <= 0:
102 stdin_port = ports.pop(0)
107 stdin_port = ports.pop(0)
103 if hb_port <= 0:
108 if hb_port <= 0:
104 hb_port = ports.pop(0)
109 hb_port = ports.pop(0)
105
110
106 cfg = dict( shell_port=shell_port,
111 cfg = dict( shell_port=shell_port,
107 iopub_port=iopub_port,
112 iopub_port=iopub_port,
108 stdin_port=stdin_port,
113 stdin_port=stdin_port,
109 hb_port=hb_port,
114 hb_port=hb_port,
110 )
115 )
111 cfg['ip'] = ip
116 cfg['ip'] = ip
112 cfg['key'] = bytes_to_str(key)
117 cfg['key'] = bytes_to_str(key)
113 cfg['transport'] = transport
118 cfg['transport'] = transport
114
119
115 with open(fname, 'w') as f:
120 with open(fname, 'w') as f:
116 f.write(json.dumps(cfg, indent=2))
121 f.write(json.dumps(cfg, indent=2))
117
122
118 return fname, cfg
123 return fname, cfg
119
124
120
125
121 def get_connection_file(app=None):
126 def get_connection_file(app=None):
122 """Return the path to the connection file of an app
127 """Return the path to the connection file of an app
123
128
124 Parameters
129 Parameters
125 ----------
130 ----------
126 app : IPKernelApp instance [optional]
131 app : IPKernelApp instance [optional]
127 If unspecified, the currently running app will be used
132 If unspecified, the currently running app will be used
128 """
133 """
129 if app is None:
134 if app is None:
130 from IPython.kernel.zmq.kernelapp import IPKernelApp
135 from IPython.kernel.zmq.kernelapp import IPKernelApp
131 if not IPKernelApp.initialized():
136 if not IPKernelApp.initialized():
132 raise RuntimeError("app not specified, and not in a running Kernel")
137 raise RuntimeError("app not specified, and not in a running Kernel")
133
138
134 app = IPKernelApp.instance()
139 app = IPKernelApp.instance()
135 return filefind(app.connection_file, ['.', app.profile_dir.security_dir])
140 return filefind(app.connection_file, ['.', app.profile_dir.security_dir])
136
141
137
142
138 def find_connection_file(filename, profile=None):
143 def find_connection_file(filename, profile=None):
139 """find a connection file, and return its absolute path.
144 """find a connection file, and return its absolute path.
140
145
141 The current working directory and the profile's security
146 The current working directory and the profile's security
142 directory will be searched for the file if it is not given by
147 directory will be searched for the file if it is not given by
143 absolute path.
148 absolute path.
144
149
145 If profile is unspecified, then the current running application's
150 If profile is unspecified, then the current running application's
146 profile will be used, or 'default', if not run from IPython.
151 profile will be used, or 'default', if not run from IPython.
147
152
148 If the argument does not match an existing file, it will be interpreted as a
153 If the argument does not match an existing file, it will be interpreted as a
149 fileglob, and the matching file in the profile's security dir with
154 fileglob, and the matching file in the profile's security dir with
150 the latest access time will be used.
155 the latest access time will be used.
151
156
152 Parameters
157 Parameters
153 ----------
158 ----------
154 filename : str
159 filename : str
155 The connection file or fileglob to search for.
160 The connection file or fileglob to search for.
156 profile : str [optional]
161 profile : str [optional]
157 The name of the profile to use when searching for the connection file,
162 The name of the profile to use when searching for the connection file,
158 if different from the current IPython session or 'default'.
163 if different from the current IPython session or 'default'.
159
164
160 Returns
165 Returns
161 -------
166 -------
162 str : The absolute path of the connection file.
167 str : The absolute path of the connection file.
163 """
168 """
164 from IPython.core.application import BaseIPythonApplication as IPApp
169 from IPython.core.application import BaseIPythonApplication as IPApp
165 try:
170 try:
166 # quick check for absolute path, before going through logic
171 # quick check for absolute path, before going through logic
167 return filefind(filename)
172 return filefind(filename)
168 except IOError:
173 except IOError:
169 pass
174 pass
170
175
171 if profile is None:
176 if profile is None:
172 # profile unspecified, check if running from an IPython app
177 # profile unspecified, check if running from an IPython app
173 if IPApp.initialized():
178 if IPApp.initialized():
174 app = IPApp.instance()
179 app = IPApp.instance()
175 profile_dir = app.profile_dir
180 profile_dir = app.profile_dir
176 else:
181 else:
177 # not running in IPython, use default profile
182 # not running in IPython, use default profile
178 profile_dir = ProfileDir.find_profile_dir_by_name(get_ipython_dir(), 'default')
183 profile_dir = ProfileDir.find_profile_dir_by_name(get_ipython_dir(), 'default')
179 else:
184 else:
180 # find profiledir by profile name:
185 # find profiledir by profile name:
181 profile_dir = ProfileDir.find_profile_dir_by_name(get_ipython_dir(), profile)
186 profile_dir = ProfileDir.find_profile_dir_by_name(get_ipython_dir(), profile)
182 security_dir = profile_dir.security_dir
187 security_dir = profile_dir.security_dir
183
188
184 try:
189 try:
185 # first, try explicit name
190 # first, try explicit name
186 return filefind(filename, ['.', security_dir])
191 return filefind(filename, ['.', security_dir])
187 except IOError:
192 except IOError:
188 pass
193 pass
189
194
190 # not found by full name
195 # not found by full name
191
196
192 if '*' in filename:
197 if '*' in filename:
193 # given as a glob already
198 # given as a glob already
194 pat = filename
199 pat = filename
195 else:
200 else:
196 # accept any substring match
201 # accept any substring match
197 pat = '*%s*' % filename
202 pat = '*%s*' % filename
198 matches = glob.glob( os.path.join(security_dir, pat) )
203 matches = glob.glob( os.path.join(security_dir, pat) )
199 if not matches:
204 if not matches:
200 raise IOError("Could not find %r in %r" % (filename, security_dir))
205 raise IOError("Could not find %r in %r" % (filename, security_dir))
201 elif len(matches) == 1:
206 elif len(matches) == 1:
202 return matches[0]
207 return matches[0]
203 else:
208 else:
204 # get most recent match, by access time:
209 # get most recent match, by access time:
205 return sorted(matches, key=lambda f: os.stat(f).st_atime)[-1]
210 return sorted(matches, key=lambda f: os.stat(f).st_atime)[-1]
206
211
207
212
208 def get_connection_info(connection_file=None, unpack=False, profile=None):
213 def get_connection_info(connection_file=None, unpack=False, profile=None):
209 """Return the connection information for the current Kernel.
214 """Return the connection information for the current Kernel.
210
215
211 Parameters
216 Parameters
212 ----------
217 ----------
213 connection_file : str [optional]
218 connection_file : str [optional]
214 The connection file to be used. Can be given by absolute path, or
219 The connection file to be used. Can be given by absolute path, or
215 IPython will search in the security directory of a given profile.
220 IPython will search in the security directory of a given profile.
216 If run from IPython,
221 If run from IPython,
217
222
218 If unspecified, the connection file for the currently running
223 If unspecified, the connection file for the currently running
219 IPython Kernel will be used, which is only allowed from inside a kernel.
224 IPython Kernel will be used, which is only allowed from inside a kernel.
220 unpack : bool [default: False]
225 unpack : bool [default: False]
221 if True, return the unpacked dict, otherwise just the string contents
226 if True, return the unpacked dict, otherwise just the string contents
222 of the file.
227 of the file.
223 profile : str [optional]
228 profile : str [optional]
224 The name of the profile to use when searching for the connection file,
229 The name of the profile to use when searching for the connection file,
225 if different from the current IPython session or 'default'.
230 if different from the current IPython session or 'default'.
226
231
227
232
228 Returns
233 Returns
229 -------
234 -------
230 The connection dictionary of the current kernel, as string or dict,
235 The connection dictionary of the current kernel, as string or dict,
231 depending on `unpack`.
236 depending on `unpack`.
232 """
237 """
233 if connection_file is None:
238 if connection_file is None:
234 # get connection file from current kernel
239 # get connection file from current kernel
235 cf = get_connection_file()
240 cf = get_connection_file()
236 else:
241 else:
237 # connection file specified, allow shortnames:
242 # connection file specified, allow shortnames:
238 cf = find_connection_file(connection_file, profile=profile)
243 cf = find_connection_file(connection_file, profile=profile)
239
244
240 with open(cf) as f:
245 with open(cf) as f:
241 info = f.read()
246 info = f.read()
242
247
243 if unpack:
248 if unpack:
244 info = json.loads(info)
249 info = json.loads(info)
245 # ensure key is bytes:
250 # ensure key is bytes:
246 info['key'] = str_to_bytes(info.get('key', ''))
251 info['key'] = str_to_bytes(info.get('key', ''))
247 return info
252 return info
248
253
249
254
250 def connect_qtconsole(connection_file=None, argv=None, profile=None):
255 def connect_qtconsole(connection_file=None, argv=None, profile=None):
251 """Connect a qtconsole to the current kernel.
256 """Connect a qtconsole to the current kernel.
252
257
253 This is useful for connecting a second qtconsole to a kernel, or to a
258 This is useful for connecting a second qtconsole to a kernel, or to a
254 local notebook.
259 local notebook.
255
260
256 Parameters
261 Parameters
257 ----------
262 ----------
258 connection_file : str [optional]
263 connection_file : str [optional]
259 The connection file to be used. Can be given by absolute path, or
264 The connection file to be used. Can be given by absolute path, or
260 IPython will search in the security directory of a given profile.
265 IPython will search in the security directory of a given profile.
261 If run from IPython,
266 If run from IPython,
262
267
263 If unspecified, the connection file for the currently running
268 If unspecified, the connection file for the currently running
264 IPython Kernel will be used, which is only allowed from inside a kernel.
269 IPython Kernel will be used, which is only allowed from inside a kernel.
265 argv : list [optional]
270 argv : list [optional]
266 Any extra args to be passed to the console.
271 Any extra args to be passed to the console.
267 profile : str [optional]
272 profile : str [optional]
268 The name of the profile to use when searching for the connection file,
273 The name of the profile to use when searching for the connection file,
269 if different from the current IPython session or 'default'.
274 if different from the current IPython session or 'default'.
270
275
271
276
272 Returns
277 Returns
273 -------
278 -------
274 subprocess.Popen instance running the qtconsole frontend
279 subprocess.Popen instance running the qtconsole frontend
275 """
280 """
276 argv = [] if argv is None else argv
281 argv = [] if argv is None else argv
277
282
278 if connection_file is None:
283 if connection_file is None:
279 # get connection file from current kernel
284 # get connection file from current kernel
280 cf = get_connection_file()
285 cf = get_connection_file()
281 else:
286 else:
282 cf = find_connection_file(connection_file, profile=profile)
287 cf = find_connection_file(connection_file, profile=profile)
283
288
284 cmd = ';'.join([
289 cmd = ';'.join([
285 "from IPython.frontend.qt.console import qtconsoleapp",
290 "from IPython.frontend.qt.console import qtconsoleapp",
286 "qtconsoleapp.main()"
291 "qtconsoleapp.main()"
287 ])
292 ])
288
293
289 return Popen([sys.executable, '-c', cmd, '--existing', cf] + argv, stdout=PIPE, stderr=PIPE)
294 return Popen([sys.executable, '-c', cmd, '--existing', cf] + argv, stdout=PIPE, stderr=PIPE)
290
295
291
296
292 def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
297 def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
293 """tunnel connections to a kernel via ssh
298 """tunnel connections to a kernel via ssh
294
299
295 This will open four SSH tunnels from localhost on this machine to the
300 This will open four SSH tunnels from localhost on this machine to the
296 ports associated with the kernel. They can be either direct
301 ports associated with the kernel. They can be either direct
297 localhost-localhost tunnels, or if an intermediate server is necessary,
302 localhost-localhost tunnels, or if an intermediate server is necessary,
298 the kernel must be listening on a public IP.
303 the kernel must be listening on a public IP.
299
304
300 Parameters
305 Parameters
301 ----------
306 ----------
302 connection_info : dict or str (path)
307 connection_info : dict or str (path)
303 Either a connection dict, or the path to a JSON connection file
308 Either a connection dict, or the path to a JSON connection file
304 sshserver : str
309 sshserver : str
305 The ssh sever to use to tunnel to the kernel. Can be a full
310 The ssh sever to use to tunnel to the kernel. Can be a full
306 `user@server:port` string. ssh config aliases are respected.
311 `user@server:port` string. ssh config aliases are respected.
307 sshkey : str [optional]
312 sshkey : str [optional]
308 Path to file containing ssh key to use for authentication.
313 Path to file containing ssh key to use for authentication.
309 Only necessary if your ssh config does not already associate
314 Only necessary if your ssh config does not already associate
310 a keyfile with the host.
315 a keyfile with the host.
311
316
312 Returns
317 Returns
313 -------
318 -------
314
319
315 (shell, iopub, stdin, hb) : ints
320 (shell, iopub, stdin, hb) : ints
316 The four ports on localhost that have been forwarded to the kernel.
321 The four ports on localhost that have been forwarded to the kernel.
317 """
322 """
318 if isinstance(connection_info, basestring):
323 if isinstance(connection_info, basestring):
319 # it's a path, unpack it
324 # it's a path, unpack it
320 with open(connection_info) as f:
325 with open(connection_info) as f:
321 connection_info = json.loads(f.read())
326 connection_info = json.loads(f.read())
322
327
323 cf = connection_info
328 cf = connection_info
324
329
325 lports = tunnel.select_random_ports(4)
330 lports = tunnel.select_random_ports(4)
326 rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port']
331 rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port']
327
332
328 remote_ip = cf['ip']
333 remote_ip = cf['ip']
329
334
330 if tunnel.try_passwordless_ssh(sshserver, sshkey):
335 if tunnel.try_passwordless_ssh(sshserver, sshkey):
331 password=False
336 password=False
332 else:
337 else:
333 password = getpass("SSH Password for %s: "%sshserver)
338 password = getpass("SSH Password for %s: "%sshserver)
334
339
335 for lp,rp in zip(lports, rports):
340 for lp,rp in zip(lports, rports):
336 tunnel.ssh_tunnel(lp, rp, sshserver, remote_ip, sshkey, password)
341 tunnel.ssh_tunnel(lp, rp, sshserver, remote_ip, sshkey, password)
337
342
338 return tuple(lports)
343 return tuple(lports)
339
344
345
346 #-----------------------------------------------------------------------------
347 # Mixin for classes that workw ith connection files
348 #-----------------------------------------------------------------------------
349
350 class ConnectionFileMixin(HasTraits):
351 """Mixin for configurable classes that work with connection files"""
352
353 # The addresses for the communication channels
354 connection_file = Unicode('')
355 _connection_file_written = Bool(False)
356
357 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
358
359 ip = Unicode(LOCALHOST, config=True,
360 help="""Set the kernel\'s IP address [default localhost].
361 If the IP address is something other than localhost, then
362 Consoles on other machines will be able to connect
363 to the Kernel, so be careful!"""
364 )
365
366 def _ip_default(self):
367 if self.transport == 'ipc':
368 if self.connection_file:
369 return os.path.splitext(self.connection_file)[0] + '-ipc'
370 else:
371 return 'kernel-ipc'
372 else:
373 return LOCALHOST
374
375 def _ip_changed(self, name, old, new):
376 if new == '*':
377 self.ip = '0.0.0.0'
378
379 # protected traits
380
381 shell_port = Integer(0)
382 iopub_port = Integer(0)
383 stdin_port = Integer(0)
384 hb_port = Integer(0)
385
386 #--------------------------------------------------------------------------
387 # Connection and ipc file management
388 #--------------------------------------------------------------------------
389
390 def cleanup_connection_file(self):
391 """Cleanup connection file *if we wrote it*
392
393 Will not raise if the connection file was already removed somehow.
394 """
395 if self._connection_file_written:
396 # cleanup connection files on full shutdown of kernel we started
397 self._connection_file_written = False
398 try:
399 os.remove(self.connection_file)
400 except (IOError, OSError, AttributeError):
401 pass
402
403 def cleanup_ipc_files(self):
404 """Cleanup ipc files if we wrote them."""
405 if self.transport != 'ipc':
406 return
407 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
408 ipcfile = "%s-%i" % (self.ip, port)
409 try:
410 os.remove(ipcfile)
411 except (IOError, OSError):
412 pass
413
414 def write_connection_file(self):
415 """Write connection info to JSON dict in self.connection_file."""
416 if self._connection_file_written:
417 return
418 self.connection_file,cfg = write_connection_file(self.connection_file,
419 transport=self.transport, ip=self.ip, key=self.session.key,
420 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
421 shell_port=self.shell_port, hb_port=self.hb_port)
422 # write_connection_file also sets default ports:
423 self.shell_port = cfg['shell_port']
424 self.stdin_port = cfg['stdin_port']
425 self.iopub_port = cfg['iopub_port']
426 self.hb_port = cfg['hb_port']
427
428 self._connection_file_written = True
429
430 def load_connection_file(self):
431 """Load connection info from JSON dict in self.connection_file."""
432 with open(self.connection_file) as f:
433 cfg = json.loads(f.read())
434
435 self.transport = cfg.get('transport', 'tcp')
436 self.ip = cfg['ip']
437 self.shell_port = cfg['shell_port']
438 self.stdin_port = cfg['stdin_port']
439 self.iopub_port = cfg['iopub_port']
440 self.hb_port = cfg['hb_port']
441 self.session.key = str_to_bytes(cfg['key'])
442
443
444
340 __all__ = [
445 __all__ = [
341 'write_connection_file',
446 'write_connection_file',
342 'get_connection_file',
447 'get_connection_file',
343 'find_connection_file',
448 'find_connection_file',
344 'get_connection_info',
449 'get_connection_info',
345 'connect_qtconsole',
450 'connect_qtconsole',
346 'tunnel_to_kernel',
451 'tunnel_to_kernel',
347 ]
452 ]
@@ -1,50 +1,50
1 """A kernel manager with ioloop based logic."""
1 """A kernel manager with a tornado IOLoop"""
2
2
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
4 # Copyright (C) 2013 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 from __future__ import absolute_import
14 from __future__ import absolute_import
15
15
16 import zmq
16 import zmq
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18
18
19 from IPython.utils.traitlets import (
19 from IPython.utils.traitlets import (
20 Instance
20 Instance
21 )
21 )
22
22
23 from IPython.kernel.blocking.manager import BlockingKernelManager
23 from IPython.kernel.manager import KernelManager
24 from .restarter import IOLoopKernelRestarter
24 from .restarter import IOLoopKernelRestarter
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Code
27 # Code
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 class IOLoopKernelManager(BlockingKernelManager):
30 class IOLoopKernelManager(KernelManager):
31
31
32 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
32 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
33 def _loop_default(self):
33 def _loop_default(self):
34 return ioloop.IOLoop.instance()
34 return ioloop.IOLoop.instance()
35
35
36 _restarter = Instance('IPython.kernel.ioloop.IOLoopKernelRestarter')
36 _restarter = Instance('IPython.kernel.ioloop.IOLoopKernelRestarter')
37
37
38 def start_restarter(self):
38 def start_restarter(self):
39 if self.autorestart and self.has_kernel:
39 if self.autorestart and self.has_kernel:
40 if self._restarter is None:
40 if self._restarter is None:
41 self._restarter = IOLoopKernelRestarter(
41 self._restarter = IOLoopKernelRestarter(
42 kernel_manager=self, loop=self.loop,
42 kernel_manager=self, loop=self.loop,
43 config=self.config, log=self.log
43 config=self.config, log=self.log
44 )
44 )
45 self._restarter.start()
45 self._restarter.start()
46
46
47 def stop_restarter(self):
47 def stop_restarter(self):
48 if self.autorestart:
48 if self.autorestart:
49 if self._restarter is not None:
49 if self._restarter is not None:
50 self._restarter.stop()
50 self._restarter.stop()
This diff has been collapsed as it changes many lines, (869 lines changed) Show them Hide them
@@ -1,1146 +1,325
1 """Base classes to manage the interaction with a running kernel.
1 """Base class to manage a running kernel
2
3 TODO
4 * Create logger to handle debugging and console messages.
5 """
2 """
6
3
7 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
5 # Copyright (C) 2013 The IPython Development Team
9 #
6 #
10 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
13
10
14 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
15 # Imports
12 # Imports
16 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
17
14
18 from __future__ import absolute_import
15 from __future__ import absolute_import
19
16
20 # Standard library imports
17 # Standard library imports
21 import atexit
22 import errno
23 import json
24 import os
25 import signal
18 import signal
26 import sys
19 import sys
27 from threading import Thread
28 import time
20 import time
29
21
30 import zmq
22 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
35
23
36 # Local imports
24 # Local imports
37 from IPython.config.configurable import Configurable
25 from IPython.config.configurable import LoggingConfigurable
38 from IPython.utils.importstring import import_item
26 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
40 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
41 Any, Instance, Type, Unicode, List, Integer, Bool,
28 Any, Instance, Unicode, List, Bool,
42 CaselessStrEnum, DottedObjectName
43 )
29 )
44 from IPython.utils.py3compat import str_to_bytes
45 from IPython.kernel import (
30 from IPython.kernel import (
46 write_connection_file,
47 make_ipkernel_cmd,
31 make_ipkernel_cmd,
48 launch_kernel,
32 launch_kernel,
49 )
33 )
34 from .connect import ConnectionFileMixin
50 from .zmq.session import Session
35 from .zmq.session import Session
51 from .managerabc import (
36 from .managerabc import (
52 ShellChannelABC, IOPubChannelABC,
53 HBChannelABC, StdInChannelABC,
54 KernelManagerABC
37 KernelManagerABC
55 )
38 )
56
39
57 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
58 # Constants and exceptions
59 #-----------------------------------------------------------------------------
60
61 class InvalidPortNumber(Exception):
62 pass
63
64 #-----------------------------------------------------------------------------
65 # Utility functions
66 #-----------------------------------------------------------------------------
67
68 # some utilities to validate message structure, these might get moved elsewhere
69 # if they prove to have more generic utility
70
71 def validate_string_list(lst):
72 """Validate that the input is a list of strings.
73
74 Raises ValueError if not."""
75 if not isinstance(lst, list):
76 raise ValueError('input %r must be a list' % lst)
77 for x in lst:
78 if not isinstance(x, basestring):
79 raise ValueError('element %r in list must be a string' % x)
80
81
82 def validate_string_dict(dct):
83 """Validate that the input is a dict with string keys and values.
84
85 Raises ValueError if not."""
86 for k,v in dct.iteritems():
87 if not isinstance(k, basestring):
88 raise ValueError('key %r in dict must be a string' % k)
89 if not isinstance(v, basestring):
90 raise ValueError('value %r in dict must be a string' % v)
91
92
93 #-----------------------------------------------------------------------------
94 # ZMQ Socket Channel classes
95 #-----------------------------------------------------------------------------
96
97 class ZMQSocketChannel(Thread):
98 """The base class for the channels that use ZMQ sockets."""
99 context = None
100 session = None
101 socket = None
102 ioloop = None
103 stream = None
104 _address = None
105 _exiting = False
106
107 def __init__(self, context, session, address):
108 """Create a channel.
109
110 Parameters
111 ----------
112 context : :class:`zmq.Context`
113 The ZMQ context to use.
114 session : :class:`session.Session`
115 The session to use.
116 address : zmq url
117 Standard (ip, port) tuple that the kernel is listening on.
118 """
119 super(ZMQSocketChannel, self).__init__()
120 self.daemon = True
121
122 self.context = context
123 self.session = session
124 if isinstance(address, tuple):
125 if address[1] == 0:
126 message = 'The port number for a channel cannot be 0.'
127 raise InvalidPortNumber(message)
128 address = "tcp://%s:%i" % address
129 self._address = address
130 atexit.register(self._notice_exit)
131
132 def _notice_exit(self):
133 self._exiting = True
134
135 def _run_loop(self):
136 """Run my loop, ignoring EINTR events in the poller"""
137 while True:
138 try:
139 self.ioloop.start()
140 except ZMQError as e:
141 if e.errno == errno.EINTR:
142 continue
143 else:
144 raise
145 except Exception:
146 if self._exiting:
147 break
148 else:
149 raise
150 else:
151 break
152
153 def stop(self):
154 """Stop the channel's event loop and join its thread.
155
156 This calls :method:`Thread.join` and returns when the thread
157 terminates. :class:`RuntimeError` will be raised if
158 :method:`self.start` is called again.
159 """
160 self.join()
161
162 @property
163 def address(self):
164 """Get the channel's address as a zmq url string.
165
166 These URLS have the form: 'tcp://127.0.0.1:5555'.
167 """
168 return self._address
169
170 def _queue_send(self, msg):
171 """Queue a message to be sent from the IOLoop's thread.
172
173 Parameters
174 ----------
175 msg : message to send
176
177 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
178 thread control of the action.
179 """
180 def thread_send():
181 self.session.send(self.stream, msg)
182 self.ioloop.add_callback(thread_send)
183
184 def _handle_recv(self, msg):
185 """Callback for stream.on_recv.
186
187 Unpacks message, and calls handlers with it.
188 """
189 ident,smsg = self.session.feed_identities(msg)
190 self.call_handlers(self.session.unserialize(smsg))
191
192
193
194 class ShellChannel(ZMQSocketChannel):
195 """The shell channel for issuing request/replies to the kernel."""
196
197 command_queue = None
198 # flag for whether execute requests should be allowed to call raw_input:
199 allow_stdin = True
200
201 def __init__(self, context, session, address):
202 super(ShellChannel, self).__init__(context, session, address)
203 self.ioloop = ioloop.IOLoop()
204
205 def run(self):
206 """The thread's main activity. Call start() instead."""
207 self.socket = self.context.socket(zmq.DEALER)
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.connect(self.address)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream.on_recv(self._handle_recv)
212 self._run_loop()
213 try:
214 self.socket.close()
215 except:
216 pass
217
218 def stop(self):
219 """Stop the channel's event loop and join its thread."""
220 self.ioloop.stop()
221 super(ShellChannel, self).stop()
222
223 def call_handlers(self, msg):
224 """This method is called in the ioloop thread when a message arrives.
225
226 Subclasses should override this method to handle incoming messages.
227 It is important to remember that this method is called in the thread
228 so that some logic must be done to ensure that the application leve
229 handlers are called in the application thread.
230 """
231 raise NotImplementedError('call_handlers must be defined in a subclass.')
232
233 def execute(self, code, silent=False, store_history=True,
234 user_variables=None, user_expressions=None, allow_stdin=None):
235 """Execute code in the kernel.
236
237 Parameters
238 ----------
239 code : str
240 A string of Python code.
241
242 silent : bool, optional (default False)
243 If set, the kernel will execute the code as quietly possible, and
244 will force store_history to be False.
245
246 store_history : bool, optional (default True)
247 If set, the kernel will store command history. This is forced
248 to be False if silent is True.
249
250 user_variables : list, optional
251 A list of variable names to pull from the user's namespace. They
252 will come back as a dict with these names as keys and their
253 :func:`repr` as values.
254
255 user_expressions : dict, optional
256 A dict mapping names to expressions to be evaluated in the user's
257 dict. The expression values are returned as strings formatted using
258 :func:`repr`.
259
260 allow_stdin : bool, optional (default self.allow_stdin)
261 Flag for whether the kernel can send stdin requests to frontends.
262
263 Some frontends (e.g. the Notebook) do not support stdin requests.
264 If raw_input is called from code executed from such a frontend, a
265 StdinNotImplementedError will be raised.
266
267 Returns
268 -------
269 The msg_id of the message sent.
270 """
271 if user_variables is None:
272 user_variables = []
273 if user_expressions is None:
274 user_expressions = {}
275 if allow_stdin is None:
276 allow_stdin = self.allow_stdin
277
278
279 # Don't waste network traffic if inputs are invalid
280 if not isinstance(code, basestring):
281 raise ValueError('code %r must be a string' % code)
282 validate_string_list(user_variables)
283 validate_string_dict(user_expressions)
284
285 # Create class for content/msg creation. Related to, but possibly
286 # not in Session.
287 content = dict(code=code, silent=silent, store_history=store_history,
288 user_variables=user_variables,
289 user_expressions=user_expressions,
290 allow_stdin=allow_stdin,
291 )
292 msg = self.session.msg('execute_request', content)
293 self._queue_send(msg)
294 return msg['header']['msg_id']
295
296 def complete(self, text, line, cursor_pos, block=None):
297 """Tab complete text in the kernel's namespace.
298
299 Parameters
300 ----------
301 text : str
302 The text to complete.
303 line : str
304 The full line of text that is the surrounding context for the
305 text to complete.
306 cursor_pos : int
307 The position of the cursor in the line where the completion was
308 requested.
309 block : str, optional
310 The full block of code in which the completion is being requested.
311
312 Returns
313 -------
314 The msg_id of the message sent.
315 """
316 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
317 msg = self.session.msg('complete_request', content)
318 self._queue_send(msg)
319 return msg['header']['msg_id']
320
321 def object_info(self, oname, detail_level=0):
322 """Get metadata information about an object in the kernel's namespace.
323
324 Parameters
325 ----------
326 oname : str
327 A string specifying the object name.
328 detail_level : int, optional
329 The level of detail for the introspection (0-2)
330
331 Returns
332 -------
333 The msg_id of the message sent.
334 """
335 content = dict(oname=oname, detail_level=detail_level)
336 msg = self.session.msg('object_info_request', content)
337 self._queue_send(msg)
338 return msg['header']['msg_id']
339
340 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
341 """Get entries from the kernel's history list.
342
343 Parameters
344 ----------
345 raw : bool
346 If True, return the raw input.
347 output : bool
348 If True, then return the output as well.
349 hist_access_type : str
350 'range' (fill in session, start and stop params), 'tail' (fill in n)
351 or 'search' (fill in pattern param).
352
353 session : int
354 For a range request, the session from which to get lines. Session
355 numbers are positive integers; negative ones count back from the
356 current session.
357 start : int
358 The first line number of a history range.
359 stop : int
360 The final (excluded) line number of a history range.
361
362 n : int
363 The number of lines of history to get for a tail request.
364
365 pattern : str
366 The glob-syntax pattern for a search request.
367
368 Returns
369 -------
370 The msg_id of the message sent.
371 """
372 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
373 **kwargs)
374 msg = self.session.msg('history_request', content)
375 self._queue_send(msg)
376 return msg['header']['msg_id']
377
378 def kernel_info(self):
379 """Request kernel info."""
380 msg = self.session.msg('kernel_info_request')
381 self._queue_send(msg)
382 return msg['header']['msg_id']
383
384 def shutdown(self, restart=False):
385 """Request an immediate kernel shutdown.
386
387 Upon receipt of the (empty) reply, client code can safely assume that
388 the kernel has shut down and it's safe to forcefully terminate it if
389 it's still alive.
390
391 The kernel will send the reply via a function registered with Python's
392 atexit module, ensuring it's truly done as the kernel is done with all
393 normal operation.
394 """
395 # Send quit message to kernel. Once we implement kernel-side setattr,
396 # this should probably be done that way, but for now this will do.
397 msg = self.session.msg('shutdown_request', {'restart':restart})
398 self._queue_send(msg)
399 return msg['header']['msg_id']
400
401
402
403 class IOPubChannel(ZMQSocketChannel):
404 """The iopub channel which listens for messages that the kernel publishes.
405
406 This channel is where all output is published to frontends.
407 """
408
409 def __init__(self, context, session, address):
410 super(IOPubChannel, self).__init__(context, session, address)
411 self.ioloop = ioloop.IOLoop()
412
413 def run(self):
414 """The thread's main activity. Call start() instead."""
415 self.socket = self.context.socket(zmq.SUB)
416 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
417 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
418 self.socket.connect(self.address)
419 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
420 self.stream.on_recv(self._handle_recv)
421 self._run_loop()
422 try:
423 self.socket.close()
424 except:
425 pass
426
427 def stop(self):
428 """Stop the channel's event loop and join its thread."""
429 self.ioloop.stop()
430 super(IOPubChannel, self).stop()
431
432 def call_handlers(self, msg):
433 """This method is called in the ioloop thread when a message arrives.
434
435 Subclasses should override this method to handle incoming messages.
436 It is important to remember that this method is called in the thread
437 so that some logic must be done to ensure that the application leve
438 handlers are called in the application thread.
439 """
440 raise NotImplementedError('call_handlers must be defined in a subclass.')
441
442 def flush(self, timeout=1.0):
443 """Immediately processes all pending messages on the iopub channel.
444
445 Callers should use this method to ensure that :method:`call_handlers`
446 has been called for all messages that have been received on the
447 0MQ SUB socket of this channel.
448
449 This method is thread safe.
450
451 Parameters
452 ----------
453 timeout : float, optional
454 The maximum amount of time to spend flushing, in seconds. The
455 default is one second.
456 """
457 # We do the IOLoop callback process twice to ensure that the IOLoop
458 # gets to perform at least one full poll.
459 stop_time = time.time() + timeout
460 for i in xrange(2):
461 self._flushed = False
462 self.ioloop.add_callback(self._flush)
463 while not self._flushed and time.time() < stop_time:
464 time.sleep(0.01)
465
466 def _flush(self):
467 """Callback for :method:`self.flush`."""
468 self.stream.flush()
469 self._flushed = True
470
471
472 class StdInChannel(ZMQSocketChannel):
473 """The stdin channel to handle raw_input requests that the kernel makes."""
474
475 msg_queue = None
476
477 def __init__(self, context, session, address):
478 super(StdInChannel, self).__init__(context, session, address)
479 self.ioloop = ioloop.IOLoop()
480
481 def run(self):
482 """The thread's main activity. Call start() instead."""
483 self.socket = self.context.socket(zmq.DEALER)
484 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
485 self.socket.connect(self.address)
486 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
487 self.stream.on_recv(self._handle_recv)
488 self._run_loop()
489 try:
490 self.socket.close()
491 except:
492 pass
493
494 def stop(self):
495 """Stop the channel's event loop and join its thread."""
496 self.ioloop.stop()
497 super(StdInChannel, self).stop()
498
499 def call_handlers(self, msg):
500 """This method is called in the ioloop thread when a message arrives.
501
502 Subclasses should override this method to handle incoming messages.
503 It is important to remember that this method is called in the thread
504 so that some logic must be done to ensure that the application leve
505 handlers are called in the application thread.
506 """
507 raise NotImplementedError('call_handlers must be defined in a subclass.')
508
509 def input(self, string):
510 """Send a string of raw input to the kernel."""
511 content = dict(value=string)
512 msg = self.session.msg('input_reply', content)
513 self._queue_send(msg)
514
515
516 class HBChannel(ZMQSocketChannel):
517 """The heartbeat channel which monitors the kernel heartbeat.
518
519 Note that the heartbeat channel is paused by default. As long as you start
520 this channel, the kernel manager will ensure that it is paused and un-paused
521 as appropriate.
522 """
523
524 time_to_dead = 3.0
525 socket = None
526 poller = None
527 _running = None
528 _pause = None
529 _beating = None
530
531 def __init__(self, context, session, address):
532 super(HBChannel, self).__init__(context, session, address)
533 self._running = False
534 self._pause =True
535 self.poller = zmq.Poller()
536
537 def _create_socket(self):
538 if self.socket is not None:
539 # close previous socket, before opening a new one
540 self.poller.unregister(self.socket)
541 self.socket.close()
542 self.socket = self.context.socket(zmq.REQ)
543 self.socket.setsockopt(zmq.LINGER, 0)
544 self.socket.connect(self.address)
545
546 self.poller.register(self.socket, zmq.POLLIN)
547
548 def _poll(self, start_time):
549 """poll for heartbeat replies until we reach self.time_to_dead.
550
551 Ignores interrupts, and returns the result of poll(), which
552 will be an empty list if no messages arrived before the timeout,
553 or the event tuple if there is a message to receive.
554 """
555
556 until_dead = self.time_to_dead - (time.time() - start_time)
557 # ensure poll at least once
558 until_dead = max(until_dead, 1e-3)
559 events = []
560 while True:
561 try:
562 events = self.poller.poll(1000 * until_dead)
563 except ZMQError as e:
564 if e.errno == errno.EINTR:
565 # ignore interrupts during heartbeat
566 # this may never actually happen
567 until_dead = self.time_to_dead - (time.time() - start_time)
568 until_dead = max(until_dead, 1e-3)
569 pass
570 else:
571 raise
572 except Exception:
573 if self._exiting:
574 break
575 else:
576 raise
577 else:
578 break
579 return events
580
581 def run(self):
582 """The thread's main activity. Call start() instead."""
583 self._create_socket()
584 self._running = True
585 self._beating = True
586
587 while self._running:
588 if self._pause:
589 # just sleep, and skip the rest of the loop
590 time.sleep(self.time_to_dead)
591 continue
592
593 since_last_heartbeat = 0.0
594 # io.rprint('Ping from HB channel') # dbg
595 # no need to catch EFSM here, because the previous event was
596 # either a recv or connect, which cannot be followed by EFSM
597 self.socket.send(b'ping')
598 request_time = time.time()
599 ready = self._poll(request_time)
600 if ready:
601 self._beating = True
602 # the poll above guarantees we have something to recv
603 self.socket.recv()
604 # sleep the remainder of the cycle
605 remainder = self.time_to_dead - (time.time() - request_time)
606 if remainder > 0:
607 time.sleep(remainder)
608 continue
609 else:
610 # nothing was received within the time limit, signal heart failure
611 self._beating = False
612 since_last_heartbeat = time.time() - request_time
613 self.call_handlers(since_last_heartbeat)
614 # and close/reopen the socket, because the REQ/REP cycle has been broken
615 self._create_socket()
616 continue
617 try:
618 self.socket.close()
619 except:
620 pass
621
622 def pause(self):
623 """Pause the heartbeat."""
624 self._pause = True
625
626 def unpause(self):
627 """Unpause the heartbeat."""
628 self._pause = False
629
630 def is_beating(self):
631 """Is the heartbeat running and responsive (and not paused)."""
632 if self.is_alive() and not self._pause and self._beating:
633 return True
634 else:
635 return False
636
637 def stop(self):
638 """Stop the channel's event loop and join its thread."""
639 self._running = False
640 super(HBChannel, self).stop()
641
642 def call_handlers(self, since_last_heartbeat):
643 """This method is called in the ioloop thread when a message arrives.
644
645 Subclasses should override this method to handle incoming messages.
646 It is important to remember that this method is called in the thread
647 so that some logic must be done to ensure that the application level
648 handlers are called in the application thread.
649 """
650 raise NotImplementedError('call_handlers must be defined in a subclass.')
651
652
653 #-----------------------------------------------------------------------------
654 # Main kernel manager class
41 # Main kernel manager class
655 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
656
43
657 class KernelManager(Configurable):
44 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
658 """Manages a single kernel on this host along with its channels.
45 """Manages a single kernel in a subprocess on this host.
659
660 There are four channels associated with each kernel:
661
662 * shell: for request/reply calls to the kernel.
663 * iopub: for the kernel to publish results to frontends.
664 * hb: for monitoring the kernel's heartbeat.
665 * stdin: for frontends to reply to raw_input calls in the kernel.
666
667 The usage of the channels that this class manages is optional. It is
668 entirely possible to connect to the kernels directly using ZeroMQ
669 sockets. These channels are useful primarily for talking to a kernel
670 whose :class:`KernelManager` is in the same process.
671
46
672 This version manages kernels started using Popen.
47 This version starts kernels with Popen.
673 """
48 """
49
674 # The PyZMQ Context to use for communication with the kernel.
50 # The PyZMQ Context to use for communication with the kernel.
675 context = Instance(zmq.Context)
51 context = Instance(zmq.Context)
676 def _context_default(self):
52 def _context_default(self):
677 return zmq.Context.instance()
53 return zmq.Context.instance()
678
54
679 # The Session to use for communication with the kernel.
55 # The Session to use for communication with the kernel.
680 session = Instance(Session)
56 session = Instance(Session)
681 def _session_default(self):
57 def _session_default(self):
682 return Session(config=self.config)
58 return Session(config=self.config)
683
59
684 # The kernel process with which the KernelManager is communicating.
60 # The kernel process with which the KernelManager is communicating.
685 # generally a Popen instance
61 # generally a Popen instance
686 kernel = Any()
62 kernel = Any()
687
63
688 kernel_cmd = List(Unicode, config=True,
64 kernel_cmd = List(Unicode, config=True,
689 help="""The Popen Command to launch the kernel.
65 help="""The Popen Command to launch the kernel.
690 Override this if you have a custom
66 Override this if you have a custom
691 """
67 """
692 )
68 )
693
69
694 def _kernel_cmd_changed(self, name, old, new):
70 def _kernel_cmd_changed(self, name, old, new):
695 self.ipython_kernel = False
71 self.ipython_kernel = False
696
72
697 ipython_kernel = Bool(True)
73 ipython_kernel = Bool(True)
698
74
699 # The addresses for the communication channels.
75 # Protected traits
700 connection_file = Unicode('')
701
702 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
703
704 ip = Unicode(LOCALHOST, config=True,
705 help="""Set the kernel\'s IP address [default localhost].
706 If the IP address is something other than localhost, then
707 Consoles on other machines will be able to connect
708 to the Kernel, so be careful!"""
709 )
710
711 def _ip_default(self):
712 if self.transport == 'ipc':
713 if self.connection_file:
714 return os.path.splitext(self.connection_file)[0] + '-ipc'
715 else:
716 return 'kernel-ipc'
717 else:
718 return LOCALHOST
719
720 def _ip_changed(self, name, old, new):
721 if new == '*':
722 self.ip = '0.0.0.0'
723
724 shell_port = Integer(0)
725 iopub_port = Integer(0)
726 stdin_port = Integer(0)
727 hb_port = Integer(0)
728
729 # The classes to use for the various channels.
730 shell_channel_class = Type(ShellChannel)
731 iopub_channel_class = Type(IOPubChannel)
732 stdin_channel_class = Type(StdInChannel)
733 hb_channel_class = Type(HBChannel)
734
735 # Protected traits.
736 _launch_args = Any
76 _launch_args = Any
737 _shell_channel = Any
738 _iopub_channel = Any
739 _stdin_channel = Any
740 _hb_channel = Any
741 _connection_file_written=Bool(False)
742
77
743 autorestart = Bool(False, config=True,
78 autorestart = Bool(False, config=True,
744 help="""Should we autorestart the kernel if it dies."""
79 help="""Should we autorestart the kernel if it dies."""
745 )
80 )
746
81
747 def __del__(self):
82 def __del__(self):
748 self.cleanup_connection_file()
83 self.cleanup_connection_file()
749
84
750 #--------------------------------------------------------------------------
85 #--------------------------------------------------------------------------
751 # Channel management methods:
752 #--------------------------------------------------------------------------
753
754 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
755 """Starts the channels for this kernel.
756
757 This will create the channels if they do not exist and then start
758 them (their activity runs in a thread). If port numbers of 0 are
759 being used (random ports) then you must first call
760 :method:`start_kernel`. If the channels have been stopped and you
761 call this, :class:`RuntimeError` will be raised.
762 """
763 if shell:
764 self.shell_channel.start()
765 if iopub:
766 self.iopub_channel.start()
767 if stdin:
768 self.stdin_channel.start()
769 self.shell_channel.allow_stdin = True
770 else:
771 self.shell_channel.allow_stdin = False
772 if hb:
773 self.hb_channel.start()
774
775 def stop_channels(self):
776 """Stops all the running channels for this kernel.
777
778 This stops their event loops and joins their threads.
779 """
780 if self.shell_channel.is_alive():
781 self.shell_channel.stop()
782 if self.iopub_channel.is_alive():
783 self.iopub_channel.stop()
784 if self.stdin_channel.is_alive():
785 self.stdin_channel.stop()
786 if self.hb_channel.is_alive():
787 self.hb_channel.stop()
788
789 @property
790 def channels_running(self):
791 """Are any of the channels created and running?"""
792 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
793 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
794
795 def _make_url(self, port):
796 """Make a zmq url with a port.
797
798 There are two cases that this handles:
799
800 * tcp: tcp://ip:port
801 * ipc: ipc://ip-port
802 """
803 if self.transport == 'tcp':
804 return "tcp://%s:%i" % (self.ip, port)
805 else:
806 return "%s://%s-%s" % (self.transport, self.ip, port)
807
808 @property
809 def shell_channel(self):
810 """Get the shell channel object for this kernel."""
811 if self._shell_channel is None:
812 self._shell_channel = self.shell_channel_class(
813 self.context, self.session, self._make_url(self.shell_port)
814 )
815 return self._shell_channel
816
817 @property
818 def iopub_channel(self):
819 """Get the iopub channel object for this kernel."""
820 if self._iopub_channel is None:
821 self._iopub_channel = self.iopub_channel_class(
822 self.context, self.session, self._make_url(self.iopub_port)
823 )
824 return self._iopub_channel
825
826 @property
827 def stdin_channel(self):
828 """Get the stdin channel object for this kernel."""
829 if self._stdin_channel is None:
830 self._stdin_channel = self.stdin_channel_class(
831 self.context, self.session, self._make_url(self.stdin_port)
832 )
833 return self._stdin_channel
834
835 @property
836 def hb_channel(self):
837 """Get the hb channel object for this kernel."""
838 if self._hb_channel is None:
839 self._hb_channel = self.hb_channel_class(
840 self.context, self.session, self._make_url(self.hb_port)
841 )
842 return self._hb_channel
843
844 #--------------------------------------------------------------------------
845 # Connection and ipc file management
846 #--------------------------------------------------------------------------
847
848 def cleanup_connection_file(self):
849 """Cleanup connection file *if we wrote it*
850
851 Will not raise if the connection file was already removed somehow.
852 """
853 if self._connection_file_written:
854 # cleanup connection files on full shutdown of kernel we started
855 self._connection_file_written = False
856 try:
857 os.remove(self.connection_file)
858 except (IOError, OSError, AttributeError):
859 pass
860
861 def cleanup_ipc_files(self):
862 """Cleanup ipc files if we wrote them."""
863 if self.transport != 'ipc':
864 return
865 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
866 ipcfile = "%s-%i" % (self.ip, port)
867 try:
868 os.remove(ipcfile)
869 except (IOError, OSError):
870 pass
871
872 def load_connection_file(self):
873 """Load connection info from JSON dict in self.connection_file."""
874 with open(self.connection_file) as f:
875 cfg = json.loads(f.read())
876
877 from pprint import pprint
878 pprint(cfg)
879 self.transport = cfg.get('transport', 'tcp')
880 self.ip = cfg['ip']
881 self.shell_port = cfg['shell_port']
882 self.stdin_port = cfg['stdin_port']
883 self.iopub_port = cfg['iopub_port']
884 self.hb_port = cfg['hb_port']
885 self.session.key = str_to_bytes(cfg['key'])
886
887 def write_connection_file(self):
888 """Write connection info to JSON dict in self.connection_file."""
889 if self._connection_file_written:
890 return
891 self.connection_file,cfg = write_connection_file(self.connection_file,
892 transport=self.transport, ip=self.ip, key=self.session.key,
893 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
894 shell_port=self.shell_port, hb_port=self.hb_port)
895 # write_connection_file also sets default ports:
896 self.shell_port = cfg['shell_port']
897 self.stdin_port = cfg['stdin_port']
898 self.iopub_port = cfg['iopub_port']
899 self.hb_port = cfg['hb_port']
900
901 self._connection_file_written = True
902
903 #--------------------------------------------------------------------------
904 # Kernel restarter
86 # Kernel restarter
905 #--------------------------------------------------------------------------
87 #--------------------------------------------------------------------------
906
88
907 def start_restarter(self):
89 def start_restarter(self):
908 pass
90 pass
909
91
910 def stop_restarter(self):
92 def stop_restarter(self):
911 pass
93 pass
912
94
913 #--------------------------------------------------------------------------
95 #--------------------------------------------------------------------------
914 # Kernel management
96 # Kernel management
915 #--------------------------------------------------------------------------
97 #--------------------------------------------------------------------------
916
98
917 def format_kernel_cmd(self, **kw):
99 def format_kernel_cmd(self, **kw):
918 """format templated args (e.g. {connection_file})"""
100 """format templated args (e.g. {connection_file})"""
919 if self.kernel_cmd:
101 if self.kernel_cmd:
920 cmd = self.kernel_cmd
102 cmd = self.kernel_cmd
921 else:
103 else:
922 cmd = make_ipkernel_cmd(
104 cmd = make_ipkernel_cmd(
923 'from IPython.kernel.zmq.kernelapp import main; main()',
105 'from IPython.kernel.zmq.kernelapp import main; main()',
924 **kw
106 **kw
925 )
107 )
926 ns = dict(connection_file=self.connection_file)
108 ns = dict(connection_file=self.connection_file)
927 ns.update(self._launch_args)
109 ns.update(self._launch_args)
928 return [ c.format(**ns) for c in cmd ]
110 return [ c.format(**ns) for c in cmd ]
929
111
930 def _launch_kernel(self, kernel_cmd, **kw):
112 def _launch_kernel(self, kernel_cmd, **kw):
931 """actually launch the kernel
113 """actually launch the kernel
932
114
933 override in a subclass to launch kernel subprocesses differently
115 override in a subclass to launch kernel subprocesses differently
934 """
116 """
935 return launch_kernel(kernel_cmd, **kw)
117 return launch_kernel(kernel_cmd, **kw)
936
118
937 def start_kernel(self, **kw):
119 def start_kernel(self, **kw):
938 """Starts a kernel on this host in a separate process.
120 """Starts a kernel on this host in a separate process.
939
121
940 If random ports (port=0) are being used, this method must be called
122 If random ports (port=0) are being used, this method must be called
941 before the channels are created.
123 before the channels are created.
942
124
943 Parameters:
125 Parameters:
944 -----------
126 -----------
945 **kw : optional
127 **kw : optional
946 keyword arguments that are passed down to build the kernel_cmd
128 keyword arguments that are passed down to build the kernel_cmd
947 and launching the kernel (e.g. Popen kwargs).
129 and launching the kernel (e.g. Popen kwargs).
948 """
130 """
949 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
131 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
950 raise RuntimeError("Can only launch a kernel on a local interface. "
132 raise RuntimeError("Can only launch a kernel on a local interface. "
951 "Make sure that the '*_address' attributes are "
133 "Make sure that the '*_address' attributes are "
952 "configured properly. "
134 "configured properly. "
953 "Currently valid addresses are: %s"%LOCAL_IPS
135 "Currently valid addresses are: %s"%LOCAL_IPS
954 )
136 )
955
137
956 # write connection file / get default ports
138 # write connection file / get default ports
957 self.write_connection_file()
139 self.write_connection_file()
958
140
959 # save kwargs for use in restart
141 # save kwargs for use in restart
960 self._launch_args = kw.copy()
142 self._launch_args = kw.copy()
961 # build the Popen cmd
143 # build the Popen cmd
962 kernel_cmd = self.format_kernel_cmd(**kw)
144 kernel_cmd = self.format_kernel_cmd(**kw)
963 # launch the kernel subprocess
145 # launch the kernel subprocess
964 self.kernel = self._launch_kernel(kernel_cmd,
146 self.kernel = self._launch_kernel(kernel_cmd,
965 ipython_kernel=self.ipython_kernel,
147 ipython_kernel=self.ipython_kernel,
966 **kw)
148 **kw)
967 self.start_restarter()
149 self.start_restarter()
968
150
151 def _send_shutdown_request(self, restart=False):
152 """TODO: send a shutdown request via control channel"""
153 raise NotImplementedError("Soft shutdown needs control channel")
154
969 def shutdown_kernel(self, now=False, restart=False):
155 def shutdown_kernel(self, now=False, restart=False):
970 """Attempts to the stop the kernel process cleanly.
156 """Attempts to the stop the kernel process cleanly.
971
157
972 This attempts to shutdown the kernels cleanly by:
158 This attempts to shutdown the kernels cleanly by:
973
159
974 1. Sending it a shutdown message over the shell channel.
160 1. Sending it a shutdown message over the shell channel.
975 2. If that fails, the kernel is shutdown forcibly by sending it
161 2. If that fails, the kernel is shutdown forcibly by sending it
976 a signal.
162 a signal.
977
163
978 Parameters:
164 Parameters:
979 -----------
165 -----------
980 now : bool
166 now : bool
981 Should the kernel be forcible killed *now*. This skips the
167 Should the kernel be forcible killed *now*. This skips the
982 first, nice shutdown attempt.
168 first, nice shutdown attempt.
983 restart: bool
169 restart: bool
984 Will this kernel be restarted after it is shutdown. When this
170 Will this kernel be restarted after it is shutdown. When this
985 is True, connection files will not be cleaned up.
171 is True, connection files will not be cleaned up.
986 """
172 """
987
173
988 # Pause the heart beat channel if it exists.
989 if self._hb_channel is not None:
990 self._hb_channel.pause()
991
992 # Stop monitoring for restarting while we shutdown.
174 # Stop monitoring for restarting while we shutdown.
993 self.stop_restarter()
175 self.stop_restarter()
994
176
995 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
177 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
996 if sys.platform == 'win32':
178 if sys.platform == 'win32':
997 self._kill_kernel()
179 self._kill_kernel()
998 return
180 return
999
181
182 # bypass clean shutdown while
183 # FIXME: add control channel for clean shutdown
184 now = True
185
1000 if now:
186 if now:
1001 if self.has_kernel:
187 if self.has_kernel:
1002 self._kill_kernel()
188 self._kill_kernel()
1003 else:
189 else:
1004 # Don't send any additional kernel kill messages immediately, to give
190 # Don't send any additional kernel kill messages immediately, to give
1005 # the kernel a chance to properly execute shutdown actions. Wait for at
191 # the kernel a chance to properly execute shutdown actions. Wait for at
1006 # most 1s, checking every 0.1s.
192 # most 1s, checking every 0.1s.
1007 self.shell_channel.shutdown(restart=restart)
193 # FIXME: this method is not yet implemented (need Control channel)
194 self._send_shutdown_request(restart=restart)
1008 for i in range(10):
195 for i in range(10):
1009 if self.is_alive():
196 if self.is_alive():
1010 time.sleep(0.1)
197 time.sleep(0.1)
1011 else:
198 else:
1012 break
199 break
1013 else:
200 else:
1014 # OK, we've waited long enough.
201 # OK, we've waited long enough.
1015 if self.has_kernel:
202 if self.has_kernel:
1016 self._kill_kernel()
203 self._kill_kernel()
1017
204
1018 if not restart:
205 if not restart:
1019 self.cleanup_connection_file()
206 self.cleanup_connection_file()
1020 self.cleanup_ipc_files()
207 self.cleanup_ipc_files()
1021 else:
208 else:
1022 self.cleanup_ipc_files()
209 self.cleanup_ipc_files()
1023
210
1024 def restart_kernel(self, now=False, **kw):
211 def restart_kernel(self, now=False, **kw):
1025 """Restarts a kernel with the arguments that were used to launch it.
212 """Restarts a kernel with the arguments that were used to launch it.
1026
213
1027 If the old kernel was launched with random ports, the same ports will be
214 If the old kernel was launched with random ports, the same ports will be
1028 used for the new kernel. The same connection file is used again.
215 used for the new kernel. The same connection file is used again.
1029
216
1030 Parameters
217 Parameters
1031 ----------
218 ----------
1032 now : bool, optional
219 now : bool, optional
1033 If True, the kernel is forcefully restarted *immediately*, without
220 If True, the kernel is forcefully restarted *immediately*, without
1034 having a chance to do any cleanup action. Otherwise the kernel is
221 having a chance to do any cleanup action. Otherwise the kernel is
1035 given 1s to clean up before a forceful restart is issued.
222 given 1s to clean up before a forceful restart is issued.
1036
223
1037 In all cases the kernel is restarted, the only difference is whether
224 In all cases the kernel is restarted, the only difference is whether
1038 it is given a chance to perform a clean shutdown or not.
225 it is given a chance to perform a clean shutdown or not.
1039
226
1040 **kw : optional
227 **kw : optional
1041 Any options specified here will overwrite those used to launch the
228 Any options specified here will overwrite those used to launch the
1042 kernel.
229 kernel.
1043 """
230 """
1044 if self._launch_args is None:
231 if self._launch_args is None:
1045 raise RuntimeError("Cannot restart the kernel. "
232 raise RuntimeError("Cannot restart the kernel. "
1046 "No previous call to 'start_kernel'.")
233 "No previous call to 'start_kernel'.")
1047 else:
234 else:
1048 # Stop currently running kernel.
235 # Stop currently running kernel.
1049 self.shutdown_kernel(now=now, restart=True)
236 self.shutdown_kernel(now=now, restart=True)
1050
237
1051 # Start new kernel.
238 # Start new kernel.
1052 self._launch_args.update(kw)
239 self._launch_args.update(kw)
1053 self.start_kernel(**self._launch_args)
240 self.start_kernel(**self._launch_args)
1054
241
1055 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
242 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1056 # unless there is some delay here.
243 # unless there is some delay here.
1057 if sys.platform == 'win32':
244 if sys.platform == 'win32':
1058 time.sleep(0.2)
245 time.sleep(0.2)
1059
246
1060 @property
247 @property
1061 def has_kernel(self):
248 def has_kernel(self):
1062 """Has a kernel been started that we are managing."""
249 """Has a kernel been started that we are managing."""
1063 return self.kernel is not None
250 return self.kernel is not None
1064
251
1065 def _kill_kernel(self):
252 def _kill_kernel(self):
1066 """Kill the running kernel.
253 """Kill the running kernel.
1067
254
1068 This is a private method, callers should use shutdown_kernel(now=True).
255 This is a private method, callers should use shutdown_kernel(now=True).
1069 """
256 """
1070 if self.has_kernel:
257 if self.has_kernel:
1071
258
1072 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
259 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1073 # TerminateProcess() on Win32).
260 # TerminateProcess() on Win32).
1074 try:
261 try:
1075 self.kernel.kill()
262 self.kernel.kill()
1076 except OSError as e:
263 except OSError as e:
1077 # In Windows, we will get an Access Denied error if the process
264 # In Windows, we will get an Access Denied error if the process
1078 # has already terminated. Ignore it.
265 # has already terminated. Ignore it.
1079 if sys.platform == 'win32':
266 if sys.platform == 'win32':
1080 if e.winerror != 5:
267 if e.winerror != 5:
1081 raise
268 raise
1082 # On Unix, we may get an ESRCH error if the process has already
269 # On Unix, we may get an ESRCH error if the process has already
1083 # terminated. Ignore it.
270 # terminated. Ignore it.
1084 else:
271 else:
1085 from errno import ESRCH
272 from errno import ESRCH
1086 if e.errno != ESRCH:
273 if e.errno != ESRCH:
1087 raise
274 raise
1088
275
1089 # Block until the kernel terminates.
276 # Block until the kernel terminates.
1090 self.kernel.wait()
277 self.kernel.wait()
1091 self.kernel = None
278 self.kernel = None
1092 else:
279 else:
1093 raise RuntimeError("Cannot kill kernel. No kernel is running!")
280 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1094
281
1095 def interrupt_kernel(self):
282 def interrupt_kernel(self):
1096 """Interrupts the kernel by sending it a signal.
283 """Interrupts the kernel by sending it a signal.
1097
284
1098 Unlike ``signal_kernel``, this operation is well supported on all
285 Unlike ``signal_kernel``, this operation is well supported on all
1099 platforms.
286 platforms.
1100 """
287 """
1101 if self.has_kernel:
288 if self.has_kernel:
1102 if sys.platform == 'win32':
289 if sys.platform == 'win32':
1103 from .zmq.parentpoller import ParentPollerWindows as Poller
290 from .zmq.parentpoller import ParentPollerWindows as Poller
1104 Poller.send_interrupt(self.kernel.win32_interrupt_event)
291 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1105 else:
292 else:
1106 self.kernel.send_signal(signal.SIGINT)
293 self.kernel.send_signal(signal.SIGINT)
1107 else:
294 else:
1108 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
295 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1109
296
1110 def signal_kernel(self, signum):
297 def signal_kernel(self, signum):
1111 """Sends a signal to the kernel.
298 """Sends a signal to the kernel.
1112
299
1113 Note that since only SIGTERM is supported on Windows, this function is
300 Note that since only SIGTERM is supported on Windows, this function is
1114 only useful on Unix systems.
301 only useful on Unix systems.
1115 """
302 """
1116 if self.has_kernel:
303 if self.has_kernel:
1117 self.kernel.send_signal(signum)
304 self.kernel.send_signal(signum)
1118 else:
305 else:
1119 raise RuntimeError("Cannot signal kernel. No kernel is running!")
306 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1120
307
1121 def is_alive(self):
308 def is_alive(self):
1122 """Is the kernel process still running?"""
309 """Is the kernel process still running?"""
1123 if self.has_kernel:
310 if self.has_kernel:
1124 if self.kernel.poll() is None:
311 if self.kernel.poll() is None:
1125 return True
312 return True
1126 else:
313 else:
1127 return False
314 return False
1128 elif self._hb_channel is not None:
1129 # We didn't start the kernel with this KernelManager so we
1130 # use the heartbeat.
1131 return self._hb_channel.is_beating()
1132 else:
315 else:
1133 # no heartbeat and not local, we can't tell if it's running,
316 # we don't have a kernel
1134 # so naively return True
317 return False
1135 return True
1136
318
1137
319
1138 #-----------------------------------------------------------------------------
320 #-----------------------------------------------------------------------------
1139 # ABC Registration
321 # ABC Registration
1140 #-----------------------------------------------------------------------------
322 #-----------------------------------------------------------------------------
1141
323
1142 ShellChannelABC.register(ShellChannel)
1143 IOPubChannelABC.register(IOPubChannel)
1144 HBChannelABC.register(HBChannel)
1145 StdInChannelABC.register(StdInChannel)
1146 KernelManagerABC.register(KernelManager)
324 KernelManagerABC.register(KernelManager)
325
@@ -1,274 +1,271
1 """A kernel manager for multiple kernels
1 """A kernel manager for multiple kernels
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2013 The IPython Development Team
9 # Copyright (C) 2013 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import absolute_import
19 from __future__ import absolute_import
20
20
21 import os
21 import os
22 import uuid
22 import uuid
23
23
24 import zmq
24 import zmq
25 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.eventloop.zmqstream import ZMQStream
26
26
27 from IPython.config.configurable import LoggingConfigurable
27 from IPython.config.configurable import LoggingConfigurable
28 from IPython.utils.importstring import import_item
28 from IPython.utils.importstring import import_item
29 from IPython.utils.traitlets import (
29 from IPython.utils.traitlets import (
30 Instance, Dict, Unicode, Any, DottedObjectName, Bool
30 Instance, Dict, Unicode, Any, DottedObjectName, Bool
31 )
31 )
32
32
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34 # Classes
34 # Classes
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36
36
37 class DuplicateKernelError(Exception):
37 class DuplicateKernelError(Exception):
38 pass
38 pass
39
39
40
40
41 class MultiKernelManager(LoggingConfigurable):
41 class MultiKernelManager(LoggingConfigurable):
42 """A class for managing multiple kernels."""
42 """A class for managing multiple kernels."""
43
43
44 kernel_manager_class = DottedObjectName(
44 kernel_manager_class = DottedObjectName(
45 "IPython.kernel.ioloop.IOLoopKernelManager", config=True,
45 "IPython.kernel.ioloop.IOLoopKernelManager", config=True,
46 help="""The kernel manager class. This is configurable to allow
46 help="""The kernel manager class. This is configurable to allow
47 subclassing of the KernelManager for customized behavior.
47 subclassing of the KernelManager for customized behavior.
48 """
48 """
49 )
49 )
50 def _kernel_manager_class_changed(self, name, old, new):
50 def _kernel_manager_class_changed(self, name, old, new):
51 self.kernel_manager_factory = import_item(new)
51 self.kernel_manager_factory = import_item(new)
52
52
53 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
53 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
54 def _kernel_manager_factory_default(self):
54 def _kernel_manager_factory_default(self):
55 return import_item(self.kernel_manager_class)
55 return import_item(self.kernel_manager_class)
56
56
57 context = Instance('zmq.Context')
57 context = Instance('zmq.Context')
58 def _context_default(self):
58 def _context_default(self):
59 return zmq.Context.instance()
59 return zmq.Context.instance()
60
60
61 connection_dir = Unicode('')
61 connection_dir = Unicode('')
62
62
63 _kernels = Dict()
63 _kernels = Dict()
64
64
65 def list_kernel_ids(self):
65 def list_kernel_ids(self):
66 """Return a list of the kernel ids of the active kernels."""
66 """Return a list of the kernel ids of the active kernels."""
67 # Create a copy so we can iterate over kernels in operations
67 # Create a copy so we can iterate over kernels in operations
68 # that delete keys.
68 # that delete keys.
69 return list(self._kernels.keys())
69 return list(self._kernels.keys())
70
70
71 def __len__(self):
71 def __len__(self):
72 """Return the number of running kernels."""
72 """Return the number of running kernels."""
73 return len(self.list_kernel_ids())
73 return len(self.list_kernel_ids())
74
74
75 def __contains__(self, kernel_id):
75 def __contains__(self, kernel_id):
76 return kernel_id in self._kernels
76 return kernel_id in self._kernels
77
77
78 def start_kernel(self, **kwargs):
78 def start_kernel(self, **kwargs):
79 """Start a new kernel.
79 """Start a new kernel.
80
80
81 The caller can pick a kernel_id by passing one in as a keyword arg,
81 The caller can pick a kernel_id by passing one in as a keyword arg,
82 otherwise one will be picked using a uuid.
82 otherwise one will be picked using a uuid.
83
83
84 To silence the kernel's stdout/stderr, call this using::
84 To silence the kernel's stdout/stderr, call this using::
85
85
86 km.start_kernel(stdout=PIPE, stderr=PIPE)
86 km.start_kernel(stdout=PIPE, stderr=PIPE)
87
87
88 """
88 """
89 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
89 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
90 if kernel_id in self:
90 if kernel_id in self:
91 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
91 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
92 # kernel_manager_factory is the constructor for the KernelManager
92 # kernel_manager_factory is the constructor for the KernelManager
93 # subclass we are using. It can be configured as any Configurable,
93 # subclass we are using. It can be configured as any Configurable,
94 # including things like its transport and ip.
94 # including things like its transport and ip.
95 km = self.kernel_manager_factory(connection_file=os.path.join(
95 km = self.kernel_manager_factory(connection_file=os.path.join(
96 self.connection_dir, "kernel-%s.json" % kernel_id),
96 self.connection_dir, "kernel-%s.json" % kernel_id),
97 config=self.config, autorestart=True, log=self.log
97 config=self.config, autorestart=True, log=self.log
98 )
98 )
99 km.start_kernel(**kwargs)
99 km.start_kernel(**kwargs)
100 # start just the shell channel, needed for graceful restart
101 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
102 self._kernels[kernel_id] = km
100 self._kernels[kernel_id] = km
103 return kernel_id
101 return kernel_id
104
102
105 def shutdown_kernel(self, kernel_id, now=False):
103 def shutdown_kernel(self, kernel_id, now=False):
106 """Shutdown a kernel by its kernel uuid.
104 """Shutdown a kernel by its kernel uuid.
107
105
108 Parameters
106 Parameters
109 ==========
107 ==========
110 kernel_id : uuid
108 kernel_id : uuid
111 The id of the kernel to shutdown.
109 The id of the kernel to shutdown.
112 now : bool
110 now : bool
113 Should the kernel be shutdown forcibly using a signal.
111 Should the kernel be shutdown forcibly using a signal.
114 """
112 """
115 k = self.get_kernel(kernel_id)
113 k = self.get_kernel(kernel_id)
116 k.shutdown_kernel(now=now)
114 k.shutdown_kernel(now=now)
117 k.shell_channel.stop()
118 del self._kernels[kernel_id]
115 del self._kernels[kernel_id]
119
116
120 def shutdown_all(self, now=False):
117 def shutdown_all(self, now=False):
121 """Shutdown all kernels."""
118 """Shutdown all kernels."""
122 for kid in self.list_kernel_ids():
119 for kid in self.list_kernel_ids():
123 self.shutdown_kernel(kid, now=now)
120 self.shutdown_kernel(kid, now=now)
124
121
125 def interrupt_kernel(self, kernel_id):
122 def interrupt_kernel(self, kernel_id):
126 """Interrupt (SIGINT) the kernel by its uuid.
123 """Interrupt (SIGINT) the kernel by its uuid.
127
124
128 Parameters
125 Parameters
129 ==========
126 ==========
130 kernel_id : uuid
127 kernel_id : uuid
131 The id of the kernel to interrupt.
128 The id of the kernel to interrupt.
132 """
129 """
133 return self.get_kernel(kernel_id).interrupt_kernel()
130 return self.get_kernel(kernel_id).interrupt_kernel()
134
131
135 def signal_kernel(self, kernel_id, signum):
132 def signal_kernel(self, kernel_id, signum):
136 """Sends a signal to the kernel by its uuid.
133 """Sends a signal to the kernel by its uuid.
137
134
138 Note that since only SIGTERM is supported on Windows, this function
135 Note that since only SIGTERM is supported on Windows, this function
139 is only useful on Unix systems.
136 is only useful on Unix systems.
140
137
141 Parameters
138 Parameters
142 ==========
139 ==========
143 kernel_id : uuid
140 kernel_id : uuid
144 The id of the kernel to signal.
141 The id of the kernel to signal.
145 """
142 """
146 return self.get_kernel(kernel_id).signal_kernel(signum)
143 return self.get_kernel(kernel_id).signal_kernel(signum)
147
144
148 def restart_kernel(self, kernel_id):
145 def restart_kernel(self, kernel_id):
149 """Restart a kernel by its uuid, keeping the same ports.
146 """Restart a kernel by its uuid, keeping the same ports.
150
147
151 Parameters
148 Parameters
152 ==========
149 ==========
153 kernel_id : uuid
150 kernel_id : uuid
154 The id of the kernel to interrupt.
151 The id of the kernel to interrupt.
155 """
152 """
156 km = self.get_kernel(kernel_id)
153 km = self.get_kernel(kernel_id)
157 km.restart_kernel()
154 km.restart_kernel()
158
155
159 def is_alive(self, kernel_id):
156 def is_alive(self, kernel_id):
160 """Is the kernel alive.
157 """Is the kernel alive.
161
158
162 This calls KernelManager.is_alive() which calls Popen.poll on the
159 This calls KernelManager.is_alive() which calls Popen.poll on the
163 actual kernel subprocess.
160 actual kernel subprocess.
164
161
165 Parameters
162 Parameters
166 ==========
163 ==========
167 kernel_id : uuid
164 kernel_id : uuid
168 The id of the kernel.
165 The id of the kernel.
169 """
166 """
170 return self.get_kernel(kernel_id).is_alive()
167 return self.get_kernel(kernel_id).is_alive()
171
168
172 def get_kernel(self, kernel_id):
169 def get_kernel(self, kernel_id):
173 """Get the single KernelManager object for a kernel by its uuid.
170 """Get the single KernelManager object for a kernel by its uuid.
174
171
175 Parameters
172 Parameters
176 ==========
173 ==========
177 kernel_id : uuid
174 kernel_id : uuid
178 The id of the kernel.
175 The id of the kernel.
179 """
176 """
180 km = self._kernels.get(kernel_id)
177 km = self._kernels.get(kernel_id)
181 if km is not None:
178 if km is not None:
182 return km
179 return km
183 else:
180 else:
184 raise KeyError("Kernel with id not found: %s" % kernel_id)
181 raise KeyError("Kernel with id not found: %s" % kernel_id)
185
182
186 def get_connection_info(self, kernel_id):
183 def get_connection_info(self, kernel_id):
187 """Return a dictionary of connection data for a kernel.
184 """Return a dictionary of connection data for a kernel.
188
185
189 Parameters
186 Parameters
190 ==========
187 ==========
191 kernel_id : uuid
188 kernel_id : uuid
192 The id of the kernel.
189 The id of the kernel.
193
190
194 Returns
191 Returns
195 =======
192 =======
196 connection_dict : dict
193 connection_dict : dict
197 A dict of the information needed to connect to a kernel.
194 A dict of the information needed to connect to a kernel.
198 This includes the ip address and the integer port
195 This includes the ip address and the integer port
199 numbers of the different channels (stdin_port, iopub_port,
196 numbers of the different channels (stdin_port, iopub_port,
200 shell_port, hb_port).
197 shell_port, hb_port).
201 """
198 """
202 km = self.get_kernel(kernel_id)
199 km = self.get_kernel(kernel_id)
203 return dict(transport=km.transport,
200 return dict(transport=km.transport,
204 ip=km.ip,
201 ip=km.ip,
205 shell_port=km.shell_port,
202 shell_port=km.shell_port,
206 iopub_port=km.iopub_port,
203 iopub_port=km.iopub_port,
207 stdin_port=km.stdin_port,
204 stdin_port=km.stdin_port,
208 hb_port=km.hb_port,
205 hb_port=km.hb_port,
209 )
206 )
210
207
211 def _make_url(self, transport, ip, port):
208 def _make_url(self, transport, ip, port):
212 """Make a ZeroMQ URL for a given transport, ip and port."""
209 """Make a ZeroMQ URL for a given transport, ip and port."""
213 if transport == 'tcp':
210 if transport == 'tcp':
214 return "tcp://%s:%i" % (ip, port)
211 return "tcp://%s:%i" % (ip, port)
215 else:
212 else:
216 return "%s://%s-%s" % (transport, ip, port)
213 return "%s://%s-%s" % (transport, ip, port)
217
214
218 def _create_connected_stream(self, kernel_id, socket_type, channel):
215 def _create_connected_stream(self, kernel_id, socket_type, channel):
219 """Create a connected ZMQStream for a kernel."""
216 """Create a connected ZMQStream for a kernel."""
220 cinfo = self.get_connection_info(kernel_id)
217 cinfo = self.get_connection_info(kernel_id)
221 url = self._make_url(cinfo['transport'], cinfo['ip'],
218 url = self._make_url(cinfo['transport'], cinfo['ip'],
222 cinfo['%s_port' % channel]
219 cinfo['%s_port' % channel]
223 )
220 )
224 sock = self.context.socket(socket_type)
221 sock = self.context.socket(socket_type)
225 self.log.info("Connecting to: %s" % url)
222 self.log.info("Connecting to: %s" % url)
226 sock.connect(url)
223 sock.connect(url)
227 return ZMQStream(sock)
224 return ZMQStream(sock)
228
225
229 def create_iopub_stream(self, kernel_id):
226 def create_iopub_stream(self, kernel_id):
230 """Return a ZMQStream object connected to the iopub channel.
227 """Return a ZMQStream object connected to the iopub channel.
231
228
232 Parameters
229 Parameters
233 ==========
230 ==========
234 kernel_id : uuid
231 kernel_id : uuid
235 The id of the kernel.
232 The id of the kernel.
236
233
237 Returns
234 Returns
238 =======
235 =======
239 stream : ZMQStream
236 stream : ZMQStream
240 """
237 """
241 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
238 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
242 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
239 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
243 return iopub_stream
240 return iopub_stream
244
241
245 def create_shell_stream(self, kernel_id):
242 def create_shell_stream(self, kernel_id):
246 """Return a ZMQStream object connected to the shell channel.
243 """Return a ZMQStream object connected to the shell channel.
247
244
248 Parameters
245 Parameters
249 ==========
246 ==========
250 kernel_id : uuid
247 kernel_id : uuid
251 The id of the kernel.
248 The id of the kernel.
252
249
253 Returns
250 Returns
254 =======
251 =======
255 stream : ZMQStream
252 stream : ZMQStream
256 """
253 """
257 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
254 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
258 return shell_stream
255 return shell_stream
259
256
260 def create_hb_stream(self, kernel_id):
257 def create_hb_stream(self, kernel_id):
261 """Return a ZMQStream object connected to the hb channel.
258 """Return a ZMQStream object connected to the hb channel.
262
259
263 Parameters
260 Parameters
264 ==========
261 ==========
265 kernel_id : uuid
262 kernel_id : uuid
266 The id of the kernel.
263 The id of the kernel.
267
264
268 Returns
265 Returns
269 =======
266 =======
270 stream : ZMQStream
267 stream : ZMQStream
271 """
268 """
272 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
269 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
273 return hb_stream
270 return hb_stream
274
271
General Comments 0
You need to be logged in to leave comments. Login now