##// END OF EJS Templates
split KernelManager into KernelManager + KernelClient
MinRK -
Show More
@@ -0,0 +1,33 b''
1 """Implements a fully blocking kernel client.
2
3 Useful for test suites and blocking terminal interfaces.
4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
15
16 from IPython.utils.traitlets import Type
17 from IPython.kernel.client import KernelClient
18 from .channels import (
19 BlockingIOPubChannel, BlockingHBChannel,
20 BlockingShellChannel, BlockingStdInChannel
21 )
22
23 #-----------------------------------------------------------------------------
24 # Blocking kernel manager
25 #-----------------------------------------------------------------------------
26
27 class BlockingKernelClient(KernelClient):
28
29 # The classes to use for the various channels
30 shell_channel_class = Type(BlockingShellChannel)
31 iopub_channel_class = Type(BlockingIOPubChannel)
32 stdin_channel_class = Type(BlockingStdInChannel)
33 hb_channel_class = Type(BlockingHBChannel)
@@ -0,0 +1,193 b''
1 """Abstract base classes for kernel client channels"""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Standard library imports
15 import abc
16
17 #-----------------------------------------------------------------------------
18 # Channels
19 #-----------------------------------------------------------------------------
20
21
22 class ChannelABC(object):
23 """A base class for all channel ABCs."""
24
25 __metaclass__ = abc.ABCMeta
26
27 @abc.abstractmethod
28 def start(self):
29 pass
30
31 @abc.abstractmethod
32 def stop(self):
33 pass
34
35 @abc.abstractmethod
36 def is_alive(self):
37 pass
38
39
40 class ShellChannelABC(ChannelABC):
41 """ShellChannel ABC.
42
43 The docstrings for this class can be found in the base implementation:
44
45 `IPython.kernel.channels.ShellChannel`
46 """
47
48 @abc.abstractproperty
49 def allow_stdin(self):
50 pass
51
52 @abc.abstractmethod
53 def execute(self, code, silent=False, store_history=True,
54 user_variables=None, user_expressions=None, allow_stdin=None):
55 pass
56
57 @abc.abstractmethod
58 def complete(self, text, line, cursor_pos, block=None):
59 pass
60
61 @abc.abstractmethod
62 def object_info(self, oname, detail_level=0):
63 pass
64
65 @abc.abstractmethod
66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 pass
68
69 @abc.abstractmethod
70 def kernel_info(self):
71 pass
72
73 @abc.abstractmethod
74 def shutdown(self, restart=False):
75 pass
76
77
78 class IOPubChannelABC(ChannelABC):
79 """IOPubChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.channels.IOPubChannel`
84 """
85
86 @abc.abstractmethod
87 def flush(self, timeout=1.0):
88 pass
89
90
91 class StdInChannelABC(ChannelABC):
92 """StdInChannel ABC.
93
94 The docstrings for this class can be found in the base implementation:
95
96 `IPython.kernel.channels.StdInChannel`
97 """
98
99 @abc.abstractmethod
100 def input(self, string):
101 pass
102
103
104 class HBChannelABC(ChannelABC):
105 """HBChannel ABC.
106
107 The docstrings for this class can be found in the base implementation:
108
109 `IPython.kernel.channels.HBChannel`
110 """
111
112 @abc.abstractproperty
113 def time_to_dead(self):
114 pass
115
116 @abc.abstractmethod
117 def pause(self):
118 pass
119
120 @abc.abstractmethod
121 def unpause(self):
122 pass
123
124 @abc.abstractmethod
125 def is_beating(self):
126 pass
127
128
129 #-----------------------------------------------------------------------------
130 # Main kernel manager class
131 #-----------------------------------------------------------------------------
132
133 class KernelClientABC(object):
134 """KernelManager ABC.
135
136 The docstrings for this class can be found in the base implementation:
137
138 `IPython.kernel.channels.KernelClient`
139 """
140
141 __metaclass__ = abc.ABCMeta
142
143 @abc.abstractproperty
144 def kernel(self):
145 pass
146
147 @abc.abstractproperty
148 def shell_channel_class(self):
149 pass
150
151 @abc.abstractproperty
152 def iopub_channel_class(self):
153 pass
154
155 @abc.abstractproperty
156 def hb_channel_class(self):
157 pass
158
159 @abc.abstractproperty
160 def stdin_channel_class(self):
161 pass
162
163 #--------------------------------------------------------------------------
164 # Channel management methods
165 #--------------------------------------------------------------------------
166
167 @abc.abstractmethod
168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 pass
170
171 @abc.abstractmethod
172 def stop_channels(self):
173 pass
174
175 @abc.abstractproperty
176 def channels_running(self):
177 pass
178
179 @abc.abstractproperty
180 def shell_channel(self):
181 pass
182
183 @abc.abstractproperty
184 def iopub_channel(self):
185 pass
186
187 @abc.abstractproperty
188 def stdin_channel(self):
189 pass
190
191 @abc.abstractproperty
192 def hb_channel(self):
193 pass
This diff has been collapsed as it changes many lines, (638 lines changed) Show them Hide them
@@ -0,0 +1,638 b''
1 """Base classes to manage a Client's interaction with a running kernel
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 from __future__ import absolute_import
16
17 # Standard library imports
18 import atexit
19 import errno
20 from threading import Thread
21 import time
22
23 import zmq
24 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
25 # during garbage collection of threads at exit:
26 from zmq import ZMQError
27 from zmq.eventloop import ioloop, zmqstream
28
29 # Local imports
30 from .channelabc import (
31 ShellChannelABC, IOPubChannelABC,
32 HBChannelABC, StdInChannelABC,
33 )
34
35 #-----------------------------------------------------------------------------
36 # Constants and exceptions
37 #-----------------------------------------------------------------------------
38
39 class InvalidPortNumber(Exception):
40 pass
41
42 #-----------------------------------------------------------------------------
43 # Utility functions
44 #-----------------------------------------------------------------------------
45
46 # some utilities to validate message structure, these might get moved elsewhere
47 # if they prove to have more generic utility
48
49 def validate_string_list(lst):
50 """Validate that the input is a list of strings.
51
52 Raises ValueError if not."""
53 if not isinstance(lst, list):
54 raise ValueError('input %r must be a list' % lst)
55 for x in lst:
56 if not isinstance(x, basestring):
57 raise ValueError('element %r in list must be a string' % x)
58
59
60 def validate_string_dict(dct):
61 """Validate that the input is a dict with string keys and values.
62
63 Raises ValueError if not."""
64 for k,v in dct.iteritems():
65 if not isinstance(k, basestring):
66 raise ValueError('key %r in dict must be a string' % k)
67 if not isinstance(v, basestring):
68 raise ValueError('value %r in dict must be a string' % v)
69
70
71 #-----------------------------------------------------------------------------
72 # ZMQ Socket Channel classes
73 #-----------------------------------------------------------------------------
74
75 class ZMQSocketChannel(Thread):
76 """The base class for the channels that use ZMQ sockets."""
77 context = None
78 session = None
79 socket = None
80 ioloop = None
81 stream = None
82 _address = None
83 _exiting = False
84
85 def __init__(self, context, session, address):
86 """Create a channel.
87
88 Parameters
89 ----------
90 context : :class:`zmq.Context`
91 The ZMQ context to use.
92 session : :class:`session.Session`
93 The session to use.
94 address : zmq url
95 Standard (ip, port) tuple that the kernel is listening on.
96 """
97 super(ZMQSocketChannel, self).__init__()
98 self.daemon = True
99
100 self.context = context
101 self.session = session
102 if isinstance(address, tuple):
103 if address[1] == 0:
104 message = 'The port number for a channel cannot be 0.'
105 raise InvalidPortNumber(message)
106 address = "tcp://%s:%i" % address
107 self._address = address
108 atexit.register(self._notice_exit)
109
110 def _notice_exit(self):
111 self._exiting = True
112
113 def _run_loop(self):
114 """Run my loop, ignoring EINTR events in the poller"""
115 while True:
116 try:
117 self.ioloop.start()
118 except ZMQError as e:
119 if e.errno == errno.EINTR:
120 continue
121 else:
122 raise
123 except Exception:
124 if self._exiting:
125 break
126 else:
127 raise
128 else:
129 break
130
131 def stop(self):
132 """Stop the channel's event loop and join its thread.
133
134 This calls :method:`Thread.join` and returns when the thread
135 terminates. :class:`RuntimeError` will be raised if
136 :method:`self.start` is called again.
137 """
138 self.join()
139
140 @property
141 def address(self):
142 """Get the channel's address as a zmq url string.
143
144 These URLS have the form: 'tcp://127.0.0.1:5555'.
145 """
146 return self._address
147
148 def _queue_send(self, msg):
149 """Queue a message to be sent from the IOLoop's thread.
150
151 Parameters
152 ----------
153 msg : message to send
154
155 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
156 thread control of the action.
157 """
158 def thread_send():
159 self.session.send(self.stream, msg)
160 self.ioloop.add_callback(thread_send)
161
162 def _handle_recv(self, msg):
163 """Callback for stream.on_recv.
164
165 Unpacks message, and calls handlers with it.
166 """
167 ident,smsg = self.session.feed_identities(msg)
168 self.call_handlers(self.session.unserialize(smsg))
169
170
171
172 class ShellChannel(ZMQSocketChannel):
173 """The shell channel for issuing request/replies to the kernel."""
174
175 command_queue = None
176 # flag for whether execute requests should be allowed to call raw_input:
177 allow_stdin = True
178
179 def __init__(self, context, session, address):
180 super(ShellChannel, self).__init__(context, session, address)
181 self.ioloop = ioloop.IOLoop()
182
183 def run(self):
184 """The thread's main activity. Call start() instead."""
185 self.socket = self.context.socket(zmq.DEALER)
186 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
187 self.socket.connect(self.address)
188 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
189 self.stream.on_recv(self._handle_recv)
190 self._run_loop()
191 try:
192 self.socket.close()
193 except:
194 pass
195
196 def stop(self):
197 """Stop the channel's event loop and join its thread."""
198 self.ioloop.stop()
199 super(ShellChannel, self).stop()
200
201 def call_handlers(self, msg):
202 """This method is called in the ioloop thread when a message arrives.
203
204 Subclasses should override this method to handle incoming messages.
205 It is important to remember that this method is called in the thread
206 so that some logic must be done to ensure that the application leve
207 handlers are called in the application thread.
208 """
209 raise NotImplementedError('call_handlers must be defined in a subclass.')
210
211 def execute(self, code, silent=False, store_history=True,
212 user_variables=None, user_expressions=None, allow_stdin=None):
213 """Execute code in the kernel.
214
215 Parameters
216 ----------
217 code : str
218 A string of Python code.
219
220 silent : bool, optional (default False)
221 If set, the kernel will execute the code as quietly possible, and
222 will force store_history to be False.
223
224 store_history : bool, optional (default True)
225 If set, the kernel will store command history. This is forced
226 to be False if silent is True.
227
228 user_variables : list, optional
229 A list of variable names to pull from the user's namespace. They
230 will come back as a dict with these names as keys and their
231 :func:`repr` as values.
232
233 user_expressions : dict, optional
234 A dict mapping names to expressions to be evaluated in the user's
235 dict. The expression values are returned as strings formatted using
236 :func:`repr`.
237
238 allow_stdin : bool, optional (default self.allow_stdin)
239 Flag for whether the kernel can send stdin requests to frontends.
240
241 Some frontends (e.g. the Notebook) do not support stdin requests.
242 If raw_input is called from code executed from such a frontend, a
243 StdinNotImplementedError will be raised.
244
245 Returns
246 -------
247 The msg_id of the message sent.
248 """
249 if user_variables is None:
250 user_variables = []
251 if user_expressions is None:
252 user_expressions = {}
253 if allow_stdin is None:
254 allow_stdin = self.allow_stdin
255
256
257 # Don't waste network traffic if inputs are invalid
258 if not isinstance(code, basestring):
259 raise ValueError('code %r must be a string' % code)
260 validate_string_list(user_variables)
261 validate_string_dict(user_expressions)
262
263 # Create class for content/msg creation. Related to, but possibly
264 # not in Session.
265 content = dict(code=code, silent=silent, store_history=store_history,
266 user_variables=user_variables,
267 user_expressions=user_expressions,
268 allow_stdin=allow_stdin,
269 )
270 msg = self.session.msg('execute_request', content)
271 self._queue_send(msg)
272 return msg['header']['msg_id']
273
274 def complete(self, text, line, cursor_pos, block=None):
275 """Tab complete text in the kernel's namespace.
276
277 Parameters
278 ----------
279 text : str
280 The text to complete.
281 line : str
282 The full line of text that is the surrounding context for the
283 text to complete.
284 cursor_pos : int
285 The position of the cursor in the line where the completion was
286 requested.
287 block : str, optional
288 The full block of code in which the completion is being requested.
289
290 Returns
291 -------
292 The msg_id of the message sent.
293 """
294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
295 msg = self.session.msg('complete_request', content)
296 self._queue_send(msg)
297 return msg['header']['msg_id']
298
299 def object_info(self, oname, detail_level=0):
300 """Get metadata information about an object in the kernel's namespace.
301
302 Parameters
303 ----------
304 oname : str
305 A string specifying the object name.
306 detail_level : int, optional
307 The level of detail for the introspection (0-2)
308
309 Returns
310 -------
311 The msg_id of the message sent.
312 """
313 content = dict(oname=oname, detail_level=detail_level)
314 msg = self.session.msg('object_info_request', content)
315 self._queue_send(msg)
316 return msg['header']['msg_id']
317
318 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
319 """Get entries from the kernel's history list.
320
321 Parameters
322 ----------
323 raw : bool
324 If True, return the raw input.
325 output : bool
326 If True, then return the output as well.
327 hist_access_type : str
328 'range' (fill in session, start and stop params), 'tail' (fill in n)
329 or 'search' (fill in pattern param).
330
331 session : int
332 For a range request, the session from which to get lines. Session
333 numbers are positive integers; negative ones count back from the
334 current session.
335 start : int
336 The first line number of a history range.
337 stop : int
338 The final (excluded) line number of a history range.
339
340 n : int
341 The number of lines of history to get for a tail request.
342
343 pattern : str
344 The glob-syntax pattern for a search request.
345
346 Returns
347 -------
348 The msg_id of the message sent.
349 """
350 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
351 **kwargs)
352 msg = self.session.msg('history_request', content)
353 self._queue_send(msg)
354 return msg['header']['msg_id']
355
356 def kernel_info(self):
357 """Request kernel info."""
358 msg = self.session.msg('kernel_info_request')
359 self._queue_send(msg)
360 return msg['header']['msg_id']
361
362 def shutdown(self, restart=False):
363 """Request an immediate kernel shutdown.
364
365 Upon receipt of the (empty) reply, client code can safely assume that
366 the kernel has shut down and it's safe to forcefully terminate it if
367 it's still alive.
368
369 The kernel will send the reply via a function registered with Python's
370 atexit module, ensuring it's truly done as the kernel is done with all
371 normal operation.
372 """
373 # Send quit message to kernel. Once we implement kernel-side setattr,
374 # this should probably be done that way, but for now this will do.
375 msg = self.session.msg('shutdown_request', {'restart':restart})
376 self._queue_send(msg)
377 return msg['header']['msg_id']
378
379
380
381 class IOPubChannel(ZMQSocketChannel):
382 """The iopub channel which listens for messages that the kernel publishes.
383
384 This channel is where all output is published to frontends.
385 """
386
387 def __init__(self, context, session, address):
388 super(IOPubChannel, self).__init__(context, session, address)
389 self.ioloop = ioloop.IOLoop()
390
391 def run(self):
392 """The thread's main activity. Call start() instead."""
393 self.socket = self.context.socket(zmq.SUB)
394 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
395 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
396 self.socket.connect(self.address)
397 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
398 self.stream.on_recv(self._handle_recv)
399 self._run_loop()
400 try:
401 self.socket.close()
402 except:
403 pass
404
405 def stop(self):
406 """Stop the channel's event loop and join its thread."""
407 self.ioloop.stop()
408 super(IOPubChannel, self).stop()
409
410 def call_handlers(self, msg):
411 """This method is called in the ioloop thread when a message arrives.
412
413 Subclasses should override this method to handle incoming messages.
414 It is important to remember that this method is called in the thread
415 so that some logic must be done to ensure that the application leve
416 handlers are called in the application thread.
417 """
418 raise NotImplementedError('call_handlers must be defined in a subclass.')
419
420 def flush(self, timeout=1.0):
421 """Immediately processes all pending messages on the iopub channel.
422
423 Callers should use this method to ensure that :method:`call_handlers`
424 has been called for all messages that have been received on the
425 0MQ SUB socket of this channel.
426
427 This method is thread safe.
428
429 Parameters
430 ----------
431 timeout : float, optional
432 The maximum amount of time to spend flushing, in seconds. The
433 default is one second.
434 """
435 # We do the IOLoop callback process twice to ensure that the IOLoop
436 # gets to perform at least one full poll.
437 stop_time = time.time() + timeout
438 for i in xrange(2):
439 self._flushed = False
440 self.ioloop.add_callback(self._flush)
441 while not self._flushed and time.time() < stop_time:
442 time.sleep(0.01)
443
444 def _flush(self):
445 """Callback for :method:`self.flush`."""
446 self.stream.flush()
447 self._flushed = True
448
449
450 class StdInChannel(ZMQSocketChannel):
451 """The stdin channel to handle raw_input requests that the kernel makes."""
452
453 msg_queue = None
454
455 def __init__(self, context, session, address):
456 super(StdInChannel, self).__init__(context, session, address)
457 self.ioloop = ioloop.IOLoop()
458
459 def run(self):
460 """The thread's main activity. Call start() instead."""
461 self.socket = self.context.socket(zmq.DEALER)
462 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
463 self.socket.connect(self.address)
464 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
465 self.stream.on_recv(self._handle_recv)
466 self._run_loop()
467 try:
468 self.socket.close()
469 except:
470 pass
471
472 def stop(self):
473 """Stop the channel's event loop and join its thread."""
474 self.ioloop.stop()
475 super(StdInChannel, self).stop()
476
477 def call_handlers(self, msg):
478 """This method is called in the ioloop thread when a message arrives.
479
480 Subclasses should override this method to handle incoming messages.
481 It is important to remember that this method is called in the thread
482 so that some logic must be done to ensure that the application leve
483 handlers are called in the application thread.
484 """
485 raise NotImplementedError('call_handlers must be defined in a subclass.')
486
487 def input(self, string):
488 """Send a string of raw input to the kernel."""
489 content = dict(value=string)
490 msg = self.session.msg('input_reply', content)
491 self._queue_send(msg)
492
493
494 class HBChannel(ZMQSocketChannel):
495 """The heartbeat channel which monitors the kernel heartbeat.
496
497 Note that the heartbeat channel is paused by default. As long as you start
498 this channel, the kernel manager will ensure that it is paused and un-paused
499 as appropriate.
500 """
501
502 time_to_dead = 3.0
503 socket = None
504 poller = None
505 _running = None
506 _pause = None
507 _beating = None
508
509 def __init__(self, context, session, address):
510 super(HBChannel, self).__init__(context, session, address)
511 self._running = False
512 self._pause =True
513 self.poller = zmq.Poller()
514
515 def _create_socket(self):
516 if self.socket is not None:
517 # close previous socket, before opening a new one
518 self.poller.unregister(self.socket)
519 self.socket.close()
520 self.socket = self.context.socket(zmq.REQ)
521 self.socket.setsockopt(zmq.LINGER, 0)
522 self.socket.connect(self.address)
523
524 self.poller.register(self.socket, zmq.POLLIN)
525
526 def _poll(self, start_time):
527 """poll for heartbeat replies until we reach self.time_to_dead.
528
529 Ignores interrupts, and returns the result of poll(), which
530 will be an empty list if no messages arrived before the timeout,
531 or the event tuple if there is a message to receive.
532 """
533
534 until_dead = self.time_to_dead - (time.time() - start_time)
535 # ensure poll at least once
536 until_dead = max(until_dead, 1e-3)
537 events = []
538 while True:
539 try:
540 events = self.poller.poll(1000 * until_dead)
541 except ZMQError as e:
542 if e.errno == errno.EINTR:
543 # ignore interrupts during heartbeat
544 # this may never actually happen
545 until_dead = self.time_to_dead - (time.time() - start_time)
546 until_dead = max(until_dead, 1e-3)
547 pass
548 else:
549 raise
550 except Exception:
551 if self._exiting:
552 break
553 else:
554 raise
555 else:
556 break
557 return events
558
559 def run(self):
560 """The thread's main activity. Call start() instead."""
561 self._create_socket()
562 self._running = True
563 self._beating = True
564
565 while self._running:
566 if self._pause:
567 # just sleep, and skip the rest of the loop
568 time.sleep(self.time_to_dead)
569 continue
570
571 since_last_heartbeat = 0.0
572 # io.rprint('Ping from HB channel') # dbg
573 # no need to catch EFSM here, because the previous event was
574 # either a recv or connect, which cannot be followed by EFSM
575 self.socket.send(b'ping')
576 request_time = time.time()
577 ready = self._poll(request_time)
578 if ready:
579 self._beating = True
580 # the poll above guarantees we have something to recv
581 self.socket.recv()
582 # sleep the remainder of the cycle
583 remainder = self.time_to_dead - (time.time() - request_time)
584 if remainder > 0:
585 time.sleep(remainder)
586 continue
587 else:
588 # nothing was received within the time limit, signal heart failure
589 self._beating = False
590 since_last_heartbeat = time.time() - request_time
591 self.call_handlers(since_last_heartbeat)
592 # and close/reopen the socket, because the REQ/REP cycle has been broken
593 self._create_socket()
594 continue
595 try:
596 self.socket.close()
597 except:
598 pass
599
600 def pause(self):
601 """Pause the heartbeat."""
602 self._pause = True
603
604 def unpause(self):
605 """Unpause the heartbeat."""
606 self._pause = False
607
608 def is_beating(self):
609 """Is the heartbeat running and responsive (and not paused)."""
610 if self.is_alive() and not self._pause and self._beating:
611 return True
612 else:
613 return False
614
615 def stop(self):
616 """Stop the channel's event loop and join its thread."""
617 self._running = False
618 super(HBChannel, self).stop()
619
620 def call_handlers(self, since_last_heartbeat):
621 """This method is called in the ioloop thread when a message arrives.
622
623 Subclasses should override this method to handle incoming messages.
624 It is important to remember that this method is called in the thread
625 so that some logic must be done to ensure that the application level
626 handlers are called in the application thread.
627 """
628 raise NotImplementedError('call_handlers must be defined in a subclass.')
629
630
631 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
632 # ABC Registration
633 #-----------------------------------------------------------------------------
634
635 ShellChannelABC.register(ShellChannel)
636 IOPubChannelABC.register(IOPubChannel)
637 HBChannelABC.register(HBChannel)
638 StdInChannelABC.register(StdInChannel)
@@ -0,0 +1,182 b''
1 """Base class to manage the interaction with a running kernel
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 from __future__ import absolute_import
16
17 import zmq
18
19 # Local imports
20 from IPython.config.configurable import LoggingConfigurable
21 from IPython.utils.traitlets import (
22 Any, Instance, Type,
23 )
24
25 from .zmq.session import Session
26 from .channels import (
27 ShellChannel, IOPubChannel,
28 HBChannel, StdInChannel,
29 )
30 from .clientabc import KernelClientABC
31 from .connect import ConnectionFileMixin
32
33
34 #-----------------------------------------------------------------------------
35 # Main kernel client class
36 #-----------------------------------------------------------------------------
37
38 class KernelClient(LoggingConfigurable, ConnectionFileMixin):
39 """Communicates with a single kernel on any host via zmq channels.
40
41 There are four channels associated with each kernel:
42
43 * shell: for request/reply calls to the kernel.
44 * iopub: for the kernel to publish results to frontends.
45 * hb: for monitoring the kernel's heartbeat.
46 * stdin: for frontends to reply to raw_input calls in the kernel.
47
48 """
49
50 # The PyZMQ Context to use for communication with the kernel.
51 context = Instance(zmq.Context)
52 def _context_default(self):
53 return zmq.Context.instance()
54
55 # The Session to use for communication with the kernel.
56 session = Instance(Session)
57 def _session_default(self):
58 return Session(config=self.config)
59
60 # The classes to use for the various channels
61 shell_channel_class = Type(ShellChannel)
62 iopub_channel_class = Type(IOPubChannel)
63 stdin_channel_class = Type(StdInChannel)
64 hb_channel_class = Type(HBChannel)
65
66 # Protected traits
67 _shell_channel = Any
68 _iopub_channel = Any
69 _stdin_channel = Any
70 _hb_channel = Any
71
72 #--------------------------------------------------------------------------
73 # Channel management methods
74 #--------------------------------------------------------------------------
75
76 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
77 """Starts the channels for this kernel.
78
79 This will create the channels if they do not exist and then start
80 them (their activity runs in a thread). If port numbers of 0 are
81 being used (random ports) then you must first call
82 :method:`start_kernel`. If the channels have been stopped and you
83 call this, :class:`RuntimeError` will be raised.
84 """
85 if shell:
86 self.shell_channel.start()
87 if iopub:
88 self.iopub_channel.start()
89 if stdin:
90 self.stdin_channel.start()
91 self.shell_channel.allow_stdin = True
92 else:
93 self.shell_channel.allow_stdin = False
94 if hb:
95 self.hb_channel.start()
96
97 def stop_channels(self):
98 """Stops all the running channels for this kernel.
99
100 This stops their event loops and joins their threads.
101 """
102 if self.shell_channel.is_alive():
103 self.shell_channel.stop()
104 if self.iopub_channel.is_alive():
105 self.iopub_channel.stop()
106 if self.stdin_channel.is_alive():
107 self.stdin_channel.stop()
108 if self.hb_channel.is_alive():
109 self.hb_channel.stop()
110
111 @property
112 def channels_running(self):
113 """Are any of the channels created and running?"""
114 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
115 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
116
117 def _make_url(self, port):
118 """Make a zmq url with a port.
119
120 There are two cases that this handles:
121
122 * tcp: tcp://ip:port
123 * ipc: ipc://ip-port
124 """
125 if self.transport == 'tcp':
126 return "tcp://%s:%i" % (self.ip, port)
127 else:
128 return "%s://%s-%s" % (self.transport, self.ip, port)
129
130 @property
131 def shell_channel(self):
132 """Get the shell channel object for this kernel."""
133 if self._shell_channel is None:
134 self._shell_channel = self.shell_channel_class(
135 self.context, self.session, self._make_url(self.shell_port)
136 )
137 return self._shell_channel
138
139 @property
140 def iopub_channel(self):
141 """Get the iopub channel object for this kernel."""
142 if self._iopub_channel is None:
143 self._iopub_channel = self.iopub_channel_class(
144 self.context, self.session, self._make_url(self.iopub_port)
145 )
146 return self._iopub_channel
147
148 @property
149 def stdin_channel(self):
150 """Get the stdin channel object for this kernel."""
151 if self._stdin_channel is None:
152 self._stdin_channel = self.stdin_channel_class(
153 self.context, self.session, self._make_url(self.stdin_port)
154 )
155 return self._stdin_channel
156
157 @property
158 def hb_channel(self):
159 """Get the hb channel object for this kernel."""
160 if self._hb_channel is None:
161 self._hb_channel = self.hb_channel_class(
162 self.context, self.session, self._make_url(self.hb_port)
163 )
164 return self._hb_channel
165
166 def is_alive(self):
167 """Is the kernel process still running?"""
168 if self._hb_channel is not None:
169 # We didn't start the kernel with this KernelManager so we
170 # use the heartbeat.
171 return self._hb_channel.is_beating()
172 else:
173 # no heartbeat and not local, we can't tell if it's running,
174 # so naively return True
175 return True
176
177
178 #-----------------------------------------------------------------------------
179 # ABC Registration
180 #-----------------------------------------------------------------------------
181
182 KernelClientABC.register(KernelClient)
@@ -0,0 +1,193 b''
1 """Abstract base classes for kernel clients and channels"""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Standard library imports
15 import abc
16
17 #-----------------------------------------------------------------------------
18 # Channels
19 #-----------------------------------------------------------------------------
20
21
22 class ChannelABC(object):
23 """A base class for all channel ABCs."""
24
25 __metaclass__ = abc.ABCMeta
26
27 @abc.abstractmethod
28 def start(self):
29 pass
30
31 @abc.abstractmethod
32 def stop(self):
33 pass
34
35 @abc.abstractmethod
36 def is_alive(self):
37 pass
38
39
40 class ShellChannelABC(ChannelABC):
41 """ShellChannel ABC.
42
43 The docstrings for this class can be found in the base implementation:
44
45 `IPython.kernel.kernelmanager.ShellChannel`
46 """
47
48 @abc.abstractproperty
49 def allow_stdin(self):
50 pass
51
52 @abc.abstractmethod
53 def execute(self, code, silent=False, store_history=True,
54 user_variables=None, user_expressions=None, allow_stdin=None):
55 pass
56
57 @abc.abstractmethod
58 def complete(self, text, line, cursor_pos, block=None):
59 pass
60
61 @abc.abstractmethod
62 def object_info(self, oname, detail_level=0):
63 pass
64
65 @abc.abstractmethod
66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 pass
68
69 @abc.abstractmethod
70 def kernel_info(self):
71 pass
72
73 @abc.abstractmethod
74 def shutdown(self, restart=False):
75 pass
76
77
78 class IOPubChannelABC(ChannelABC):
79 """IOPubChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.kernelmanager.IOPubChannel`
84 """
85
86 @abc.abstractmethod
87 def flush(self, timeout=1.0):
88 pass
89
90
91 class StdInChannelABC(ChannelABC):
92 """StdInChannel ABC.
93
94 The docstrings for this class can be found in the base implementation:
95
96 `IPython.kernel.kernelmanager.StdInChannel`
97 """
98
99 @abc.abstractmethod
100 def input(self, string):
101 pass
102
103
104 class HBChannelABC(ChannelABC):
105 """HBChannel ABC.
106
107 The docstrings for this class can be found in the base implementation:
108
109 `IPython.kernel.kernelmanager.HBChannel`
110 """
111
112 @abc.abstractproperty
113 def time_to_dead(self):
114 pass
115
116 @abc.abstractmethod
117 def pause(self):
118 pass
119
120 @abc.abstractmethod
121 def unpause(self):
122 pass
123
124 @abc.abstractmethod
125 def is_beating(self):
126 pass
127
128
129 #-----------------------------------------------------------------------------
130 # Main kernel manager class
131 #-----------------------------------------------------------------------------
132
133 class KernelClientABC(object):
134 """KernelManager ABC.
135
136 The docstrings for this class can be found in the base implementation:
137
138 `IPython.kernel.kernelmanager.KernelClient`
139 """
140
141 __metaclass__ = abc.ABCMeta
142
143 @abc.abstractproperty
144 def kernel(self):
145 pass
146
147 @abc.abstractproperty
148 def shell_channel_class(self):
149 pass
150
151 @abc.abstractproperty
152 def iopub_channel_class(self):
153 pass
154
155 @abc.abstractproperty
156 def hb_channel_class(self):
157 pass
158
159 @abc.abstractproperty
160 def stdin_channel_class(self):
161 pass
162
163 #--------------------------------------------------------------------------
164 # Channel management methods
165 #--------------------------------------------------------------------------
166
167 @abc.abstractmethod
168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 pass
170
171 @abc.abstractmethod
172 def stop_channels(self):
173 pass
174
175 @abc.abstractproperty
176 def channels_running(self):
177 pass
178
179 @abc.abstractproperty
180 def shell_channel(self):
181 pass
182
183 @abc.abstractproperty
184 def iopub_channel(self):
185 pass
186
187 @abc.abstractproperty
188 def stdin_channel(self):
189 pass
190
191 @abc.abstractproperty
192 def hb_channel(self):
193 pass
@@ -1,10 +1,10 b''
1 1 """IPython kernels and associated utilities"""
2 2
3 3 # just for friendlier zmq version check
4 4 from . import zmq
5 5
6 6 from .connect import *
7 7 from .launcher import *
8 8 from .manager import KernelManager
9 from .blocking import BlockingKernelManager
9 from .blocking import BlockingKernelClient
10 10 from .multikernelmanager import MultiKernelManager
@@ -1,1 +1,1 b''
1 from .manager import BlockingKernelManager No newline at end of file
1 from .client import BlockingKernelClient No newline at end of file
@@ -1,89 +1,79 b''
1 """ Implements a fully blocking kernel manager.
1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2012 The IPython Development Team
6 # Copyright (C) 2013 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15
16 16 import Queue
17 17
18 from IPython.utils.traitlets import Type
19 from IPython.kernel.manager import KernelManager, IOPubChannel, HBChannel, \
18 from IPython.kernel.channels import IOPubChannel, HBChannel, \
20 19 ShellChannel, StdInChannel
21 20
22 21 #-----------------------------------------------------------------------------
23 22 # Blocking kernel manager
24 23 #-----------------------------------------------------------------------------
25 24
26 25
27 26 class BlockingChannelMixin(object):
28 27
29 28 def __init__(self, *args, **kwds):
30 29 super(BlockingChannelMixin, self).__init__(*args, **kwds)
31 30 self._in_queue = Queue.Queue()
32 31
33 32 def call_handlers(self, msg):
34 33 self._in_queue.put(msg)
35 34
36 35 def get_msg(self, block=True, timeout=None):
37 36 """ Gets a message if there is one that is ready. """
38 37 if timeout is None:
39 38 # Queue.get(timeout=None) has stupid uninteruptible
40 39 # behavior, so wait for a week instead
41 40 timeout = 604800
42 41 return self._in_queue.get(block, timeout)
43 42
44 43 def get_msgs(self):
45 44 """ Get all messages that are currently ready. """
46 45 msgs = []
47 46 while True:
48 47 try:
49 48 msgs.append(self.get_msg(block=False))
50 49 except Queue.Empty:
51 50 break
52 51 return msgs
53 52
54 53 def msg_ready(self):
55 54 """ Is there a message that has been received? """
56 55 return not self._in_queue.empty()
57 56
58 57
59 58 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
60 59 pass
61 60
62 61
63 62 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
64 63 pass
65 64
66 65
67 66 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
68 67 pass
69 68
70 69
71 70 class BlockingHBChannel(HBChannel):
72 71
73 72 # This kernel needs quicker monitoring, shorten to 1 sec.
74 73 # less than 0.5s is unreliable, and will get occasional
75 74 # false reports of missed beats.
76 75 time_to_dead = 1.
77 76
78 77 def call_handlers(self, since_last_heartbeat):
79 78 """ Pause beating on missed heartbeat. """
80 79 pass
81
82
83 class BlockingKernelManager(KernelManager):
84
85 # The classes to use for the various channels.
86 shell_channel_class = Type(BlockingShellChannel)
87 iopub_channel_class = Type(BlockingIOPubChannel)
88 stdin_channel_class = Type(BlockingStdInChannel)
89 hb_channel_class = Type(BlockingHBChannel)
@@ -1,347 +1,452 b''
1 1 """Utilities for connecting to kernels
2 2
3 3 Authors:
4 4
5 5 * Min Ragan-Kelley
6 6
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2013 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 import glob
21 21 import json
22 22 import os
23 23 import socket
24 24 import sys
25 25 from getpass import getpass
26 26 from subprocess import Popen, PIPE
27 27 import tempfile
28 28
29 29 # external imports
30 30 from IPython.external.ssh import tunnel
31 31
32 32 # IPython imports
33 # from IPython.config import Configurable
33 34 from IPython.core.profiledir import ProfileDir
34 35 from IPython.utils.localinterfaces import LOCALHOST
35 36 from IPython.utils.path import filefind, get_ipython_dir
36 37 from IPython.utils.py3compat import str_to_bytes, bytes_to_str
38 from IPython.utils.traitlets import (
39 Bool, Integer, Unicode, CaselessStrEnum,
40 HasTraits,
41 )
37 42
38 43
39 44 #-----------------------------------------------------------------------------
40 45 # Working with Connection Files
41 46 #-----------------------------------------------------------------------------
42 47
43 48 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
44 49 ip=LOCALHOST, key=b'', transport='tcp'):
45 50 """Generates a JSON config file, including the selection of random ports.
46 51
47 52 Parameters
48 53 ----------
49 54
50 55 fname : unicode
51 56 The path to the file to write
52 57
53 58 shell_port : int, optional
54 59 The port to use for ROUTER channel.
55 60
56 61 iopub_port : int, optional
57 62 The port to use for the SUB channel.
58 63
59 64 stdin_port : int, optional
60 65 The port to use for the REQ (raw input) channel.
61 66
62 67 hb_port : int, optional
63 68 The port to use for the hearbeat REP channel.
64 69
65 70 ip : str, optional
66 71 The ip address the kernel will bind to.
67 72
68 73 key : str, optional
69 74 The Session key used for HMAC authentication.
70 75
71 76 """
72 77 # default to temporary connector file
73 78 if not fname:
74 79 fname = tempfile.mktemp('.json')
75 80
76 81 # Find open ports as necessary.
77 82
78 83 ports = []
79 84 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
80 85 int(stdin_port <= 0) + int(hb_port <= 0)
81 86 if transport == 'tcp':
82 87 for i in range(ports_needed):
83 88 sock = socket.socket()
84 89 sock.bind(('', 0))
85 90 ports.append(sock)
86 91 for i, sock in enumerate(ports):
87 92 port = sock.getsockname()[1]
88 93 sock.close()
89 94 ports[i] = port
90 95 else:
91 96 N = 1
92 97 for i in range(ports_needed):
93 98 while os.path.exists("%s-%s" % (ip, str(N))):
94 99 N += 1
95 100 ports.append(N)
96 101 N += 1
97 102 if shell_port <= 0:
98 103 shell_port = ports.pop(0)
99 104 if iopub_port <= 0:
100 105 iopub_port = ports.pop(0)
101 106 if stdin_port <= 0:
102 107 stdin_port = ports.pop(0)
103 108 if hb_port <= 0:
104 109 hb_port = ports.pop(0)
105 110
106 111 cfg = dict( shell_port=shell_port,
107 112 iopub_port=iopub_port,
108 113 stdin_port=stdin_port,
109 114 hb_port=hb_port,
110 115 )
111 116 cfg['ip'] = ip
112 117 cfg['key'] = bytes_to_str(key)
113 118 cfg['transport'] = transport
114 119
115 120 with open(fname, 'w') as f:
116 121 f.write(json.dumps(cfg, indent=2))
117 122
118 123 return fname, cfg
119 124
120 125
121 126 def get_connection_file(app=None):
122 127 """Return the path to the connection file of an app
123 128
124 129 Parameters
125 130 ----------
126 131 app : IPKernelApp instance [optional]
127 132 If unspecified, the currently running app will be used
128 133 """
129 134 if app is None:
130 135 from IPython.kernel.zmq.kernelapp import IPKernelApp
131 136 if not IPKernelApp.initialized():
132 137 raise RuntimeError("app not specified, and not in a running Kernel")
133 138
134 139 app = IPKernelApp.instance()
135 140 return filefind(app.connection_file, ['.', app.profile_dir.security_dir])
136 141
137 142
138 143 def find_connection_file(filename, profile=None):
139 144 """find a connection file, and return its absolute path.
140 145
141 146 The current working directory and the profile's security
142 147 directory will be searched for the file if it is not given by
143 148 absolute path.
144 149
145 150 If profile is unspecified, then the current running application's
146 151 profile will be used, or 'default', if not run from IPython.
147 152
148 153 If the argument does not match an existing file, it will be interpreted as a
149 154 fileglob, and the matching file in the profile's security dir with
150 155 the latest access time will be used.
151 156
152 157 Parameters
153 158 ----------
154 159 filename : str
155 160 The connection file or fileglob to search for.
156 161 profile : str [optional]
157 162 The name of the profile to use when searching for the connection file,
158 163 if different from the current IPython session or 'default'.
159 164
160 165 Returns
161 166 -------
162 167 str : The absolute path of the connection file.
163 168 """
164 169 from IPython.core.application import BaseIPythonApplication as IPApp
165 170 try:
166 171 # quick check for absolute path, before going through logic
167 172 return filefind(filename)
168 173 except IOError:
169 174 pass
170 175
171 176 if profile is None:
172 177 # profile unspecified, check if running from an IPython app
173 178 if IPApp.initialized():
174 179 app = IPApp.instance()
175 180 profile_dir = app.profile_dir
176 181 else:
177 182 # not running in IPython, use default profile
178 183 profile_dir = ProfileDir.find_profile_dir_by_name(get_ipython_dir(), 'default')
179 184 else:
180 185 # find profiledir by profile name:
181 186 profile_dir = ProfileDir.find_profile_dir_by_name(get_ipython_dir(), profile)
182 187 security_dir = profile_dir.security_dir
183 188
184 189 try:
185 190 # first, try explicit name
186 191 return filefind(filename, ['.', security_dir])
187 192 except IOError:
188 193 pass
189 194
190 195 # not found by full name
191 196
192 197 if '*' in filename:
193 198 # given as a glob already
194 199 pat = filename
195 200 else:
196 201 # accept any substring match
197 202 pat = '*%s*' % filename
198 203 matches = glob.glob( os.path.join(security_dir, pat) )
199 204 if not matches:
200 205 raise IOError("Could not find %r in %r" % (filename, security_dir))
201 206 elif len(matches) == 1:
202 207 return matches[0]
203 208 else:
204 209 # get most recent match, by access time:
205 210 return sorted(matches, key=lambda f: os.stat(f).st_atime)[-1]
206 211
207 212
208 213 def get_connection_info(connection_file=None, unpack=False, profile=None):
209 214 """Return the connection information for the current Kernel.
210 215
211 216 Parameters
212 217 ----------
213 218 connection_file : str [optional]
214 219 The connection file to be used. Can be given by absolute path, or
215 220 IPython will search in the security directory of a given profile.
216 221 If run from IPython,
217 222
218 223 If unspecified, the connection file for the currently running
219 224 IPython Kernel will be used, which is only allowed from inside a kernel.
220 225 unpack : bool [default: False]
221 226 if True, return the unpacked dict, otherwise just the string contents
222 227 of the file.
223 228 profile : str [optional]
224 229 The name of the profile to use when searching for the connection file,
225 230 if different from the current IPython session or 'default'.
226 231
227 232
228 233 Returns
229 234 -------
230 235 The connection dictionary of the current kernel, as string or dict,
231 236 depending on `unpack`.
232 237 """
233 238 if connection_file is None:
234 239 # get connection file from current kernel
235 240 cf = get_connection_file()
236 241 else:
237 242 # connection file specified, allow shortnames:
238 243 cf = find_connection_file(connection_file, profile=profile)
239 244
240 245 with open(cf) as f:
241 246 info = f.read()
242 247
243 248 if unpack:
244 249 info = json.loads(info)
245 250 # ensure key is bytes:
246 251 info['key'] = str_to_bytes(info.get('key', ''))
247 252 return info
248 253
249 254
250 255 def connect_qtconsole(connection_file=None, argv=None, profile=None):
251 256 """Connect a qtconsole to the current kernel.
252 257
253 258 This is useful for connecting a second qtconsole to a kernel, or to a
254 259 local notebook.
255 260
256 261 Parameters
257 262 ----------
258 263 connection_file : str [optional]
259 264 The connection file to be used. Can be given by absolute path, or
260 265 IPython will search in the security directory of a given profile.
261 266 If run from IPython,
262 267
263 268 If unspecified, the connection file for the currently running
264 269 IPython Kernel will be used, which is only allowed from inside a kernel.
265 270 argv : list [optional]
266 271 Any extra args to be passed to the console.
267 272 profile : str [optional]
268 273 The name of the profile to use when searching for the connection file,
269 274 if different from the current IPython session or 'default'.
270 275
271 276
272 277 Returns
273 278 -------
274 279 subprocess.Popen instance running the qtconsole frontend
275 280 """
276 281 argv = [] if argv is None else argv
277 282
278 283 if connection_file is None:
279 284 # get connection file from current kernel
280 285 cf = get_connection_file()
281 286 else:
282 287 cf = find_connection_file(connection_file, profile=profile)
283 288
284 289 cmd = ';'.join([
285 290 "from IPython.frontend.qt.console import qtconsoleapp",
286 291 "qtconsoleapp.main()"
287 292 ])
288 293
289 294 return Popen([sys.executable, '-c', cmd, '--existing', cf] + argv, stdout=PIPE, stderr=PIPE)
290 295
291 296
292 297 def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
293 298 """tunnel connections to a kernel via ssh
294 299
295 300 This will open four SSH tunnels from localhost on this machine to the
296 301 ports associated with the kernel. They can be either direct
297 302 localhost-localhost tunnels, or if an intermediate server is necessary,
298 303 the kernel must be listening on a public IP.
299 304
300 305 Parameters
301 306 ----------
302 307 connection_info : dict or str (path)
303 308 Either a connection dict, or the path to a JSON connection file
304 309 sshserver : str
305 310 The ssh sever to use to tunnel to the kernel. Can be a full
306 311 `user@server:port` string. ssh config aliases are respected.
307 312 sshkey : str [optional]
308 313 Path to file containing ssh key to use for authentication.
309 314 Only necessary if your ssh config does not already associate
310 315 a keyfile with the host.
311 316
312 317 Returns
313 318 -------
314 319
315 320 (shell, iopub, stdin, hb) : ints
316 321 The four ports on localhost that have been forwarded to the kernel.
317 322 """
318 323 if isinstance(connection_info, basestring):
319 324 # it's a path, unpack it
320 325 with open(connection_info) as f:
321 326 connection_info = json.loads(f.read())
322 327
323 328 cf = connection_info
324 329
325 330 lports = tunnel.select_random_ports(4)
326 331 rports = cf['shell_port'], cf['iopub_port'], cf['stdin_port'], cf['hb_port']
327 332
328 333 remote_ip = cf['ip']
329 334
330 335 if tunnel.try_passwordless_ssh(sshserver, sshkey):
331 336 password=False
332 337 else:
333 338 password = getpass("SSH Password for %s: "%sshserver)
334 339
335 340 for lp,rp in zip(lports, rports):
336 341 tunnel.ssh_tunnel(lp, rp, sshserver, remote_ip, sshkey, password)
337 342
338 343 return tuple(lports)
339 344
345
346 #-----------------------------------------------------------------------------
347 # Mixin for classes that workw ith connection files
348 #-----------------------------------------------------------------------------
349
350 class ConnectionFileMixin(HasTraits):
351 """Mixin for configurable classes that work with connection files"""
352
353 # The addresses for the communication channels
354 connection_file = Unicode('')
355 _connection_file_written = Bool(False)
356
357 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
358
359 ip = Unicode(LOCALHOST, config=True,
360 help="""Set the kernel\'s IP address [default localhost].
361 If the IP address is something other than localhost, then
362 Consoles on other machines will be able to connect
363 to the Kernel, so be careful!"""
364 )
365
366 def _ip_default(self):
367 if self.transport == 'ipc':
368 if self.connection_file:
369 return os.path.splitext(self.connection_file)[0] + '-ipc'
370 else:
371 return 'kernel-ipc'
372 else:
373 return LOCALHOST
374
375 def _ip_changed(self, name, old, new):
376 if new == '*':
377 self.ip = '0.0.0.0'
378
379 # protected traits
380
381 shell_port = Integer(0)
382 iopub_port = Integer(0)
383 stdin_port = Integer(0)
384 hb_port = Integer(0)
385
386 #--------------------------------------------------------------------------
387 # Connection and ipc file management
388 #--------------------------------------------------------------------------
389
390 def cleanup_connection_file(self):
391 """Cleanup connection file *if we wrote it*
392
393 Will not raise if the connection file was already removed somehow.
394 """
395 if self._connection_file_written:
396 # cleanup connection files on full shutdown of kernel we started
397 self._connection_file_written = False
398 try:
399 os.remove(self.connection_file)
400 except (IOError, OSError, AttributeError):
401 pass
402
403 def cleanup_ipc_files(self):
404 """Cleanup ipc files if we wrote them."""
405 if self.transport != 'ipc':
406 return
407 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
408 ipcfile = "%s-%i" % (self.ip, port)
409 try:
410 os.remove(ipcfile)
411 except (IOError, OSError):
412 pass
413
414 def write_connection_file(self):
415 """Write connection info to JSON dict in self.connection_file."""
416 if self._connection_file_written:
417 return
418 self.connection_file,cfg = write_connection_file(self.connection_file,
419 transport=self.transport, ip=self.ip, key=self.session.key,
420 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
421 shell_port=self.shell_port, hb_port=self.hb_port)
422 # write_connection_file also sets default ports:
423 self.shell_port = cfg['shell_port']
424 self.stdin_port = cfg['stdin_port']
425 self.iopub_port = cfg['iopub_port']
426 self.hb_port = cfg['hb_port']
427
428 self._connection_file_written = True
429
430 def load_connection_file(self):
431 """Load connection info from JSON dict in self.connection_file."""
432 with open(self.connection_file) as f:
433 cfg = json.loads(f.read())
434
435 self.transport = cfg.get('transport', 'tcp')
436 self.ip = cfg['ip']
437 self.shell_port = cfg['shell_port']
438 self.stdin_port = cfg['stdin_port']
439 self.iopub_port = cfg['iopub_port']
440 self.hb_port = cfg['hb_port']
441 self.session.key = str_to_bytes(cfg['key'])
442
443
444
340 445 __all__ = [
341 446 'write_connection_file',
342 447 'get_connection_file',
343 448 'find_connection_file',
344 449 'get_connection_info',
345 450 'connect_qtconsole',
346 451 'tunnel_to_kernel',
347 ] No newline at end of file
452 ]
@@ -1,50 +1,50 b''
1 """A kernel manager with ioloop based logic."""
1 """A kernel manager with a tornado IOLoop"""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import zmq
17 17 from zmq.eventloop import ioloop
18 18
19 19 from IPython.utils.traitlets import (
20 20 Instance
21 21 )
22 22
23 from IPython.kernel.blocking.manager import BlockingKernelManager
23 from IPython.kernel.manager import KernelManager
24 24 from .restarter import IOLoopKernelRestarter
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Code
28 28 #-----------------------------------------------------------------------------
29 29
30 class IOLoopKernelManager(BlockingKernelManager):
30 class IOLoopKernelManager(KernelManager):
31 31
32 32 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
33 33 def _loop_default(self):
34 34 return ioloop.IOLoop.instance()
35 35
36 36 _restarter = Instance('IPython.kernel.ioloop.IOLoopKernelRestarter')
37 37
38 38 def start_restarter(self):
39 39 if self.autorestart and self.has_kernel:
40 40 if self._restarter is None:
41 41 self._restarter = IOLoopKernelRestarter(
42 42 kernel_manager=self, loop=self.loop,
43 43 config=self.config, log=self.log
44 44 )
45 45 self._restarter.start()
46 46
47 47 def stop_restarter(self):
48 48 if self.autorestart:
49 49 if self._restarter is not None:
50 50 self._restarter.stop()
This diff has been collapsed as it changes many lines, (869 lines changed) Show them Hide them
@@ -1,1146 +1,325 b''
1 """Base classes to manage the interaction with a running kernel.
2
3 TODO
4 * Create logger to handle debugging and console messages.
1 """Base class to manage a running kernel
5 2 """
6 3
7 4 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
5 # Copyright (C) 2013 The IPython Development Team
9 6 #
10 7 # Distributed under the terms of the BSD License. The full license is in
11 8 # the file COPYING, distributed as part of this software.
12 9 #-----------------------------------------------------------------------------
13 10
14 11 #-----------------------------------------------------------------------------
15 12 # Imports
16 13 #-----------------------------------------------------------------------------
17 14
18 15 from __future__ import absolute_import
19 16
20 17 # Standard library imports
21 import atexit
22 import errno
23 import json
24 import os
25 18 import signal
26 19 import sys
27 from threading import Thread
28 20 import time
29 21
30 22 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
35 23
36 24 # Local imports
37 from IPython.config.configurable import Configurable
38 from IPython.utils.importstring import import_item
39 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.localinterfaces import LOCAL_IPS
40 27 from IPython.utils.traitlets import (
41 Any, Instance, Type, Unicode, List, Integer, Bool,
42 CaselessStrEnum, DottedObjectName
28 Any, Instance, Unicode, List, Bool,
43 29 )
44 from IPython.utils.py3compat import str_to_bytes
45 30 from IPython.kernel import (
46 write_connection_file,
47 31 make_ipkernel_cmd,
48 32 launch_kernel,
49 33 )
34 from .connect import ConnectionFileMixin
50 35 from .zmq.session import Session
51 36 from .managerabc import (
52 ShellChannelABC, IOPubChannelABC,
53 HBChannelABC, StdInChannelABC,
54 37 KernelManagerABC
55 38 )
56 39
57 40 #-----------------------------------------------------------------------------
58 # Constants and exceptions
59 #-----------------------------------------------------------------------------
60
61 class InvalidPortNumber(Exception):
62 pass
63
64 #-----------------------------------------------------------------------------
65 # Utility functions
66 #-----------------------------------------------------------------------------
67
68 # some utilities to validate message structure, these might get moved elsewhere
69 # if they prove to have more generic utility
70
71 def validate_string_list(lst):
72 """Validate that the input is a list of strings.
73
74 Raises ValueError if not."""
75 if not isinstance(lst, list):
76 raise ValueError('input %r must be a list' % lst)
77 for x in lst:
78 if not isinstance(x, basestring):
79 raise ValueError('element %r in list must be a string' % x)
80
81
82 def validate_string_dict(dct):
83 """Validate that the input is a dict with string keys and values.
84
85 Raises ValueError if not."""
86 for k,v in dct.iteritems():
87 if not isinstance(k, basestring):
88 raise ValueError('key %r in dict must be a string' % k)
89 if not isinstance(v, basestring):
90 raise ValueError('value %r in dict must be a string' % v)
91
92
93 #-----------------------------------------------------------------------------
94 # ZMQ Socket Channel classes
95 #-----------------------------------------------------------------------------
96
97 class ZMQSocketChannel(Thread):
98 """The base class for the channels that use ZMQ sockets."""
99 context = None
100 session = None
101 socket = None
102 ioloop = None
103 stream = None
104 _address = None
105 _exiting = False
106
107 def __init__(self, context, session, address):
108 """Create a channel.
109
110 Parameters
111 ----------
112 context : :class:`zmq.Context`
113 The ZMQ context to use.
114 session : :class:`session.Session`
115 The session to use.
116 address : zmq url
117 Standard (ip, port) tuple that the kernel is listening on.
118 """
119 super(ZMQSocketChannel, self).__init__()
120 self.daemon = True
121
122 self.context = context
123 self.session = session
124 if isinstance(address, tuple):
125 if address[1] == 0:
126 message = 'The port number for a channel cannot be 0.'
127 raise InvalidPortNumber(message)
128 address = "tcp://%s:%i" % address
129 self._address = address
130 atexit.register(self._notice_exit)
131
132 def _notice_exit(self):
133 self._exiting = True
134
135 def _run_loop(self):
136 """Run my loop, ignoring EINTR events in the poller"""
137 while True:
138 try:
139 self.ioloop.start()
140 except ZMQError as e:
141 if e.errno == errno.EINTR:
142 continue
143 else:
144 raise
145 except Exception:
146 if self._exiting:
147 break
148 else:
149 raise
150 else:
151 break
152
153 def stop(self):
154 """Stop the channel's event loop and join its thread.
155
156 This calls :method:`Thread.join` and returns when the thread
157 terminates. :class:`RuntimeError` will be raised if
158 :method:`self.start` is called again.
159 """
160 self.join()
161
162 @property
163 def address(self):
164 """Get the channel's address as a zmq url string.
165
166 These URLS have the form: 'tcp://127.0.0.1:5555'.
167 """
168 return self._address
169
170 def _queue_send(self, msg):
171 """Queue a message to be sent from the IOLoop's thread.
172
173 Parameters
174 ----------
175 msg : message to send
176
177 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
178 thread control of the action.
179 """
180 def thread_send():
181 self.session.send(self.stream, msg)
182 self.ioloop.add_callback(thread_send)
183
184 def _handle_recv(self, msg):
185 """Callback for stream.on_recv.
186
187 Unpacks message, and calls handlers with it.
188 """
189 ident,smsg = self.session.feed_identities(msg)
190 self.call_handlers(self.session.unserialize(smsg))
191
192
193
194 class ShellChannel(ZMQSocketChannel):
195 """The shell channel for issuing request/replies to the kernel."""
196
197 command_queue = None
198 # flag for whether execute requests should be allowed to call raw_input:
199 allow_stdin = True
200
201 def __init__(self, context, session, address):
202 super(ShellChannel, self).__init__(context, session, address)
203 self.ioloop = ioloop.IOLoop()
204
205 def run(self):
206 """The thread's main activity. Call start() instead."""
207 self.socket = self.context.socket(zmq.DEALER)
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.connect(self.address)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream.on_recv(self._handle_recv)
212 self._run_loop()
213 try:
214 self.socket.close()
215 except:
216 pass
217
218 def stop(self):
219 """Stop the channel's event loop and join its thread."""
220 self.ioloop.stop()
221 super(ShellChannel, self).stop()
222
223 def call_handlers(self, msg):
224 """This method is called in the ioloop thread when a message arrives.
225
226 Subclasses should override this method to handle incoming messages.
227 It is important to remember that this method is called in the thread
228 so that some logic must be done to ensure that the application leve
229 handlers are called in the application thread.
230 """
231 raise NotImplementedError('call_handlers must be defined in a subclass.')
232
233 def execute(self, code, silent=False, store_history=True,
234 user_variables=None, user_expressions=None, allow_stdin=None):
235 """Execute code in the kernel.
236
237 Parameters
238 ----------
239 code : str
240 A string of Python code.
241
242 silent : bool, optional (default False)
243 If set, the kernel will execute the code as quietly possible, and
244 will force store_history to be False.
245
246 store_history : bool, optional (default True)
247 If set, the kernel will store command history. This is forced
248 to be False if silent is True.
249
250 user_variables : list, optional
251 A list of variable names to pull from the user's namespace. They
252 will come back as a dict with these names as keys and their
253 :func:`repr` as values.
254
255 user_expressions : dict, optional
256 A dict mapping names to expressions to be evaluated in the user's
257 dict. The expression values are returned as strings formatted using
258 :func:`repr`.
259
260 allow_stdin : bool, optional (default self.allow_stdin)
261 Flag for whether the kernel can send stdin requests to frontends.
262
263 Some frontends (e.g. the Notebook) do not support stdin requests.
264 If raw_input is called from code executed from such a frontend, a
265 StdinNotImplementedError will be raised.
266
267 Returns
268 -------
269 The msg_id of the message sent.
270 """
271 if user_variables is None:
272 user_variables = []
273 if user_expressions is None:
274 user_expressions = {}
275 if allow_stdin is None:
276 allow_stdin = self.allow_stdin
277
278
279 # Don't waste network traffic if inputs are invalid
280 if not isinstance(code, basestring):
281 raise ValueError('code %r must be a string' % code)
282 validate_string_list(user_variables)
283 validate_string_dict(user_expressions)
284
285 # Create class for content/msg creation. Related to, but possibly
286 # not in Session.
287 content = dict(code=code, silent=silent, store_history=store_history,
288 user_variables=user_variables,
289 user_expressions=user_expressions,
290 allow_stdin=allow_stdin,
291 )
292 msg = self.session.msg('execute_request', content)
293 self._queue_send(msg)
294 return msg['header']['msg_id']
295
296 def complete(self, text, line, cursor_pos, block=None):
297 """Tab complete text in the kernel's namespace.
298
299 Parameters
300 ----------
301 text : str
302 The text to complete.
303 line : str
304 The full line of text that is the surrounding context for the
305 text to complete.
306 cursor_pos : int
307 The position of the cursor in the line where the completion was
308 requested.
309 block : str, optional
310 The full block of code in which the completion is being requested.
311
312 Returns
313 -------
314 The msg_id of the message sent.
315 """
316 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
317 msg = self.session.msg('complete_request', content)
318 self._queue_send(msg)
319 return msg['header']['msg_id']
320
321 def object_info(self, oname, detail_level=0):
322 """Get metadata information about an object in the kernel's namespace.
323
324 Parameters
325 ----------
326 oname : str
327 A string specifying the object name.
328 detail_level : int, optional
329 The level of detail for the introspection (0-2)
330
331 Returns
332 -------
333 The msg_id of the message sent.
334 """
335 content = dict(oname=oname, detail_level=detail_level)
336 msg = self.session.msg('object_info_request', content)
337 self._queue_send(msg)
338 return msg['header']['msg_id']
339
340 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
341 """Get entries from the kernel's history list.
342
343 Parameters
344 ----------
345 raw : bool
346 If True, return the raw input.
347 output : bool
348 If True, then return the output as well.
349 hist_access_type : str
350 'range' (fill in session, start and stop params), 'tail' (fill in n)
351 or 'search' (fill in pattern param).
352
353 session : int
354 For a range request, the session from which to get lines. Session
355 numbers are positive integers; negative ones count back from the
356 current session.
357 start : int
358 The first line number of a history range.
359 stop : int
360 The final (excluded) line number of a history range.
361
362 n : int
363 The number of lines of history to get for a tail request.
364
365 pattern : str
366 The glob-syntax pattern for a search request.
367
368 Returns
369 -------
370 The msg_id of the message sent.
371 """
372 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
373 **kwargs)
374 msg = self.session.msg('history_request', content)
375 self._queue_send(msg)
376 return msg['header']['msg_id']
377
378 def kernel_info(self):
379 """Request kernel info."""
380 msg = self.session.msg('kernel_info_request')
381 self._queue_send(msg)
382 return msg['header']['msg_id']
383
384 def shutdown(self, restart=False):
385 """Request an immediate kernel shutdown.
386
387 Upon receipt of the (empty) reply, client code can safely assume that
388 the kernel has shut down and it's safe to forcefully terminate it if
389 it's still alive.
390
391 The kernel will send the reply via a function registered with Python's
392 atexit module, ensuring it's truly done as the kernel is done with all
393 normal operation.
394 """
395 # Send quit message to kernel. Once we implement kernel-side setattr,
396 # this should probably be done that way, but for now this will do.
397 msg = self.session.msg('shutdown_request', {'restart':restart})
398 self._queue_send(msg)
399 return msg['header']['msg_id']
400
401
402
403 class IOPubChannel(ZMQSocketChannel):
404 """The iopub channel which listens for messages that the kernel publishes.
405
406 This channel is where all output is published to frontends.
407 """
408
409 def __init__(self, context, session, address):
410 super(IOPubChannel, self).__init__(context, session, address)
411 self.ioloop = ioloop.IOLoop()
412
413 def run(self):
414 """The thread's main activity. Call start() instead."""
415 self.socket = self.context.socket(zmq.SUB)
416 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
417 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
418 self.socket.connect(self.address)
419 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
420 self.stream.on_recv(self._handle_recv)
421 self._run_loop()
422 try:
423 self.socket.close()
424 except:
425 pass
426
427 def stop(self):
428 """Stop the channel's event loop and join its thread."""
429 self.ioloop.stop()
430 super(IOPubChannel, self).stop()
431
432 def call_handlers(self, msg):
433 """This method is called in the ioloop thread when a message arrives.
434
435 Subclasses should override this method to handle incoming messages.
436 It is important to remember that this method is called in the thread
437 so that some logic must be done to ensure that the application leve
438 handlers are called in the application thread.
439 """
440 raise NotImplementedError('call_handlers must be defined in a subclass.')
441
442 def flush(self, timeout=1.0):
443 """Immediately processes all pending messages on the iopub channel.
444
445 Callers should use this method to ensure that :method:`call_handlers`
446 has been called for all messages that have been received on the
447 0MQ SUB socket of this channel.
448
449 This method is thread safe.
450
451 Parameters
452 ----------
453 timeout : float, optional
454 The maximum amount of time to spend flushing, in seconds. The
455 default is one second.
456 """
457 # We do the IOLoop callback process twice to ensure that the IOLoop
458 # gets to perform at least one full poll.
459 stop_time = time.time() + timeout
460 for i in xrange(2):
461 self._flushed = False
462 self.ioloop.add_callback(self._flush)
463 while not self._flushed and time.time() < stop_time:
464 time.sleep(0.01)
465
466 def _flush(self):
467 """Callback for :method:`self.flush`."""
468 self.stream.flush()
469 self._flushed = True
470
471
472 class StdInChannel(ZMQSocketChannel):
473 """The stdin channel to handle raw_input requests that the kernel makes."""
474
475 msg_queue = None
476
477 def __init__(self, context, session, address):
478 super(StdInChannel, self).__init__(context, session, address)
479 self.ioloop = ioloop.IOLoop()
480
481 def run(self):
482 """The thread's main activity. Call start() instead."""
483 self.socket = self.context.socket(zmq.DEALER)
484 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
485 self.socket.connect(self.address)
486 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
487 self.stream.on_recv(self._handle_recv)
488 self._run_loop()
489 try:
490 self.socket.close()
491 except:
492 pass
493
494 def stop(self):
495 """Stop the channel's event loop and join its thread."""
496 self.ioloop.stop()
497 super(StdInChannel, self).stop()
498
499 def call_handlers(self, msg):
500 """This method is called in the ioloop thread when a message arrives.
501
502 Subclasses should override this method to handle incoming messages.
503 It is important to remember that this method is called in the thread
504 so that some logic must be done to ensure that the application leve
505 handlers are called in the application thread.
506 """
507 raise NotImplementedError('call_handlers must be defined in a subclass.')
508
509 def input(self, string):
510 """Send a string of raw input to the kernel."""
511 content = dict(value=string)
512 msg = self.session.msg('input_reply', content)
513 self._queue_send(msg)
514
515
516 class HBChannel(ZMQSocketChannel):
517 """The heartbeat channel which monitors the kernel heartbeat.
518
519 Note that the heartbeat channel is paused by default. As long as you start
520 this channel, the kernel manager will ensure that it is paused and un-paused
521 as appropriate.
522 """
523
524 time_to_dead = 3.0
525 socket = None
526 poller = None
527 _running = None
528 _pause = None
529 _beating = None
530
531 def __init__(self, context, session, address):
532 super(HBChannel, self).__init__(context, session, address)
533 self._running = False
534 self._pause =True
535 self.poller = zmq.Poller()
536
537 def _create_socket(self):
538 if self.socket is not None:
539 # close previous socket, before opening a new one
540 self.poller.unregister(self.socket)
541 self.socket.close()
542 self.socket = self.context.socket(zmq.REQ)
543 self.socket.setsockopt(zmq.LINGER, 0)
544 self.socket.connect(self.address)
545
546 self.poller.register(self.socket, zmq.POLLIN)
547
548 def _poll(self, start_time):
549 """poll for heartbeat replies until we reach self.time_to_dead.
550
551 Ignores interrupts, and returns the result of poll(), which
552 will be an empty list if no messages arrived before the timeout,
553 or the event tuple if there is a message to receive.
554 """
555
556 until_dead = self.time_to_dead - (time.time() - start_time)
557 # ensure poll at least once
558 until_dead = max(until_dead, 1e-3)
559 events = []
560 while True:
561 try:
562 events = self.poller.poll(1000 * until_dead)
563 except ZMQError as e:
564 if e.errno == errno.EINTR:
565 # ignore interrupts during heartbeat
566 # this may never actually happen
567 until_dead = self.time_to_dead - (time.time() - start_time)
568 until_dead = max(until_dead, 1e-3)
569 pass
570 else:
571 raise
572 except Exception:
573 if self._exiting:
574 break
575 else:
576 raise
577 else:
578 break
579 return events
580
581 def run(self):
582 """The thread's main activity. Call start() instead."""
583 self._create_socket()
584 self._running = True
585 self._beating = True
586
587 while self._running:
588 if self._pause:
589 # just sleep, and skip the rest of the loop
590 time.sleep(self.time_to_dead)
591 continue
592
593 since_last_heartbeat = 0.0
594 # io.rprint('Ping from HB channel') # dbg
595 # no need to catch EFSM here, because the previous event was
596 # either a recv or connect, which cannot be followed by EFSM
597 self.socket.send(b'ping')
598 request_time = time.time()
599 ready = self._poll(request_time)
600 if ready:
601 self._beating = True
602 # the poll above guarantees we have something to recv
603 self.socket.recv()
604 # sleep the remainder of the cycle
605 remainder = self.time_to_dead - (time.time() - request_time)
606 if remainder > 0:
607 time.sleep(remainder)
608 continue
609 else:
610 # nothing was received within the time limit, signal heart failure
611 self._beating = False
612 since_last_heartbeat = time.time() - request_time
613 self.call_handlers(since_last_heartbeat)
614 # and close/reopen the socket, because the REQ/REP cycle has been broken
615 self._create_socket()
616 continue
617 try:
618 self.socket.close()
619 except:
620 pass
621
622 def pause(self):
623 """Pause the heartbeat."""
624 self._pause = True
625
626 def unpause(self):
627 """Unpause the heartbeat."""
628 self._pause = False
629
630 def is_beating(self):
631 """Is the heartbeat running and responsive (and not paused)."""
632 if self.is_alive() and not self._pause and self._beating:
633 return True
634 else:
635 return False
636
637 def stop(self):
638 """Stop the channel's event loop and join its thread."""
639 self._running = False
640 super(HBChannel, self).stop()
641
642 def call_handlers(self, since_last_heartbeat):
643 """This method is called in the ioloop thread when a message arrives.
644
645 Subclasses should override this method to handle incoming messages.
646 It is important to remember that this method is called in the thread
647 so that some logic must be done to ensure that the application level
648 handlers are called in the application thread.
649 """
650 raise NotImplementedError('call_handlers must be defined in a subclass.')
651
652
653 #-----------------------------------------------------------------------------
654 41 # Main kernel manager class
655 42 #-----------------------------------------------------------------------------
656 43
657 class KernelManager(Configurable):
658 """Manages a single kernel on this host along with its channels.
659
660 There are four channels associated with each kernel:
44 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
45 """Manages a single kernel in a subprocess on this host.
661 46
662 * shell: for request/reply calls to the kernel.
663 * iopub: for the kernel to publish results to frontends.
664 * hb: for monitoring the kernel's heartbeat.
665 * stdin: for frontends to reply to raw_input calls in the kernel.
666
667 The usage of the channels that this class manages is optional. It is
668 entirely possible to connect to the kernels directly using ZeroMQ
669 sockets. These channels are useful primarily for talking to a kernel
670 whose :class:`KernelManager` is in the same process.
671
672 This version manages kernels started using Popen.
47 This version starts kernels with Popen.
673 48 """
49
674 50 # The PyZMQ Context to use for communication with the kernel.
675 51 context = Instance(zmq.Context)
676 52 def _context_default(self):
677 53 return zmq.Context.instance()
678 54
679 55 # The Session to use for communication with the kernel.
680 56 session = Instance(Session)
681 57 def _session_default(self):
682 58 return Session(config=self.config)
683 59
684 60 # The kernel process with which the KernelManager is communicating.
685 61 # generally a Popen instance
686 62 kernel = Any()
687 63
688 64 kernel_cmd = List(Unicode, config=True,
689 65 help="""The Popen Command to launch the kernel.
690 66 Override this if you have a custom
691 67 """
692 68 )
693 69
694 70 def _kernel_cmd_changed(self, name, old, new):
695 71 self.ipython_kernel = False
696 72
697 73 ipython_kernel = Bool(True)
698 74
699 # The addresses for the communication channels.
700 connection_file = Unicode('')
701
702 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
703
704 ip = Unicode(LOCALHOST, config=True,
705 help="""Set the kernel\'s IP address [default localhost].
706 If the IP address is something other than localhost, then
707 Consoles on other machines will be able to connect
708 to the Kernel, so be careful!"""
709 )
710
711 def _ip_default(self):
712 if self.transport == 'ipc':
713 if self.connection_file:
714 return os.path.splitext(self.connection_file)[0] + '-ipc'
715 else:
716 return 'kernel-ipc'
717 else:
718 return LOCALHOST
719
720 def _ip_changed(self, name, old, new):
721 if new == '*':
722 self.ip = '0.0.0.0'
723
724 shell_port = Integer(0)
725 iopub_port = Integer(0)
726 stdin_port = Integer(0)
727 hb_port = Integer(0)
728
729 # The classes to use for the various channels.
730 shell_channel_class = Type(ShellChannel)
731 iopub_channel_class = Type(IOPubChannel)
732 stdin_channel_class = Type(StdInChannel)
733 hb_channel_class = Type(HBChannel)
734
735 # Protected traits.
75 # Protected traits
736 76 _launch_args = Any
737 _shell_channel = Any
738 _iopub_channel = Any
739 _stdin_channel = Any
740 _hb_channel = Any
741 _connection_file_written=Bool(False)
742 77
743 78 autorestart = Bool(False, config=True,
744 79 help="""Should we autorestart the kernel if it dies."""
745 80 )
746 81
747 82 def __del__(self):
748 83 self.cleanup_connection_file()
749 84
750 85 #--------------------------------------------------------------------------
751 # Channel management methods:
752 #--------------------------------------------------------------------------
753
754 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
755 """Starts the channels for this kernel.
756
757 This will create the channels if they do not exist and then start
758 them (their activity runs in a thread). If port numbers of 0 are
759 being used (random ports) then you must first call
760 :method:`start_kernel`. If the channels have been stopped and you
761 call this, :class:`RuntimeError` will be raised.
762 """
763 if shell:
764 self.shell_channel.start()
765 if iopub:
766 self.iopub_channel.start()
767 if stdin:
768 self.stdin_channel.start()
769 self.shell_channel.allow_stdin = True
770 else:
771 self.shell_channel.allow_stdin = False
772 if hb:
773 self.hb_channel.start()
774
775 def stop_channels(self):
776 """Stops all the running channels for this kernel.
777
778 This stops their event loops and joins their threads.
779 """
780 if self.shell_channel.is_alive():
781 self.shell_channel.stop()
782 if self.iopub_channel.is_alive():
783 self.iopub_channel.stop()
784 if self.stdin_channel.is_alive():
785 self.stdin_channel.stop()
786 if self.hb_channel.is_alive():
787 self.hb_channel.stop()
788
789 @property
790 def channels_running(self):
791 """Are any of the channels created and running?"""
792 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
793 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
794
795 def _make_url(self, port):
796 """Make a zmq url with a port.
797
798 There are two cases that this handles:
799
800 * tcp: tcp://ip:port
801 * ipc: ipc://ip-port
802 """
803 if self.transport == 'tcp':
804 return "tcp://%s:%i" % (self.ip, port)
805 else:
806 return "%s://%s-%s" % (self.transport, self.ip, port)
807
808 @property
809 def shell_channel(self):
810 """Get the shell channel object for this kernel."""
811 if self._shell_channel is None:
812 self._shell_channel = self.shell_channel_class(
813 self.context, self.session, self._make_url(self.shell_port)
814 )
815 return self._shell_channel
816
817 @property
818 def iopub_channel(self):
819 """Get the iopub channel object for this kernel."""
820 if self._iopub_channel is None:
821 self._iopub_channel = self.iopub_channel_class(
822 self.context, self.session, self._make_url(self.iopub_port)
823 )
824 return self._iopub_channel
825
826 @property
827 def stdin_channel(self):
828 """Get the stdin channel object for this kernel."""
829 if self._stdin_channel is None:
830 self._stdin_channel = self.stdin_channel_class(
831 self.context, self.session, self._make_url(self.stdin_port)
832 )
833 return self._stdin_channel
834
835 @property
836 def hb_channel(self):
837 """Get the hb channel object for this kernel."""
838 if self._hb_channel is None:
839 self._hb_channel = self.hb_channel_class(
840 self.context, self.session, self._make_url(self.hb_port)
841 )
842 return self._hb_channel
843
844 #--------------------------------------------------------------------------
845 # Connection and ipc file management
846 #--------------------------------------------------------------------------
847
848 def cleanup_connection_file(self):
849 """Cleanup connection file *if we wrote it*
850
851 Will not raise if the connection file was already removed somehow.
852 """
853 if self._connection_file_written:
854 # cleanup connection files on full shutdown of kernel we started
855 self._connection_file_written = False
856 try:
857 os.remove(self.connection_file)
858 except (IOError, OSError, AttributeError):
859 pass
860
861 def cleanup_ipc_files(self):
862 """Cleanup ipc files if we wrote them."""
863 if self.transport != 'ipc':
864 return
865 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
866 ipcfile = "%s-%i" % (self.ip, port)
867 try:
868 os.remove(ipcfile)
869 except (IOError, OSError):
870 pass
871
872 def load_connection_file(self):
873 """Load connection info from JSON dict in self.connection_file."""
874 with open(self.connection_file) as f:
875 cfg = json.loads(f.read())
876
877 from pprint import pprint
878 pprint(cfg)
879 self.transport = cfg.get('transport', 'tcp')
880 self.ip = cfg['ip']
881 self.shell_port = cfg['shell_port']
882 self.stdin_port = cfg['stdin_port']
883 self.iopub_port = cfg['iopub_port']
884 self.hb_port = cfg['hb_port']
885 self.session.key = str_to_bytes(cfg['key'])
886
887 def write_connection_file(self):
888 """Write connection info to JSON dict in self.connection_file."""
889 if self._connection_file_written:
890 return
891 self.connection_file,cfg = write_connection_file(self.connection_file,
892 transport=self.transport, ip=self.ip, key=self.session.key,
893 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
894 shell_port=self.shell_port, hb_port=self.hb_port)
895 # write_connection_file also sets default ports:
896 self.shell_port = cfg['shell_port']
897 self.stdin_port = cfg['stdin_port']
898 self.iopub_port = cfg['iopub_port']
899 self.hb_port = cfg['hb_port']
900
901 self._connection_file_written = True
902
903 #--------------------------------------------------------------------------
904 86 # Kernel restarter
905 87 #--------------------------------------------------------------------------
906 88
907 89 def start_restarter(self):
908 90 pass
909 91
910 92 def stop_restarter(self):
911 93 pass
912 94
913 95 #--------------------------------------------------------------------------
914 96 # Kernel management
915 97 #--------------------------------------------------------------------------
916 98
917 99 def format_kernel_cmd(self, **kw):
918 100 """format templated args (e.g. {connection_file})"""
919 101 if self.kernel_cmd:
920 102 cmd = self.kernel_cmd
921 103 else:
922 104 cmd = make_ipkernel_cmd(
923 105 'from IPython.kernel.zmq.kernelapp import main; main()',
924 106 **kw
925 107 )
926 108 ns = dict(connection_file=self.connection_file)
927 109 ns.update(self._launch_args)
928 110 return [ c.format(**ns) for c in cmd ]
929 111
930 112 def _launch_kernel(self, kernel_cmd, **kw):
931 113 """actually launch the kernel
932 114
933 115 override in a subclass to launch kernel subprocesses differently
934 116 """
935 117 return launch_kernel(kernel_cmd, **kw)
936 118
937 119 def start_kernel(self, **kw):
938 120 """Starts a kernel on this host in a separate process.
939 121
940 122 If random ports (port=0) are being used, this method must be called
941 123 before the channels are created.
942 124
943 125 Parameters:
944 126 -----------
945 127 **kw : optional
946 128 keyword arguments that are passed down to build the kernel_cmd
947 129 and launching the kernel (e.g. Popen kwargs).
948 130 """
949 131 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
950 132 raise RuntimeError("Can only launch a kernel on a local interface. "
951 133 "Make sure that the '*_address' attributes are "
952 134 "configured properly. "
953 135 "Currently valid addresses are: %s"%LOCAL_IPS
954 136 )
955 137
956 138 # write connection file / get default ports
957 139 self.write_connection_file()
958 140
959 141 # save kwargs for use in restart
960 142 self._launch_args = kw.copy()
961 143 # build the Popen cmd
962 144 kernel_cmd = self.format_kernel_cmd(**kw)
963 145 # launch the kernel subprocess
964 146 self.kernel = self._launch_kernel(kernel_cmd,
965 147 ipython_kernel=self.ipython_kernel,
966 148 **kw)
967 149 self.start_restarter()
968 150
151 def _send_shutdown_request(self, restart=False):
152 """TODO: send a shutdown request via control channel"""
153 raise NotImplementedError("Soft shutdown needs control channel")
154
969 155 def shutdown_kernel(self, now=False, restart=False):
970 156 """Attempts to the stop the kernel process cleanly.
971 157
972 158 This attempts to shutdown the kernels cleanly by:
973 159
974 160 1. Sending it a shutdown message over the shell channel.
975 161 2. If that fails, the kernel is shutdown forcibly by sending it
976 162 a signal.
977 163
978 164 Parameters:
979 165 -----------
980 166 now : bool
981 167 Should the kernel be forcible killed *now*. This skips the
982 168 first, nice shutdown attempt.
983 169 restart: bool
984 170 Will this kernel be restarted after it is shutdown. When this
985 171 is True, connection files will not be cleaned up.
986 172 """
987 173
988 # Pause the heart beat channel if it exists.
989 if self._hb_channel is not None:
990 self._hb_channel.pause()
991
992 174 # Stop monitoring for restarting while we shutdown.
993 175 self.stop_restarter()
994 176
995 177 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
996 178 if sys.platform == 'win32':
997 179 self._kill_kernel()
998 180 return
999 181
182 # bypass clean shutdown while
183 # FIXME: add control channel for clean shutdown
184 now = True
185
1000 186 if now:
1001 187 if self.has_kernel:
1002 188 self._kill_kernel()
1003 189 else:
1004 190 # Don't send any additional kernel kill messages immediately, to give
1005 191 # the kernel a chance to properly execute shutdown actions. Wait for at
1006 192 # most 1s, checking every 0.1s.
1007 self.shell_channel.shutdown(restart=restart)
193 # FIXME: this method is not yet implemented (need Control channel)
194 self._send_shutdown_request(restart=restart)
1008 195 for i in range(10):
1009 196 if self.is_alive():
1010 197 time.sleep(0.1)
1011 198 else:
1012 199 break
1013 200 else:
1014 201 # OK, we've waited long enough.
1015 202 if self.has_kernel:
1016 203 self._kill_kernel()
1017 204
1018 205 if not restart:
1019 206 self.cleanup_connection_file()
1020 207 self.cleanup_ipc_files()
1021 208 else:
1022 209 self.cleanup_ipc_files()
1023 210
1024 211 def restart_kernel(self, now=False, **kw):
1025 212 """Restarts a kernel with the arguments that were used to launch it.
1026 213
1027 214 If the old kernel was launched with random ports, the same ports will be
1028 215 used for the new kernel. The same connection file is used again.
1029 216
1030 217 Parameters
1031 218 ----------
1032 219 now : bool, optional
1033 220 If True, the kernel is forcefully restarted *immediately*, without
1034 221 having a chance to do any cleanup action. Otherwise the kernel is
1035 222 given 1s to clean up before a forceful restart is issued.
1036 223
1037 224 In all cases the kernel is restarted, the only difference is whether
1038 225 it is given a chance to perform a clean shutdown or not.
1039 226
1040 227 **kw : optional
1041 228 Any options specified here will overwrite those used to launch the
1042 229 kernel.
1043 230 """
1044 231 if self._launch_args is None:
1045 232 raise RuntimeError("Cannot restart the kernel. "
1046 233 "No previous call to 'start_kernel'.")
1047 234 else:
1048 235 # Stop currently running kernel.
1049 236 self.shutdown_kernel(now=now, restart=True)
1050 237
1051 238 # Start new kernel.
1052 239 self._launch_args.update(kw)
1053 240 self.start_kernel(**self._launch_args)
1054 241
1055 242 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1056 243 # unless there is some delay here.
1057 244 if sys.platform == 'win32':
1058 245 time.sleep(0.2)
1059 246
1060 247 @property
1061 248 def has_kernel(self):
1062 249 """Has a kernel been started that we are managing."""
1063 250 return self.kernel is not None
1064 251
1065 252 def _kill_kernel(self):
1066 253 """Kill the running kernel.
1067 254
1068 255 This is a private method, callers should use shutdown_kernel(now=True).
1069 256 """
1070 257 if self.has_kernel:
1071 258
1072 259 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1073 260 # TerminateProcess() on Win32).
1074 261 try:
1075 262 self.kernel.kill()
1076 263 except OSError as e:
1077 264 # In Windows, we will get an Access Denied error if the process
1078 265 # has already terminated. Ignore it.
1079 266 if sys.platform == 'win32':
1080 267 if e.winerror != 5:
1081 268 raise
1082 269 # On Unix, we may get an ESRCH error if the process has already
1083 270 # terminated. Ignore it.
1084 271 else:
1085 272 from errno import ESRCH
1086 273 if e.errno != ESRCH:
1087 274 raise
1088 275
1089 276 # Block until the kernel terminates.
1090 277 self.kernel.wait()
1091 278 self.kernel = None
1092 279 else:
1093 280 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1094 281
1095 282 def interrupt_kernel(self):
1096 283 """Interrupts the kernel by sending it a signal.
1097 284
1098 285 Unlike ``signal_kernel``, this operation is well supported on all
1099 286 platforms.
1100 287 """
1101 288 if self.has_kernel:
1102 289 if sys.platform == 'win32':
1103 290 from .zmq.parentpoller import ParentPollerWindows as Poller
1104 291 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1105 292 else:
1106 293 self.kernel.send_signal(signal.SIGINT)
1107 294 else:
1108 295 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1109 296
1110 297 def signal_kernel(self, signum):
1111 298 """Sends a signal to the kernel.
1112 299
1113 300 Note that since only SIGTERM is supported on Windows, this function is
1114 301 only useful on Unix systems.
1115 302 """
1116 303 if self.has_kernel:
1117 304 self.kernel.send_signal(signum)
1118 305 else:
1119 306 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1120 307
1121 308 def is_alive(self):
1122 309 """Is the kernel process still running?"""
1123 310 if self.has_kernel:
1124 311 if self.kernel.poll() is None:
1125 312 return True
1126 313 else:
1127 314 return False
1128 elif self._hb_channel is not None:
1129 # We didn't start the kernel with this KernelManager so we
1130 # use the heartbeat.
1131 return self._hb_channel.is_beating()
1132 315 else:
1133 # no heartbeat and not local, we can't tell if it's running,
1134 # so naively return True
1135 return True
316 # we don't have a kernel
317 return False
1136 318
1137 319
1138 320 #-----------------------------------------------------------------------------
1139 321 # ABC Registration
1140 322 #-----------------------------------------------------------------------------
1141 323
1142 ShellChannelABC.register(ShellChannel)
1143 IOPubChannelABC.register(IOPubChannel)
1144 HBChannelABC.register(HBChannel)
1145 StdInChannelABC.register(StdInChannel)
1146 324 KernelManagerABC.register(KernelManager)
325
@@ -1,274 +1,271 b''
1 1 """A kernel manager for multiple kernels
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2013 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import absolute_import
20 20
21 21 import os
22 22 import uuid
23 23
24 24 import zmq
25 25 from zmq.eventloop.zmqstream import ZMQStream
26 26
27 27 from IPython.config.configurable import LoggingConfigurable
28 28 from IPython.utils.importstring import import_item
29 29 from IPython.utils.traitlets import (
30 30 Instance, Dict, Unicode, Any, DottedObjectName, Bool
31 31 )
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Classes
35 35 #-----------------------------------------------------------------------------
36 36
37 37 class DuplicateKernelError(Exception):
38 38 pass
39 39
40 40
41 41 class MultiKernelManager(LoggingConfigurable):
42 42 """A class for managing multiple kernels."""
43 43
44 44 kernel_manager_class = DottedObjectName(
45 45 "IPython.kernel.ioloop.IOLoopKernelManager", config=True,
46 46 help="""The kernel manager class. This is configurable to allow
47 47 subclassing of the KernelManager for customized behavior.
48 48 """
49 49 )
50 50 def _kernel_manager_class_changed(self, name, old, new):
51 51 self.kernel_manager_factory = import_item(new)
52 52
53 53 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
54 54 def _kernel_manager_factory_default(self):
55 55 return import_item(self.kernel_manager_class)
56 56
57 57 context = Instance('zmq.Context')
58 58 def _context_default(self):
59 59 return zmq.Context.instance()
60 60
61 61 connection_dir = Unicode('')
62 62
63 63 _kernels = Dict()
64 64
65 65 def list_kernel_ids(self):
66 66 """Return a list of the kernel ids of the active kernels."""
67 67 # Create a copy so we can iterate over kernels in operations
68 68 # that delete keys.
69 69 return list(self._kernels.keys())
70 70
71 71 def __len__(self):
72 72 """Return the number of running kernels."""
73 73 return len(self.list_kernel_ids())
74 74
75 75 def __contains__(self, kernel_id):
76 76 return kernel_id in self._kernels
77 77
78 78 def start_kernel(self, **kwargs):
79 79 """Start a new kernel.
80 80
81 81 The caller can pick a kernel_id by passing one in as a keyword arg,
82 82 otherwise one will be picked using a uuid.
83 83
84 84 To silence the kernel's stdout/stderr, call this using::
85 85
86 86 km.start_kernel(stdout=PIPE, stderr=PIPE)
87 87
88 88 """
89 89 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
90 90 if kernel_id in self:
91 91 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
92 92 # kernel_manager_factory is the constructor for the KernelManager
93 93 # subclass we are using. It can be configured as any Configurable,
94 94 # including things like its transport and ip.
95 95 km = self.kernel_manager_factory(connection_file=os.path.join(
96 96 self.connection_dir, "kernel-%s.json" % kernel_id),
97 97 config=self.config, autorestart=True, log=self.log
98 98 )
99 99 km.start_kernel(**kwargs)
100 # start just the shell channel, needed for graceful restart
101 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
102 100 self._kernels[kernel_id] = km
103 101 return kernel_id
104 102
105 103 def shutdown_kernel(self, kernel_id, now=False):
106 104 """Shutdown a kernel by its kernel uuid.
107 105
108 106 Parameters
109 107 ==========
110 108 kernel_id : uuid
111 109 The id of the kernel to shutdown.
112 110 now : bool
113 111 Should the kernel be shutdown forcibly using a signal.
114 112 """
115 113 k = self.get_kernel(kernel_id)
116 114 k.shutdown_kernel(now=now)
117 k.shell_channel.stop()
118 115 del self._kernels[kernel_id]
119 116
120 117 def shutdown_all(self, now=False):
121 118 """Shutdown all kernels."""
122 119 for kid in self.list_kernel_ids():
123 120 self.shutdown_kernel(kid, now=now)
124 121
125 122 def interrupt_kernel(self, kernel_id):
126 123 """Interrupt (SIGINT) the kernel by its uuid.
127 124
128 125 Parameters
129 126 ==========
130 127 kernel_id : uuid
131 128 The id of the kernel to interrupt.
132 129 """
133 130 return self.get_kernel(kernel_id).interrupt_kernel()
134 131
135 132 def signal_kernel(self, kernel_id, signum):
136 133 """Sends a signal to the kernel by its uuid.
137 134
138 135 Note that since only SIGTERM is supported on Windows, this function
139 136 is only useful on Unix systems.
140 137
141 138 Parameters
142 139 ==========
143 140 kernel_id : uuid
144 141 The id of the kernel to signal.
145 142 """
146 143 return self.get_kernel(kernel_id).signal_kernel(signum)
147 144
148 145 def restart_kernel(self, kernel_id):
149 146 """Restart a kernel by its uuid, keeping the same ports.
150 147
151 148 Parameters
152 149 ==========
153 150 kernel_id : uuid
154 151 The id of the kernel to interrupt.
155 152 """
156 153 km = self.get_kernel(kernel_id)
157 154 km.restart_kernel()
158 155
159 156 def is_alive(self, kernel_id):
160 157 """Is the kernel alive.
161 158
162 159 This calls KernelManager.is_alive() which calls Popen.poll on the
163 160 actual kernel subprocess.
164 161
165 162 Parameters
166 163 ==========
167 164 kernel_id : uuid
168 165 The id of the kernel.
169 166 """
170 167 return self.get_kernel(kernel_id).is_alive()
171 168
172 169 def get_kernel(self, kernel_id):
173 170 """Get the single KernelManager object for a kernel by its uuid.
174 171
175 172 Parameters
176 173 ==========
177 174 kernel_id : uuid
178 175 The id of the kernel.
179 176 """
180 177 km = self._kernels.get(kernel_id)
181 178 if km is not None:
182 179 return km
183 180 else:
184 181 raise KeyError("Kernel with id not found: %s" % kernel_id)
185 182
186 183 def get_connection_info(self, kernel_id):
187 184 """Return a dictionary of connection data for a kernel.
188 185
189 186 Parameters
190 187 ==========
191 188 kernel_id : uuid
192 189 The id of the kernel.
193 190
194 191 Returns
195 192 =======
196 193 connection_dict : dict
197 194 A dict of the information needed to connect to a kernel.
198 195 This includes the ip address and the integer port
199 196 numbers of the different channels (stdin_port, iopub_port,
200 197 shell_port, hb_port).
201 198 """
202 199 km = self.get_kernel(kernel_id)
203 200 return dict(transport=km.transport,
204 201 ip=km.ip,
205 202 shell_port=km.shell_port,
206 203 iopub_port=km.iopub_port,
207 204 stdin_port=km.stdin_port,
208 205 hb_port=km.hb_port,
209 206 )
210 207
211 208 def _make_url(self, transport, ip, port):
212 209 """Make a ZeroMQ URL for a given transport, ip and port."""
213 210 if transport == 'tcp':
214 211 return "tcp://%s:%i" % (ip, port)
215 212 else:
216 213 return "%s://%s-%s" % (transport, ip, port)
217 214
218 215 def _create_connected_stream(self, kernel_id, socket_type, channel):
219 216 """Create a connected ZMQStream for a kernel."""
220 217 cinfo = self.get_connection_info(kernel_id)
221 218 url = self._make_url(cinfo['transport'], cinfo['ip'],
222 219 cinfo['%s_port' % channel]
223 220 )
224 221 sock = self.context.socket(socket_type)
225 222 self.log.info("Connecting to: %s" % url)
226 223 sock.connect(url)
227 224 return ZMQStream(sock)
228 225
229 226 def create_iopub_stream(self, kernel_id):
230 227 """Return a ZMQStream object connected to the iopub channel.
231 228
232 229 Parameters
233 230 ==========
234 231 kernel_id : uuid
235 232 The id of the kernel.
236 233
237 234 Returns
238 235 =======
239 236 stream : ZMQStream
240 237 """
241 238 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub')
242 239 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
243 240 return iopub_stream
244 241
245 242 def create_shell_stream(self, kernel_id):
246 243 """Return a ZMQStream object connected to the shell channel.
247 244
248 245 Parameters
249 246 ==========
250 247 kernel_id : uuid
251 248 The id of the kernel.
252 249
253 250 Returns
254 251 =======
255 252 stream : ZMQStream
256 253 """
257 254 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell')
258 255 return shell_stream
259 256
260 257 def create_hb_stream(self, kernel_id):
261 258 """Return a ZMQStream object connected to the hb channel.
262 259
263 260 Parameters
264 261 ==========
265 262 kernel_id : uuid
266 263 The id of the kernel.
267 264
268 265 Returns
269 266 =======
270 267 stream : ZMQStream
271 268 """
272 269 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb')
273 270 return hb_stream
274 271
General Comments 0
You need to be logged in to leave comments. Login now