##// END OF EJS Templates
split KernelManager into KernelManager + KernelClient
MinRK -
Show More
@@ -0,0 +1,33 b''
1 """Implements a fully blocking kernel client.
2
3 Useful for test suites and blocking terminal interfaces.
4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
15
16 from IPython.utils.traitlets import Type
17 from IPython.kernel.client import KernelClient
18 from .channels import (
19 BlockingIOPubChannel, BlockingHBChannel,
20 BlockingShellChannel, BlockingStdInChannel
21 )
22
23 #-----------------------------------------------------------------------------
24 # Blocking kernel manager
25 #-----------------------------------------------------------------------------
26
27 class BlockingKernelClient(KernelClient):
28
29 # The classes to use for the various channels
30 shell_channel_class = Type(BlockingShellChannel)
31 iopub_channel_class = Type(BlockingIOPubChannel)
32 stdin_channel_class = Type(BlockingStdInChannel)
33 hb_channel_class = Type(BlockingHBChannel)
@@ -0,0 +1,193 b''
1 """Abstract base classes for kernel client channels"""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Standard library imports
15 import abc
16
17 #-----------------------------------------------------------------------------
18 # Channels
19 #-----------------------------------------------------------------------------
20
21
22 class ChannelABC(object):
23 """A base class for all channel ABCs."""
24
25 __metaclass__ = abc.ABCMeta
26
27 @abc.abstractmethod
28 def start(self):
29 pass
30
31 @abc.abstractmethod
32 def stop(self):
33 pass
34
35 @abc.abstractmethod
36 def is_alive(self):
37 pass
38
39
40 class ShellChannelABC(ChannelABC):
41 """ShellChannel ABC.
42
43 The docstrings for this class can be found in the base implementation:
44
45 `IPython.kernel.channels.ShellChannel`
46 """
47
48 @abc.abstractproperty
49 def allow_stdin(self):
50 pass
51
52 @abc.abstractmethod
53 def execute(self, code, silent=False, store_history=True,
54 user_variables=None, user_expressions=None, allow_stdin=None):
55 pass
56
57 @abc.abstractmethod
58 def complete(self, text, line, cursor_pos, block=None):
59 pass
60
61 @abc.abstractmethod
62 def object_info(self, oname, detail_level=0):
63 pass
64
65 @abc.abstractmethod
66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 pass
68
69 @abc.abstractmethod
70 def kernel_info(self):
71 pass
72
73 @abc.abstractmethod
74 def shutdown(self, restart=False):
75 pass
76
77
78 class IOPubChannelABC(ChannelABC):
79 """IOPubChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.channels.IOPubChannel`
84 """
85
86 @abc.abstractmethod
87 def flush(self, timeout=1.0):
88 pass
89
90
91 class StdInChannelABC(ChannelABC):
92 """StdInChannel ABC.
93
94 The docstrings for this class can be found in the base implementation:
95
96 `IPython.kernel.channels.StdInChannel`
97 """
98
99 @abc.abstractmethod
100 def input(self, string):
101 pass
102
103
104 class HBChannelABC(ChannelABC):
105 """HBChannel ABC.
106
107 The docstrings for this class can be found in the base implementation:
108
109 `IPython.kernel.channels.HBChannel`
110 """
111
112 @abc.abstractproperty
113 def time_to_dead(self):
114 pass
115
116 @abc.abstractmethod
117 def pause(self):
118 pass
119
120 @abc.abstractmethod
121 def unpause(self):
122 pass
123
124 @abc.abstractmethod
125 def is_beating(self):
126 pass
127
128
129 #-----------------------------------------------------------------------------
130 # Main kernel manager class
131 #-----------------------------------------------------------------------------
132
133 class KernelClientABC(object):
134 """KernelManager ABC.
135
136 The docstrings for this class can be found in the base implementation:
137
138 `IPython.kernel.channels.KernelClient`
139 """
140
141 __metaclass__ = abc.ABCMeta
142
143 @abc.abstractproperty
144 def kernel(self):
145 pass
146
147 @abc.abstractproperty
148 def shell_channel_class(self):
149 pass
150
151 @abc.abstractproperty
152 def iopub_channel_class(self):
153 pass
154
155 @abc.abstractproperty
156 def hb_channel_class(self):
157 pass
158
159 @abc.abstractproperty
160 def stdin_channel_class(self):
161 pass
162
163 #--------------------------------------------------------------------------
164 # Channel management methods
165 #--------------------------------------------------------------------------
166
167 @abc.abstractmethod
168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 pass
170
171 @abc.abstractmethod
172 def stop_channels(self):
173 pass
174
175 @abc.abstractproperty
176 def channels_running(self):
177 pass
178
179 @abc.abstractproperty
180 def shell_channel(self):
181 pass
182
183 @abc.abstractproperty
184 def iopub_channel(self):
185 pass
186
187 @abc.abstractproperty
188 def stdin_channel(self):
189 pass
190
191 @abc.abstractproperty
192 def hb_channel(self):
193 pass
This diff has been collapsed as it changes many lines, (638 lines changed) Show them Hide them
@@ -0,0 +1,638 b''
1 """Base classes to manage a Client's interaction with a running kernel
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 from __future__ import absolute_import
16
17 # Standard library imports
18 import atexit
19 import errno
20 from threading import Thread
21 import time
22
23 import zmq
24 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
25 # during garbage collection of threads at exit:
26 from zmq import ZMQError
27 from zmq.eventloop import ioloop, zmqstream
28
29 # Local imports
30 from .channelabc import (
31 ShellChannelABC, IOPubChannelABC,
32 HBChannelABC, StdInChannelABC,
33 )
34
35 #-----------------------------------------------------------------------------
36 # Constants and exceptions
37 #-----------------------------------------------------------------------------
38
39 class InvalidPortNumber(Exception):
40 pass
41
42 #-----------------------------------------------------------------------------
43 # Utility functions
44 #-----------------------------------------------------------------------------
45
46 # some utilities to validate message structure, these might get moved elsewhere
47 # if they prove to have more generic utility
48
49 def validate_string_list(lst):
50 """Validate that the input is a list of strings.
51
52 Raises ValueError if not."""
53 if not isinstance(lst, list):
54 raise ValueError('input %r must be a list' % lst)
55 for x in lst:
56 if not isinstance(x, basestring):
57 raise ValueError('element %r in list must be a string' % x)
58
59
60 def validate_string_dict(dct):
61 """Validate that the input is a dict with string keys and values.
62
63 Raises ValueError if not."""
64 for k,v in dct.iteritems():
65 if not isinstance(k, basestring):
66 raise ValueError('key %r in dict must be a string' % k)
67 if not isinstance(v, basestring):
68 raise ValueError('value %r in dict must be a string' % v)
69
70
71 #-----------------------------------------------------------------------------
72 # ZMQ Socket Channel classes
73 #-----------------------------------------------------------------------------
74
75 class ZMQSocketChannel(Thread):
76 """The base class for the channels that use ZMQ sockets."""
77 context = None
78 session = None
79 socket = None
80 ioloop = None
81 stream = None
82 _address = None
83 _exiting = False
84
85 def __init__(self, context, session, address):
86 """Create a channel.
87
88 Parameters
89 ----------
90 context : :class:`zmq.Context`
91 The ZMQ context to use.
92 session : :class:`session.Session`
93 The session to use.
94 address : zmq url
95 Standard (ip, port) tuple that the kernel is listening on.
96 """
97 super(ZMQSocketChannel, self).__init__()
98 self.daemon = True
99
100 self.context = context
101 self.session = session
102 if isinstance(address, tuple):
103 if address[1] == 0:
104 message = 'The port number for a channel cannot be 0.'
105 raise InvalidPortNumber(message)
106 address = "tcp://%s:%i" % address
107 self._address = address
108 atexit.register(self._notice_exit)
109
110 def _notice_exit(self):
111 self._exiting = True
112
113 def _run_loop(self):
114 """Run my loop, ignoring EINTR events in the poller"""
115 while True:
116 try:
117 self.ioloop.start()
118 except ZMQError as e:
119 if e.errno == errno.EINTR:
120 continue
121 else:
122 raise
123 except Exception:
124 if self._exiting:
125 break
126 else:
127 raise
128 else:
129 break
130
131 def stop(self):
132 """Stop the channel's event loop and join its thread.
133
134 This calls :method:`Thread.join` and returns when the thread
135 terminates. :class:`RuntimeError` will be raised if
136 :method:`self.start` is called again.
137 """
138 self.join()
139
140 @property
141 def address(self):
142 """Get the channel's address as a zmq url string.
143
144 These URLS have the form: 'tcp://127.0.0.1:5555'.
145 """
146 return self._address
147
148 def _queue_send(self, msg):
149 """Queue a message to be sent from the IOLoop's thread.
150
151 Parameters
152 ----------
153 msg : message to send
154
155 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
156 thread control of the action.
157 """
158 def thread_send():
159 self.session.send(self.stream, msg)
160 self.ioloop.add_callback(thread_send)
161
162 def _handle_recv(self, msg):
163 """Callback for stream.on_recv.
164
165 Unpacks message, and calls handlers with it.
166 """
167 ident,smsg = self.session.feed_identities(msg)
168 self.call_handlers(self.session.unserialize(smsg))
169
170
171
172 class ShellChannel(ZMQSocketChannel):
173 """The shell channel for issuing request/replies to the kernel."""
174
175 command_queue = None
176 # flag for whether execute requests should be allowed to call raw_input:
177 allow_stdin = True
178
179 def __init__(self, context, session, address):
180 super(ShellChannel, self).__init__(context, session, address)
181 self.ioloop = ioloop.IOLoop()
182
183 def run(self):
184 """The thread's main activity. Call start() instead."""
185 self.socket = self.context.socket(zmq.DEALER)
186 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
187 self.socket.connect(self.address)
188 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
189 self.stream.on_recv(self._handle_recv)
190 self._run_loop()
191 try:
192 self.socket.close()
193 except:
194 pass
195
196 def stop(self):
197 """Stop the channel's event loop and join its thread."""
198 self.ioloop.stop()
199 super(ShellChannel, self).stop()
200
201 def call_handlers(self, msg):
202 """This method is called in the ioloop thread when a message arrives.
203
204 Subclasses should override this method to handle incoming messages.
205 It is important to remember that this method is called in the thread
206 so that some logic must be done to ensure that the application leve
207 handlers are called in the application thread.
208 """
209 raise NotImplementedError('call_handlers must be defined in a subclass.')
210
211 def execute(self, code, silent=False, store_history=True,
212 user_variables=None, user_expressions=None, allow_stdin=None):
213 """Execute code in the kernel.
214
215 Parameters
216 ----------
217 code : str
218 A string of Python code.
219
220 silent : bool, optional (default False)
221 If set, the kernel will execute the code as quietly possible, and
222 will force store_history to be False.
223
224 store_history : bool, optional (default True)
225 If set, the kernel will store command history. This is forced
226 to be False if silent is True.
227
228 user_variables : list, optional
229 A list of variable names to pull from the user's namespace. They
230 will come back as a dict with these names as keys and their
231 :func:`repr` as values.
232
233 user_expressions : dict, optional
234 A dict mapping names to expressions to be evaluated in the user's
235 dict. The expression values are returned as strings formatted using
236 :func:`repr`.
237
238 allow_stdin : bool, optional (default self.allow_stdin)
239 Flag for whether the kernel can send stdin requests to frontends.
240
241 Some frontends (e.g. the Notebook) do not support stdin requests.
242 If raw_input is called from code executed from such a frontend, a
243 StdinNotImplementedError will be raised.
244
245 Returns
246 -------
247 The msg_id of the message sent.
248 """
249 if user_variables is None:
250 user_variables = []
251 if user_expressions is None:
252 user_expressions = {}
253 if allow_stdin is None:
254 allow_stdin = self.allow_stdin
255
256
257 # Don't waste network traffic if inputs are invalid
258 if not isinstance(code, basestring):
259 raise ValueError('code %r must be a string' % code)
260 validate_string_list(user_variables)
261 validate_string_dict(user_expressions)
262
263 # Create class for content/msg creation. Related to, but possibly
264 # not in Session.
265 content = dict(code=code, silent=silent, store_history=store_history,
266 user_variables=user_variables,
267 user_expressions=user_expressions,
268 allow_stdin=allow_stdin,
269 )
270 msg = self.session.msg('execute_request', content)
271 self._queue_send(msg)
272 return msg['header']['msg_id']
273
274 def complete(self, text, line, cursor_pos, block=None):
275 """Tab complete text in the kernel's namespace.
276
277 Parameters
278 ----------
279 text : str
280 The text to complete.
281 line : str
282 The full line of text that is the surrounding context for the
283 text to complete.
284 cursor_pos : int
285 The position of the cursor in the line where the completion was
286 requested.
287 block : str, optional
288 The full block of code in which the completion is being requested.
289
290 Returns
291 -------
292 The msg_id of the message sent.
293 """
294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
295 msg = self.session.msg('complete_request', content)
296 self._queue_send(msg)
297 return msg['header']['msg_id']
298
299 def object_info(self, oname, detail_level=0):
300 """Get metadata information about an object in the kernel's namespace.
301
302 Parameters
303 ----------
304 oname : str
305 A string specifying the object name.
306 detail_level : int, optional
307 The level of detail for the introspection (0-2)
308
309 Returns
310 -------
311 The msg_id of the message sent.
312 """
313 content = dict(oname=oname, detail_level=detail_level)
314 msg = self.session.msg('object_info_request', content)
315 self._queue_send(msg)
316 return msg['header']['msg_id']
317
318 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
319 """Get entries from the kernel's history list.
320
321 Parameters
322 ----------
323 raw : bool
324 If True, return the raw input.
325 output : bool
326 If True, then return the output as well.
327 hist_access_type : str
328 'range' (fill in session, start and stop params), 'tail' (fill in n)
329 or 'search' (fill in pattern param).
330
331 session : int
332 For a range request, the session from which to get lines. Session
333 numbers are positive integers; negative ones count back from the
334 current session.
335 start : int
336 The first line number of a history range.
337 stop : int
338 The final (excluded) line number of a history range.
339
340 n : int
341 The number of lines of history to get for a tail request.
342
343 pattern : str
344 The glob-syntax pattern for a search request.
345
346 Returns
347 -------
348 The msg_id of the message sent.
349 """
350 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
351 **kwargs)
352 msg = self.session.msg('history_request', content)
353 self._queue_send(msg)
354 return msg['header']['msg_id']
355
356 def kernel_info(self):
357 """Request kernel info."""
358 msg = self.session.msg('kernel_info_request')
359 self._queue_send(msg)
360 return msg['header']['msg_id']
361
362 def shutdown(self, restart=False):
363 """Request an immediate kernel shutdown.
364
365 Upon receipt of the (empty) reply, client code can safely assume that
366 the kernel has shut down and it's safe to forcefully terminate it if
367 it's still alive.
368
369 The kernel will send the reply via a function registered with Python's
370 atexit module, ensuring it's truly done as the kernel is done with all
371 normal operation.
372 """
373 # Send quit message to kernel. Once we implement kernel-side setattr,
374 # this should probably be done that way, but for now this will do.
375 msg = self.session.msg('shutdown_request', {'restart':restart})
376 self._queue_send(msg)
377 return msg['header']['msg_id']
378
379
380
381 class IOPubChannel(ZMQSocketChannel):
382 """The iopub channel which listens for messages that the kernel publishes.
383
384 This channel is where all output is published to frontends.
385 """
386
387 def __init__(self, context, session, address):
388 super(IOPubChannel, self).__init__(context, session, address)
389 self.ioloop = ioloop.IOLoop()
390
391 def run(self):
392 """The thread's main activity. Call start() instead."""
393 self.socket = self.context.socket(zmq.SUB)
394 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
395 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
396 self.socket.connect(self.address)
397 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
398 self.stream.on_recv(self._handle_recv)
399 self._run_loop()
400 try:
401 self.socket.close()
402 except:
403 pass
404
405 def stop(self):
406 """Stop the channel's event loop and join its thread."""
407 self.ioloop.stop()
408 super(IOPubChannel, self).stop()
409
410 def call_handlers(self, msg):
411 """This method is called in the ioloop thread when a message arrives.
412
413 Subclasses should override this method to handle incoming messages.
414 It is important to remember that this method is called in the thread
415 so that some logic must be done to ensure that the application leve
416 handlers are called in the application thread.
417 """
418 raise NotImplementedError('call_handlers must be defined in a subclass.')
419
420 def flush(self, timeout=1.0):
421 """Immediately processes all pending messages on the iopub channel.
422
423 Callers should use this method to ensure that :method:`call_handlers`
424 has been called for all messages that have been received on the
425 0MQ SUB socket of this channel.
426
427 This method is thread safe.
428
429 Parameters
430 ----------
431 timeout : float, optional
432 The maximum amount of time to spend flushing, in seconds. The
433 default is one second.
434 """
435 # We do the IOLoop callback process twice to ensure that the IOLoop
436 # gets to perform at least one full poll.
437 stop_time = time.time() + timeout
438 for i in xrange(2):
439 self._flushed = False
440 self.ioloop.add_callback(self._flush)
441 while not self._flushed and time.time() < stop_time:
442 time.sleep(0.01)
443
444 def _flush(self):
445 """Callback for :method:`self.flush`."""
446 self.stream.flush()
447 self._flushed = True
448
449
450 class StdInChannel(ZMQSocketChannel):
451 """The stdin channel to handle raw_input requests that the kernel makes."""
452
453 msg_queue = None
454
455 def __init__(self, context, session, address):
456 super(StdInChannel, self).__init__(context, session, address)
457 self.ioloop = ioloop.IOLoop()
458
459 def run(self):
460 """The thread's main activity. Call start() instead."""
461 self.socket = self.context.socket(zmq.DEALER)
462 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
463 self.socket.connect(self.address)
464 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
465 self.stream.on_recv(self._handle_recv)
466 self._run_loop()
467 try:
468 self.socket.close()
469 except:
470 pass
471
472 def stop(self):
473 """Stop the channel's event loop and join its thread."""
474 self.ioloop.stop()
475 super(StdInChannel, self).stop()
476
477 def call_handlers(self, msg):
478 """This method is called in the ioloop thread when a message arrives.
479
480 Subclasses should override this method to handle incoming messages.
481 It is important to remember that this method is called in the thread
482 so that some logic must be done to ensure that the application leve
483 handlers are called in the application thread.
484 """
485 raise NotImplementedError('call_handlers must be defined in a subclass.')
486
487 def input(self, string):
488 """Send a string of raw input to the kernel."""
489 content = dict(value=string)
490 msg = self.session.msg('input_reply', content)
491 self._queue_send(msg)
492
493
494 class HBChannel(ZMQSocketChannel):
495 """The heartbeat channel which monitors the kernel heartbeat.
496
497 Note that the heartbeat channel is paused by default. As long as you start
498 this channel, the kernel manager will ensure that it is paused and un-paused
499 as appropriate.
500 """
501
502 time_to_dead = 3.0
503 socket = None
504 poller = None
505 _running = None
506 _pause = None
507 _beating = None
508
509 def __init__(self, context, session, address):
510 super(HBChannel, self).__init__(context, session, address)
511 self._running = False
512 self._pause =True
513 self.poller = zmq.Poller()
514
515 def _create_socket(self):
516 if self.socket is not None:
517 # close previous socket, before opening a new one
518 self.poller.unregister(self.socket)
519 self.socket.close()
520 self.socket = self.context.socket(zmq.REQ)
521 self.socket.setsockopt(zmq.LINGER, 0)
522 self.socket.connect(self.address)
523
524 self.poller.register(self.socket, zmq.POLLIN)
525
526 def _poll(self, start_time):
527 """poll for heartbeat replies until we reach self.time_to_dead.
528
529 Ignores interrupts, and returns the result of poll(), which
530 will be an empty list if no messages arrived before the timeout,
531 or the event tuple if there is a message to receive.
532 """
533
534 until_dead = self.time_to_dead - (time.time() - start_time)
535 # ensure poll at least once
536 until_dead = max(until_dead, 1e-3)
537 events = []
538 while True:
539 try:
540 events = self.poller.poll(1000 * until_dead)
541 except ZMQError as e:
542 if e.errno == errno.EINTR:
543 # ignore interrupts during heartbeat
544 # this may never actually happen
545 until_dead = self.time_to_dead - (time.time() - start_time)
546 until_dead = max(until_dead, 1e-3)
547 pass
548 else:
549 raise
550 except Exception:
551 if self._exiting:
552 break
553 else:
554 raise
555 else:
556 break
557 return events
558
559 def run(self):
560 """The thread's main activity. Call start() instead."""
561 self._create_socket()
562 self._running = True
563 self._beating = True
564
565 while self._running:
566 if self._pause:
567 # just sleep, and skip the rest of the loop
568 time.sleep(self.time_to_dead)
569 continue
570
571 since_last_heartbeat = 0.0
572 # io.rprint('Ping from HB channel') # dbg
573 # no need to catch EFSM here, because the previous event was
574 # either a recv or connect, which cannot be followed by EFSM
575 self.socket.send(b'ping')
576 request_time = time.time()
577 ready = self._poll(request_time)
578 if ready:
579 self._beating = True
580 # the poll above guarantees we have something to recv
581 self.socket.recv()
582 # sleep the remainder of the cycle
583 remainder = self.time_to_dead - (time.time() - request_time)
584 if remainder > 0:
585 time.sleep(remainder)
586 continue
587 else:
588 # nothing was received within the time limit, signal heart failure
589 self._beating = False
590 since_last_heartbeat = time.time() - request_time
591 self.call_handlers(since_last_heartbeat)
592 # and close/reopen the socket, because the REQ/REP cycle has been broken
593 self._create_socket()
594 continue
595 try:
596 self.socket.close()
597 except:
598 pass
599
600 def pause(self):
601 """Pause the heartbeat."""
602 self._pause = True
603
604 def unpause(self):
605 """Unpause the heartbeat."""
606 self._pause = False
607
608 def is_beating(self):
609 """Is the heartbeat running and responsive (and not paused)."""
610 if self.is_alive() and not self._pause and self._beating:
611 return True
612 else:
613 return False
614
615 def stop(self):
616 """Stop the channel's event loop and join its thread."""
617 self._running = False
618 super(HBChannel, self).stop()
619
620 def call_handlers(self, since_last_heartbeat):
621 """This method is called in the ioloop thread when a message arrives.
622
623 Subclasses should override this method to handle incoming messages.
624 It is important to remember that this method is called in the thread
625 so that some logic must be done to ensure that the application level
626 handlers are called in the application thread.
627 """
628 raise NotImplementedError('call_handlers must be defined in a subclass.')
629
630
631 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
632 # ABC Registration
633 #-----------------------------------------------------------------------------
634
635 ShellChannelABC.register(ShellChannel)
636 IOPubChannelABC.register(IOPubChannel)
637 HBChannelABC.register(HBChannel)
638 StdInChannelABC.register(StdInChannel)
@@ -0,0 +1,182 b''
1 """Base class to manage the interaction with a running kernel
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 from __future__ import absolute_import
16
17 import zmq
18
19 # Local imports
20 from IPython.config.configurable import LoggingConfigurable
21 from IPython.utils.traitlets import (
22 Any, Instance, Type,
23 )
24
25 from .zmq.session import Session
26 from .channels import (
27 ShellChannel, IOPubChannel,
28 HBChannel, StdInChannel,
29 )
30 from .clientabc import KernelClientABC
31 from .connect import ConnectionFileMixin
32
33
34 #-----------------------------------------------------------------------------
35 # Main kernel client class
36 #-----------------------------------------------------------------------------
37
38 class KernelClient(LoggingConfigurable, ConnectionFileMixin):
39 """Communicates with a single kernel on any host via zmq channels.
40
41 There are four channels associated with each kernel:
42
43 * shell: for request/reply calls to the kernel.
44 * iopub: for the kernel to publish results to frontends.
45 * hb: for monitoring the kernel's heartbeat.
46 * stdin: for frontends to reply to raw_input calls in the kernel.
47
48 """
49
50 # The PyZMQ Context to use for communication with the kernel.
51 context = Instance(zmq.Context)
52 def _context_default(self):
53 return zmq.Context.instance()
54
55 # The Session to use for communication with the kernel.
56 session = Instance(Session)
57 def _session_default(self):
58 return Session(config=self.config)
59
60 # The classes to use for the various channels
61 shell_channel_class = Type(ShellChannel)
62 iopub_channel_class = Type(IOPubChannel)
63 stdin_channel_class = Type(StdInChannel)
64 hb_channel_class = Type(HBChannel)
65
66 # Protected traits
67 _shell_channel = Any
68 _iopub_channel = Any
69 _stdin_channel = Any
70 _hb_channel = Any
71
72 #--------------------------------------------------------------------------
73 # Channel management methods
74 #--------------------------------------------------------------------------
75
76 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
77 """Starts the channels for this kernel.
78
79 This will create the channels if they do not exist and then start
80 them (their activity runs in a thread). If port numbers of 0 are
81 being used (random ports) then you must first call
82 :method:`start_kernel`. If the channels have been stopped and you
83 call this, :class:`RuntimeError` will be raised.
84 """
85 if shell:
86 self.shell_channel.start()
87 if iopub:
88 self.iopub_channel.start()
89 if stdin:
90 self.stdin_channel.start()
91 self.shell_channel.allow_stdin = True
92 else:
93 self.shell_channel.allow_stdin = False
94 if hb:
95 self.hb_channel.start()
96
97 def stop_channels(self):
98 """Stops all the running channels for this kernel.
99
100 This stops their event loops and joins their threads.
101 """
102 if self.shell_channel.is_alive():
103 self.shell_channel.stop()
104 if self.iopub_channel.is_alive():
105 self.iopub_channel.stop()
106 if self.stdin_channel.is_alive():
107 self.stdin_channel.stop()
108 if self.hb_channel.is_alive():
109 self.hb_channel.stop()
110
111 @property
112 def channels_running(self):
113 """Are any of the channels created and running?"""
114 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
115 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
116
117 def _make_url(self, port):
118 """Make a zmq url with a port.
119
120 There are two cases that this handles:
121
122 * tcp: tcp://ip:port
123 * ipc: ipc://ip-port
124 """
125 if self.transport == 'tcp':
126 return "tcp://%s:%i" % (self.ip, port)
127 else:
128 return "%s://%s-%s" % (self.transport, self.ip, port)
129
130 @property
131 def shell_channel(self):
132 """Get the shell channel object for this kernel."""
133 if self._shell_channel is None:
134 self._shell_channel = self.shell_channel_class(
135 self.context, self.session, self._make_url(self.shell_port)
136 )
137 return self._shell_channel
138
139 @property
140 def iopub_channel(self):
141 """Get the iopub channel object for this kernel."""
142 if self._iopub_channel is None:
143 self._iopub_channel = self.iopub_channel_class(
144 self.context, self.session, self._make_url(self.iopub_port)
145 )
146 return self._iopub_channel
147
148 @property
149 def stdin_channel(self):
150 """Get the stdin channel object for this kernel."""
151 if self._stdin_channel is None:
152 self._stdin_channel = self.stdin_channel_class(
153 self.context, self.session, self._make_url(self.stdin_port)
154 )
155 return self._stdin_channel
156
157 @property
158 def hb_channel(self):
159 """Get the hb channel object for this kernel."""
160 if self._hb_channel is None:
161 self._hb_channel = self.hb_channel_class(
162 self.context, self.session, self._make_url(self.hb_port)
163 )
164 return self._hb_channel
165
166 def is_alive(self):
167 """Is the kernel process still running?"""
168 if self._hb_channel is not None:
169 # We didn't start the kernel with this KernelManager so we
170 # use the heartbeat.
171 return self._hb_channel.is_beating()
172 else:
173 # no heartbeat and not local, we can't tell if it's running,
174 # so naively return True
175 return True
176
177
178 #-----------------------------------------------------------------------------
179 # ABC Registration
180 #-----------------------------------------------------------------------------
181
182 KernelClientABC.register(KernelClient)
@@ -0,0 +1,193 b''
1 """Abstract base classes for kernel clients and channels"""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14 # Standard library imports
15 import abc
16
17 #-----------------------------------------------------------------------------
18 # Channels
19 #-----------------------------------------------------------------------------
20
21
22 class ChannelABC(object):
23 """A base class for all channel ABCs."""
24
25 __metaclass__ = abc.ABCMeta
26
27 @abc.abstractmethod
28 def start(self):
29 pass
30
31 @abc.abstractmethod
32 def stop(self):
33 pass
34
35 @abc.abstractmethod
36 def is_alive(self):
37 pass
38
39
40 class ShellChannelABC(ChannelABC):
41 """ShellChannel ABC.
42
43 The docstrings for this class can be found in the base implementation:
44
45 `IPython.kernel.kernelmanager.ShellChannel`
46 """
47
48 @abc.abstractproperty
49 def allow_stdin(self):
50 pass
51
52 @abc.abstractmethod
53 def execute(self, code, silent=False, store_history=True,
54 user_variables=None, user_expressions=None, allow_stdin=None):
55 pass
56
57 @abc.abstractmethod
58 def complete(self, text, line, cursor_pos, block=None):
59 pass
60
61 @abc.abstractmethod
62 def object_info(self, oname, detail_level=0):
63 pass
64
65 @abc.abstractmethod
66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 pass
68
69 @abc.abstractmethod
70 def kernel_info(self):
71 pass
72
73 @abc.abstractmethod
74 def shutdown(self, restart=False):
75 pass
76
77
78 class IOPubChannelABC(ChannelABC):
79 """IOPubChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.kernelmanager.IOPubChannel`
84 """
85
86 @abc.abstractmethod
87 def flush(self, timeout=1.0):
88 pass
89
90
91 class StdInChannelABC(ChannelABC):
92 """StdInChannel ABC.
93
94 The docstrings for this class can be found in the base implementation:
95
96 `IPython.kernel.kernelmanager.StdInChannel`
97 """
98
99 @abc.abstractmethod
100 def input(self, string):
101 pass
102
103
104 class HBChannelABC(ChannelABC):
105 """HBChannel ABC.
106
107 The docstrings for this class can be found in the base implementation:
108
109 `IPython.kernel.kernelmanager.HBChannel`
110 """
111
112 @abc.abstractproperty
113 def time_to_dead(self):
114 pass
115
116 @abc.abstractmethod
117 def pause(self):
118 pass
119
120 @abc.abstractmethod
121 def unpause(self):
122 pass
123
124 @abc.abstractmethod
125 def is_beating(self):
126 pass
127
128
129 #-----------------------------------------------------------------------------
130 # Main kernel manager class
131 #-----------------------------------------------------------------------------
132
133 class KernelClientABC(object):
134 """KernelManager ABC.
135
136 The docstrings for this class can be found in the base implementation:
137
138 `IPython.kernel.kernelmanager.KernelClient`
139 """
140
141 __metaclass__ = abc.ABCMeta
142
143 @abc.abstractproperty
144 def kernel(self):
145 pass
146
147 @abc.abstractproperty
148 def shell_channel_class(self):
149 pass
150
151 @abc.abstractproperty
152 def iopub_channel_class(self):
153 pass
154
155 @abc.abstractproperty
156 def hb_channel_class(self):
157 pass
158
159 @abc.abstractproperty
160 def stdin_channel_class(self):
161 pass
162
163 #--------------------------------------------------------------------------
164 # Channel management methods
165 #--------------------------------------------------------------------------
166
167 @abc.abstractmethod
168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 pass
170
171 @abc.abstractmethod
172 def stop_channels(self):
173 pass
174
175 @abc.abstractproperty
176 def channels_running(self):
177 pass
178
179 @abc.abstractproperty
180 def shell_channel(self):
181 pass
182
183 @abc.abstractproperty
184 def iopub_channel(self):
185 pass
186
187 @abc.abstractproperty
188 def stdin_channel(self):
189 pass
190
191 @abc.abstractproperty
192 def hb_channel(self):
193 pass
@@ -6,5 +6,5 b' from . import zmq'
6 6 from .connect import *
7 7 from .launcher import *
8 8 from .manager import KernelManager
9 from .blocking import BlockingKernelManager
9 from .blocking import BlockingKernelClient
10 10 from .multikernelmanager import MultiKernelManager
@@ -1,1 +1,1 b''
1 from .manager import BlockingKernelManager No newline at end of file
1 from .client import BlockingKernelClient No newline at end of file
@@ -1,9 +1,9 b''
1 """ Implements a fully blocking kernel manager.
1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2012 The IPython Development Team
6 # Copyright (C) 2013 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
@@ -15,8 +15,7 b' Useful for test suites and blocking terminal interfaces.'
15 15
16 16 import Queue
17 17
18 from IPython.utils.traitlets import Type
19 from IPython.kernel.manager import KernelManager, IOPubChannel, HBChannel, \
18 from IPython.kernel.channels import IOPubChannel, HBChannel, \
20 19 ShellChannel, StdInChannel
21 20
22 21 #-----------------------------------------------------------------------------
@@ -78,12 +77,3 b' class BlockingHBChannel(HBChannel):'
78 77 def call_handlers(self, since_last_heartbeat):
79 78 """ Pause beating on missed heartbeat. """
80 79 pass
81
82
83 class BlockingKernelManager(KernelManager):
84
85 # The classes to use for the various channels.
86 shell_channel_class = Type(BlockingShellChannel)
87 iopub_channel_class = Type(BlockingIOPubChannel)
88 stdin_channel_class = Type(BlockingStdInChannel)
89 hb_channel_class = Type(BlockingHBChannel)
@@ -30,10 +30,15 b' import tempfile'
30 30 from IPython.external.ssh import tunnel
31 31
32 32 # IPython imports
33 # from IPython.config import Configurable
33 34 from IPython.core.profiledir import ProfileDir
34 35 from IPython.utils.localinterfaces import LOCALHOST
35 36 from IPython.utils.path import filefind, get_ipython_dir
36 37 from IPython.utils.py3compat import str_to_bytes, bytes_to_str
38 from IPython.utils.traitlets import (
39 Bool, Integer, Unicode, CaselessStrEnum,
40 HasTraits,
41 )
37 42
38 43
39 44 #-----------------------------------------------------------------------------
@@ -337,6 +342,106 b' def tunnel_to_kernel(connection_info, sshserver, sshkey=None):'
337 342
338 343 return tuple(lports)
339 344
345
346 #-----------------------------------------------------------------------------
347 # Mixin for classes that workw ith connection files
348 #-----------------------------------------------------------------------------
349
350 class ConnectionFileMixin(HasTraits):
351 """Mixin for configurable classes that work with connection files"""
352
353 # The addresses for the communication channels
354 connection_file = Unicode('')
355 _connection_file_written = Bool(False)
356
357 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
358
359 ip = Unicode(LOCALHOST, config=True,
360 help="""Set the kernel\'s IP address [default localhost].
361 If the IP address is something other than localhost, then
362 Consoles on other machines will be able to connect
363 to the Kernel, so be careful!"""
364 )
365
366 def _ip_default(self):
367 if self.transport == 'ipc':
368 if self.connection_file:
369 return os.path.splitext(self.connection_file)[0] + '-ipc'
370 else:
371 return 'kernel-ipc'
372 else:
373 return LOCALHOST
374
375 def _ip_changed(self, name, old, new):
376 if new == '*':
377 self.ip = '0.0.0.0'
378
379 # protected traits
380
381 shell_port = Integer(0)
382 iopub_port = Integer(0)
383 stdin_port = Integer(0)
384 hb_port = Integer(0)
385
386 #--------------------------------------------------------------------------
387 # Connection and ipc file management
388 #--------------------------------------------------------------------------
389
390 def cleanup_connection_file(self):
391 """Cleanup connection file *if we wrote it*
392
393 Will not raise if the connection file was already removed somehow.
394 """
395 if self._connection_file_written:
396 # cleanup connection files on full shutdown of kernel we started
397 self._connection_file_written = False
398 try:
399 os.remove(self.connection_file)
400 except (IOError, OSError, AttributeError):
401 pass
402
403 def cleanup_ipc_files(self):
404 """Cleanup ipc files if we wrote them."""
405 if self.transport != 'ipc':
406 return
407 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
408 ipcfile = "%s-%i" % (self.ip, port)
409 try:
410 os.remove(ipcfile)
411 except (IOError, OSError):
412 pass
413
414 def write_connection_file(self):
415 """Write connection info to JSON dict in self.connection_file."""
416 if self._connection_file_written:
417 return
418 self.connection_file,cfg = write_connection_file(self.connection_file,
419 transport=self.transport, ip=self.ip, key=self.session.key,
420 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
421 shell_port=self.shell_port, hb_port=self.hb_port)
422 # write_connection_file also sets default ports:
423 self.shell_port = cfg['shell_port']
424 self.stdin_port = cfg['stdin_port']
425 self.iopub_port = cfg['iopub_port']
426 self.hb_port = cfg['hb_port']
427
428 self._connection_file_written = True
429
430 def load_connection_file(self):
431 """Load connection info from JSON dict in self.connection_file."""
432 with open(self.connection_file) as f:
433 cfg = json.loads(f.read())
434
435 self.transport = cfg.get('transport', 'tcp')
436 self.ip = cfg['ip']
437 self.shell_port = cfg['shell_port']
438 self.stdin_port = cfg['stdin_port']
439 self.iopub_port = cfg['iopub_port']
440 self.hb_port = cfg['hb_port']
441 self.session.key = str_to_bytes(cfg['key'])
442
443
444
340 445 __all__ = [
341 446 'write_connection_file',
342 447 'get_connection_file',
@@ -1,4 +1,4 b''
1 """A kernel manager with ioloop based logic."""
1 """A kernel manager with a tornado IOLoop"""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
@@ -20,14 +20,14 b' from IPython.utils.traitlets import ('
20 20 Instance
21 21 )
22 22
23 from IPython.kernel.blocking.manager import BlockingKernelManager
23 from IPython.kernel.manager import KernelManager
24 24 from .restarter import IOLoopKernelRestarter
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Code
28 28 #-----------------------------------------------------------------------------
29 29
30 class IOLoopKernelManager(BlockingKernelManager):
30 class IOLoopKernelManager(KernelManager):
31 31
32 32 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
33 33 def _loop_default(self):
This diff has been collapsed as it changes many lines, (869 lines changed) Show them Hide them
@@ -1,11 +1,8 b''
1 """Base classes to manage the interaction with a running kernel.
2
3 TODO
4 * Create logger to handle debugging and console messages.
1 """Base class to manage a running kernel
5 2 """
6 3
7 4 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
5 # Copyright (C) 2013 The IPython Development Team
9 6 #
10 7 # Distributed under the terms of the BSD License. The full license is in
11 8 # the file COPYING, distributed as part of this software.
@@ -18,659 +15,38 b' TODO'
18 15 from __future__ import absolute_import
19 16
20 17 # Standard library imports
21 import atexit
22 import errno
23 import json
24 import os
25 18 import signal
26 19 import sys
27 from threading import Thread
28 20 import time
29 21
30 22 import zmq
31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 # during garbage collection of threads at exit:
33 from zmq import ZMQError
34 from zmq.eventloop import ioloop, zmqstream
35 23
36 24 # Local imports
37 from IPython.config.configurable import Configurable
38 from IPython.utils.importstring import import_item
39 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.localinterfaces import LOCAL_IPS
40 27 from IPython.utils.traitlets import (
41 Any, Instance, Type, Unicode, List, Integer, Bool,
42 CaselessStrEnum, DottedObjectName
28 Any, Instance, Unicode, List, Bool,
43 29 )
44 from IPython.utils.py3compat import str_to_bytes
45 30 from IPython.kernel import (
46 write_connection_file,
47 31 make_ipkernel_cmd,
48 32 launch_kernel,
49 33 )
34 from .connect import ConnectionFileMixin
50 35 from .zmq.session import Session
51 36 from .managerabc import (
52 ShellChannelABC, IOPubChannelABC,
53 HBChannelABC, StdInChannelABC,
54 37 KernelManagerABC
55 38 )
56 39
57 40 #-----------------------------------------------------------------------------
58 # Constants and exceptions
59 #-----------------------------------------------------------------------------
60
61 class InvalidPortNumber(Exception):
62 pass
63
64 #-----------------------------------------------------------------------------
65 # Utility functions
66 #-----------------------------------------------------------------------------
67
68 # some utilities to validate message structure, these might get moved elsewhere
69 # if they prove to have more generic utility
70
71 def validate_string_list(lst):
72 """Validate that the input is a list of strings.
73
74 Raises ValueError if not."""
75 if not isinstance(lst, list):
76 raise ValueError('input %r must be a list' % lst)
77 for x in lst:
78 if not isinstance(x, basestring):
79 raise ValueError('element %r in list must be a string' % x)
80
81
82 def validate_string_dict(dct):
83 """Validate that the input is a dict with string keys and values.
84
85 Raises ValueError if not."""
86 for k,v in dct.iteritems():
87 if not isinstance(k, basestring):
88 raise ValueError('key %r in dict must be a string' % k)
89 if not isinstance(v, basestring):
90 raise ValueError('value %r in dict must be a string' % v)
91
92
93 #-----------------------------------------------------------------------------
94 # ZMQ Socket Channel classes
95 #-----------------------------------------------------------------------------
96
97 class ZMQSocketChannel(Thread):
98 """The base class for the channels that use ZMQ sockets."""
99 context = None
100 session = None
101 socket = None
102 ioloop = None
103 stream = None
104 _address = None
105 _exiting = False
106
107 def __init__(self, context, session, address):
108 """Create a channel.
109
110 Parameters
111 ----------
112 context : :class:`zmq.Context`
113 The ZMQ context to use.
114 session : :class:`session.Session`
115 The session to use.
116 address : zmq url
117 Standard (ip, port) tuple that the kernel is listening on.
118 """
119 super(ZMQSocketChannel, self).__init__()
120 self.daemon = True
121
122 self.context = context
123 self.session = session
124 if isinstance(address, tuple):
125 if address[1] == 0:
126 message = 'The port number for a channel cannot be 0.'
127 raise InvalidPortNumber(message)
128 address = "tcp://%s:%i" % address
129 self._address = address
130 atexit.register(self._notice_exit)
131
132 def _notice_exit(self):
133 self._exiting = True
134
135 def _run_loop(self):
136 """Run my loop, ignoring EINTR events in the poller"""
137 while True:
138 try:
139 self.ioloop.start()
140 except ZMQError as e:
141 if e.errno == errno.EINTR:
142 continue
143 else:
144 raise
145 except Exception:
146 if self._exiting:
147 break
148 else:
149 raise
150 else:
151 break
152
153 def stop(self):
154 """Stop the channel's event loop and join its thread.
155
156 This calls :method:`Thread.join` and returns when the thread
157 terminates. :class:`RuntimeError` will be raised if
158 :method:`self.start` is called again.
159 """
160 self.join()
161
162 @property
163 def address(self):
164 """Get the channel's address as a zmq url string.
165
166 These URLS have the form: 'tcp://127.0.0.1:5555'.
167 """
168 return self._address
169
170 def _queue_send(self, msg):
171 """Queue a message to be sent from the IOLoop's thread.
172
173 Parameters
174 ----------
175 msg : message to send
176
177 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
178 thread control of the action.
179 """
180 def thread_send():
181 self.session.send(self.stream, msg)
182 self.ioloop.add_callback(thread_send)
183
184 def _handle_recv(self, msg):
185 """Callback for stream.on_recv.
186
187 Unpacks message, and calls handlers with it.
188 """
189 ident,smsg = self.session.feed_identities(msg)
190 self.call_handlers(self.session.unserialize(smsg))
191
192
193
194 class ShellChannel(ZMQSocketChannel):
195 """The shell channel for issuing request/replies to the kernel."""
196
197 command_queue = None
198 # flag for whether execute requests should be allowed to call raw_input:
199 allow_stdin = True
200
201 def __init__(self, context, session, address):
202 super(ShellChannel, self).__init__(context, session, address)
203 self.ioloop = ioloop.IOLoop()
204
205 def run(self):
206 """The thread's main activity. Call start() instead."""
207 self.socket = self.context.socket(zmq.DEALER)
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.connect(self.address)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream.on_recv(self._handle_recv)
212 self._run_loop()
213 try:
214 self.socket.close()
215 except:
216 pass
217
218 def stop(self):
219 """Stop the channel's event loop and join its thread."""
220 self.ioloop.stop()
221 super(ShellChannel, self).stop()
222
223 def call_handlers(self, msg):
224 """This method is called in the ioloop thread when a message arrives.
225
226 Subclasses should override this method to handle incoming messages.
227 It is important to remember that this method is called in the thread
228 so that some logic must be done to ensure that the application leve
229 handlers are called in the application thread.
230 """
231 raise NotImplementedError('call_handlers must be defined in a subclass.')
232
233 def execute(self, code, silent=False, store_history=True,
234 user_variables=None, user_expressions=None, allow_stdin=None):
235 """Execute code in the kernel.
236
237 Parameters
238 ----------
239 code : str
240 A string of Python code.
241
242 silent : bool, optional (default False)
243 If set, the kernel will execute the code as quietly possible, and
244 will force store_history to be False.
245
246 store_history : bool, optional (default True)
247 If set, the kernel will store command history. This is forced
248 to be False if silent is True.
249
250 user_variables : list, optional
251 A list of variable names to pull from the user's namespace. They
252 will come back as a dict with these names as keys and their
253 :func:`repr` as values.
254
255 user_expressions : dict, optional
256 A dict mapping names to expressions to be evaluated in the user's
257 dict. The expression values are returned as strings formatted using
258 :func:`repr`.
259
260 allow_stdin : bool, optional (default self.allow_stdin)
261 Flag for whether the kernel can send stdin requests to frontends.
262
263 Some frontends (e.g. the Notebook) do not support stdin requests.
264 If raw_input is called from code executed from such a frontend, a
265 StdinNotImplementedError will be raised.
266
267 Returns
268 -------
269 The msg_id of the message sent.
270 """
271 if user_variables is None:
272 user_variables = []
273 if user_expressions is None:
274 user_expressions = {}
275 if allow_stdin is None:
276 allow_stdin = self.allow_stdin
277
278
279 # Don't waste network traffic if inputs are invalid
280 if not isinstance(code, basestring):
281 raise ValueError('code %r must be a string' % code)
282 validate_string_list(user_variables)
283 validate_string_dict(user_expressions)
284
285 # Create class for content/msg creation. Related to, but possibly
286 # not in Session.
287 content = dict(code=code, silent=silent, store_history=store_history,
288 user_variables=user_variables,
289 user_expressions=user_expressions,
290 allow_stdin=allow_stdin,
291 )
292 msg = self.session.msg('execute_request', content)
293 self._queue_send(msg)
294 return msg['header']['msg_id']
295
296 def complete(self, text, line, cursor_pos, block=None):
297 """Tab complete text in the kernel's namespace.
298
299 Parameters
300 ----------
301 text : str
302 The text to complete.
303 line : str
304 The full line of text that is the surrounding context for the
305 text to complete.
306 cursor_pos : int
307 The position of the cursor in the line where the completion was
308 requested.
309 block : str, optional
310 The full block of code in which the completion is being requested.
311
312 Returns
313 -------
314 The msg_id of the message sent.
315 """
316 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
317 msg = self.session.msg('complete_request', content)
318 self._queue_send(msg)
319 return msg['header']['msg_id']
320
321 def object_info(self, oname, detail_level=0):
322 """Get metadata information about an object in the kernel's namespace.
323
324 Parameters
325 ----------
326 oname : str
327 A string specifying the object name.
328 detail_level : int, optional
329 The level of detail for the introspection (0-2)
330
331 Returns
332 -------
333 The msg_id of the message sent.
334 """
335 content = dict(oname=oname, detail_level=detail_level)
336 msg = self.session.msg('object_info_request', content)
337 self._queue_send(msg)
338 return msg['header']['msg_id']
339
340 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
341 """Get entries from the kernel's history list.
342
343 Parameters
344 ----------
345 raw : bool
346 If True, return the raw input.
347 output : bool
348 If True, then return the output as well.
349 hist_access_type : str
350 'range' (fill in session, start and stop params), 'tail' (fill in n)
351 or 'search' (fill in pattern param).
352
353 session : int
354 For a range request, the session from which to get lines. Session
355 numbers are positive integers; negative ones count back from the
356 current session.
357 start : int
358 The first line number of a history range.
359 stop : int
360 The final (excluded) line number of a history range.
361
362 n : int
363 The number of lines of history to get for a tail request.
364
365 pattern : str
366 The glob-syntax pattern for a search request.
367
368 Returns
369 -------
370 The msg_id of the message sent.
371 """
372 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
373 **kwargs)
374 msg = self.session.msg('history_request', content)
375 self._queue_send(msg)
376 return msg['header']['msg_id']
377
378 def kernel_info(self):
379 """Request kernel info."""
380 msg = self.session.msg('kernel_info_request')
381 self._queue_send(msg)
382 return msg['header']['msg_id']
383
384 def shutdown(self, restart=False):
385 """Request an immediate kernel shutdown.
386
387 Upon receipt of the (empty) reply, client code can safely assume that
388 the kernel has shut down and it's safe to forcefully terminate it if
389 it's still alive.
390
391 The kernel will send the reply via a function registered with Python's
392 atexit module, ensuring it's truly done as the kernel is done with all
393 normal operation.
394 """
395 # Send quit message to kernel. Once we implement kernel-side setattr,
396 # this should probably be done that way, but for now this will do.
397 msg = self.session.msg('shutdown_request', {'restart':restart})
398 self._queue_send(msg)
399 return msg['header']['msg_id']
400
401
402
403 class IOPubChannel(ZMQSocketChannel):
404 """The iopub channel which listens for messages that the kernel publishes.
405
406 This channel is where all output is published to frontends.
407 """
408
409 def __init__(self, context, session, address):
410 super(IOPubChannel, self).__init__(context, session, address)
411 self.ioloop = ioloop.IOLoop()
412
413 def run(self):
414 """The thread's main activity. Call start() instead."""
415 self.socket = self.context.socket(zmq.SUB)
416 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
417 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
418 self.socket.connect(self.address)
419 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
420 self.stream.on_recv(self._handle_recv)
421 self._run_loop()
422 try:
423 self.socket.close()
424 except:
425 pass
426
427 def stop(self):
428 """Stop the channel's event loop and join its thread."""
429 self.ioloop.stop()
430 super(IOPubChannel, self).stop()
431
432 def call_handlers(self, msg):
433 """This method is called in the ioloop thread when a message arrives.
434
435 Subclasses should override this method to handle incoming messages.
436 It is important to remember that this method is called in the thread
437 so that some logic must be done to ensure that the application leve
438 handlers are called in the application thread.
439 """
440 raise NotImplementedError('call_handlers must be defined in a subclass.')
441
442 def flush(self, timeout=1.0):
443 """Immediately processes all pending messages on the iopub channel.
444
445 Callers should use this method to ensure that :method:`call_handlers`
446 has been called for all messages that have been received on the
447 0MQ SUB socket of this channel.
448
449 This method is thread safe.
450
451 Parameters
452 ----------
453 timeout : float, optional
454 The maximum amount of time to spend flushing, in seconds. The
455 default is one second.
456 """
457 # We do the IOLoop callback process twice to ensure that the IOLoop
458 # gets to perform at least one full poll.
459 stop_time = time.time() + timeout
460 for i in xrange(2):
461 self._flushed = False
462 self.ioloop.add_callback(self._flush)
463 while not self._flushed and time.time() < stop_time:
464 time.sleep(0.01)
465
466 def _flush(self):
467 """Callback for :method:`self.flush`."""
468 self.stream.flush()
469 self._flushed = True
470
471
472 class StdInChannel(ZMQSocketChannel):
473 """The stdin channel to handle raw_input requests that the kernel makes."""
474
475 msg_queue = None
476
477 def __init__(self, context, session, address):
478 super(StdInChannel, self).__init__(context, session, address)
479 self.ioloop = ioloop.IOLoop()
480
481 def run(self):
482 """The thread's main activity. Call start() instead."""
483 self.socket = self.context.socket(zmq.DEALER)
484 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
485 self.socket.connect(self.address)
486 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
487 self.stream.on_recv(self._handle_recv)
488 self._run_loop()
489 try:
490 self.socket.close()
491 except:
492 pass
493
494 def stop(self):
495 """Stop the channel's event loop and join its thread."""
496 self.ioloop.stop()
497 super(StdInChannel, self).stop()
498
499 def call_handlers(self, msg):
500 """This method is called in the ioloop thread when a message arrives.
501
502 Subclasses should override this method to handle incoming messages.
503 It is important to remember that this method is called in the thread
504 so that some logic must be done to ensure that the application leve
505 handlers are called in the application thread.
506 """
507 raise NotImplementedError('call_handlers must be defined in a subclass.')
508
509 def input(self, string):
510 """Send a string of raw input to the kernel."""
511 content = dict(value=string)
512 msg = self.session.msg('input_reply', content)
513 self._queue_send(msg)
514
515
516 class HBChannel(ZMQSocketChannel):
517 """The heartbeat channel which monitors the kernel heartbeat.
518
519 Note that the heartbeat channel is paused by default. As long as you start
520 this channel, the kernel manager will ensure that it is paused and un-paused
521 as appropriate.
522 """
523
524 time_to_dead = 3.0
525 socket = None
526 poller = None
527 _running = None
528 _pause = None
529 _beating = None
530
531 def __init__(self, context, session, address):
532 super(HBChannel, self).__init__(context, session, address)
533 self._running = False
534 self._pause =True
535 self.poller = zmq.Poller()
536
537 def _create_socket(self):
538 if self.socket is not None:
539 # close previous socket, before opening a new one
540 self.poller.unregister(self.socket)
541 self.socket.close()
542 self.socket = self.context.socket(zmq.REQ)
543 self.socket.setsockopt(zmq.LINGER, 0)
544 self.socket.connect(self.address)
545
546 self.poller.register(self.socket, zmq.POLLIN)
547
548 def _poll(self, start_time):
549 """poll for heartbeat replies until we reach self.time_to_dead.
550
551 Ignores interrupts, and returns the result of poll(), which
552 will be an empty list if no messages arrived before the timeout,
553 or the event tuple if there is a message to receive.
554 """
555
556 until_dead = self.time_to_dead - (time.time() - start_time)
557 # ensure poll at least once
558 until_dead = max(until_dead, 1e-3)
559 events = []
560 while True:
561 try:
562 events = self.poller.poll(1000 * until_dead)
563 except ZMQError as e:
564 if e.errno == errno.EINTR:
565 # ignore interrupts during heartbeat
566 # this may never actually happen
567 until_dead = self.time_to_dead - (time.time() - start_time)
568 until_dead = max(until_dead, 1e-3)
569 pass
570 else:
571 raise
572 except Exception:
573 if self._exiting:
574 break
575 else:
576 raise
577 else:
578 break
579 return events
580
581 def run(self):
582 """The thread's main activity. Call start() instead."""
583 self._create_socket()
584 self._running = True
585 self._beating = True
586
587 while self._running:
588 if self._pause:
589 # just sleep, and skip the rest of the loop
590 time.sleep(self.time_to_dead)
591 continue
592
593 since_last_heartbeat = 0.0
594 # io.rprint('Ping from HB channel') # dbg
595 # no need to catch EFSM here, because the previous event was
596 # either a recv or connect, which cannot be followed by EFSM
597 self.socket.send(b'ping')
598 request_time = time.time()
599 ready = self._poll(request_time)
600 if ready:
601 self._beating = True
602 # the poll above guarantees we have something to recv
603 self.socket.recv()
604 # sleep the remainder of the cycle
605 remainder = self.time_to_dead - (time.time() - request_time)
606 if remainder > 0:
607 time.sleep(remainder)
608 continue
609 else:
610 # nothing was received within the time limit, signal heart failure
611 self._beating = False
612 since_last_heartbeat = time.time() - request_time
613 self.call_handlers(since_last_heartbeat)
614 # and close/reopen the socket, because the REQ/REP cycle has been broken
615 self._create_socket()
616 continue
617 try:
618 self.socket.close()
619 except:
620 pass
621
622 def pause(self):
623 """Pause the heartbeat."""
624 self._pause = True
625
626 def unpause(self):
627 """Unpause the heartbeat."""
628 self._pause = False
629
630 def is_beating(self):
631 """Is the heartbeat running and responsive (and not paused)."""
632 if self.is_alive() and not self._pause and self._beating:
633 return True
634 else:
635 return False
636
637 def stop(self):
638 """Stop the channel's event loop and join its thread."""
639 self._running = False
640 super(HBChannel, self).stop()
641
642 def call_handlers(self, since_last_heartbeat):
643 """This method is called in the ioloop thread when a message arrives.
644
645 Subclasses should override this method to handle incoming messages.
646 It is important to remember that this method is called in the thread
647 so that some logic must be done to ensure that the application level
648 handlers are called in the application thread.
649 """
650 raise NotImplementedError('call_handlers must be defined in a subclass.')
651
652
653 #-----------------------------------------------------------------------------
654 41 # Main kernel manager class
655 42 #-----------------------------------------------------------------------------
656 43
657 class KernelManager(Configurable):
658 """Manages a single kernel on this host along with its channels.
659
660 There are four channels associated with each kernel:
661
662 * shell: for request/reply calls to the kernel.
663 * iopub: for the kernel to publish results to frontends.
664 * hb: for monitoring the kernel's heartbeat.
665 * stdin: for frontends to reply to raw_input calls in the kernel.
666
667 The usage of the channels that this class manages is optional. It is
668 entirely possible to connect to the kernels directly using ZeroMQ
669 sockets. These channels are useful primarily for talking to a kernel
670 whose :class:`KernelManager` is in the same process.
44 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
45 """Manages a single kernel in a subprocess on this host.
671 46
672 This version manages kernels started using Popen.
47 This version starts kernels with Popen.
673 48 """
49
674 50 # The PyZMQ Context to use for communication with the kernel.
675 51 context = Instance(zmq.Context)
676 52 def _context_default(self):
@@ -696,49 +72,8 b' class KernelManager(Configurable):'
696 72
697 73 ipython_kernel = Bool(True)
698 74
699 # The addresses for the communication channels.
700 connection_file = Unicode('')
701
702 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
703
704 ip = Unicode(LOCALHOST, config=True,
705 help="""Set the kernel\'s IP address [default localhost].
706 If the IP address is something other than localhost, then
707 Consoles on other machines will be able to connect
708 to the Kernel, so be careful!"""
709 )
710
711 def _ip_default(self):
712 if self.transport == 'ipc':
713 if self.connection_file:
714 return os.path.splitext(self.connection_file)[0] + '-ipc'
715 else:
716 return 'kernel-ipc'
717 else:
718 return LOCALHOST
719
720 def _ip_changed(self, name, old, new):
721 if new == '*':
722 self.ip = '0.0.0.0'
723
724 shell_port = Integer(0)
725 iopub_port = Integer(0)
726 stdin_port = Integer(0)
727 hb_port = Integer(0)
728
729 # The classes to use for the various channels.
730 shell_channel_class = Type(ShellChannel)
731 iopub_channel_class = Type(IOPubChannel)
732 stdin_channel_class = Type(StdInChannel)
733 hb_channel_class = Type(HBChannel)
734
735 # Protected traits.
75 # Protected traits
736 76 _launch_args = Any
737 _shell_channel = Any
738 _iopub_channel = Any
739 _stdin_channel = Any
740 _hb_channel = Any
741 _connection_file_written=Bool(False)
742 77
743 78 autorestart = Bool(False, config=True,
744 79 help="""Should we autorestart the kernel if it dies."""
@@ -748,159 +83,6 b' class KernelManager(Configurable):'
748 83 self.cleanup_connection_file()
749 84
750 85 #--------------------------------------------------------------------------
751 # Channel management methods:
752 #--------------------------------------------------------------------------
753
754 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
755 """Starts the channels for this kernel.
756
757 This will create the channels if they do not exist and then start
758 them (their activity runs in a thread). If port numbers of 0 are
759 being used (random ports) then you must first call
760 :method:`start_kernel`. If the channels have been stopped and you
761 call this, :class:`RuntimeError` will be raised.
762 """
763 if shell:
764 self.shell_channel.start()
765 if iopub:
766 self.iopub_channel.start()
767 if stdin:
768 self.stdin_channel.start()
769 self.shell_channel.allow_stdin = True
770 else:
771 self.shell_channel.allow_stdin = False
772 if hb:
773 self.hb_channel.start()
774
775 def stop_channels(self):
776 """Stops all the running channels for this kernel.
777
778 This stops their event loops and joins their threads.
779 """
780 if self.shell_channel.is_alive():
781 self.shell_channel.stop()
782 if self.iopub_channel.is_alive():
783 self.iopub_channel.stop()
784 if self.stdin_channel.is_alive():
785 self.stdin_channel.stop()
786 if self.hb_channel.is_alive():
787 self.hb_channel.stop()
788
789 @property
790 def channels_running(self):
791 """Are any of the channels created and running?"""
792 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
793 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
794
795 def _make_url(self, port):
796 """Make a zmq url with a port.
797
798 There are two cases that this handles:
799
800 * tcp: tcp://ip:port
801 * ipc: ipc://ip-port
802 """
803 if self.transport == 'tcp':
804 return "tcp://%s:%i" % (self.ip, port)
805 else:
806 return "%s://%s-%s" % (self.transport, self.ip, port)
807
808 @property
809 def shell_channel(self):
810 """Get the shell channel object for this kernel."""
811 if self._shell_channel is None:
812 self._shell_channel = self.shell_channel_class(
813 self.context, self.session, self._make_url(self.shell_port)
814 )
815 return self._shell_channel
816
817 @property
818 def iopub_channel(self):
819 """Get the iopub channel object for this kernel."""
820 if self._iopub_channel is None:
821 self._iopub_channel = self.iopub_channel_class(
822 self.context, self.session, self._make_url(self.iopub_port)
823 )
824 return self._iopub_channel
825
826 @property
827 def stdin_channel(self):
828 """Get the stdin channel object for this kernel."""
829 if self._stdin_channel is None:
830 self._stdin_channel = self.stdin_channel_class(
831 self.context, self.session, self._make_url(self.stdin_port)
832 )
833 return self._stdin_channel
834
835 @property
836 def hb_channel(self):
837 """Get the hb channel object for this kernel."""
838 if self._hb_channel is None:
839 self._hb_channel = self.hb_channel_class(
840 self.context, self.session, self._make_url(self.hb_port)
841 )
842 return self._hb_channel
843
844 #--------------------------------------------------------------------------
845 # Connection and ipc file management
846 #--------------------------------------------------------------------------
847
848 def cleanup_connection_file(self):
849 """Cleanup connection file *if we wrote it*
850
851 Will not raise if the connection file was already removed somehow.
852 """
853 if self._connection_file_written:
854 # cleanup connection files on full shutdown of kernel we started
855 self._connection_file_written = False
856 try:
857 os.remove(self.connection_file)
858 except (IOError, OSError, AttributeError):
859 pass
860
861 def cleanup_ipc_files(self):
862 """Cleanup ipc files if we wrote them."""
863 if self.transport != 'ipc':
864 return
865 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
866 ipcfile = "%s-%i" % (self.ip, port)
867 try:
868 os.remove(ipcfile)
869 except (IOError, OSError):
870 pass
871
872 def load_connection_file(self):
873 """Load connection info from JSON dict in self.connection_file."""
874 with open(self.connection_file) as f:
875 cfg = json.loads(f.read())
876
877 from pprint import pprint
878 pprint(cfg)
879 self.transport = cfg.get('transport', 'tcp')
880 self.ip = cfg['ip']
881 self.shell_port = cfg['shell_port']
882 self.stdin_port = cfg['stdin_port']
883 self.iopub_port = cfg['iopub_port']
884 self.hb_port = cfg['hb_port']
885 self.session.key = str_to_bytes(cfg['key'])
886
887 def write_connection_file(self):
888 """Write connection info to JSON dict in self.connection_file."""
889 if self._connection_file_written:
890 return
891 self.connection_file,cfg = write_connection_file(self.connection_file,
892 transport=self.transport, ip=self.ip, key=self.session.key,
893 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
894 shell_port=self.shell_port, hb_port=self.hb_port)
895 # write_connection_file also sets default ports:
896 self.shell_port = cfg['shell_port']
897 self.stdin_port = cfg['stdin_port']
898 self.iopub_port = cfg['iopub_port']
899 self.hb_port = cfg['hb_port']
900
901 self._connection_file_written = True
902
903 #--------------------------------------------------------------------------
904 86 # Kernel restarter
905 87 #--------------------------------------------------------------------------
906 88
@@ -966,6 +148,10 b' class KernelManager(Configurable):'
966 148 **kw)
967 149 self.start_restarter()
968 150
151 def _send_shutdown_request(self, restart=False):
152 """TODO: send a shutdown request via control channel"""
153 raise NotImplementedError("Soft shutdown needs control channel")
154
969 155 def shutdown_kernel(self, now=False, restart=False):
970 156 """Attempts to the stop the kernel process cleanly.
971 157
@@ -985,10 +171,6 b' class KernelManager(Configurable):'
985 171 is True, connection files will not be cleaned up.
986 172 """
987 173
988 # Pause the heart beat channel if it exists.
989 if self._hb_channel is not None:
990 self._hb_channel.pause()
991
992 174 # Stop monitoring for restarting while we shutdown.
993 175 self.stop_restarter()
994 176
@@ -997,6 +179,10 b' class KernelManager(Configurable):'
997 179 self._kill_kernel()
998 180 return
999 181
182 # bypass clean shutdown while
183 # FIXME: add control channel for clean shutdown
184 now = True
185
1000 186 if now:
1001 187 if self.has_kernel:
1002 188 self._kill_kernel()
@@ -1004,7 +190,8 b' class KernelManager(Configurable):'
1004 190 # Don't send any additional kernel kill messages immediately, to give
1005 191 # the kernel a chance to properly execute shutdown actions. Wait for at
1006 192 # most 1s, checking every 0.1s.
1007 self.shell_channel.shutdown(restart=restart)
193 # FIXME: this method is not yet implemented (need Control channel)
194 self._send_shutdown_request(restart=restart)
1008 195 for i in range(10):
1009 196 if self.is_alive():
1010 197 time.sleep(0.1)
@@ -1125,22 +312,14 b' class KernelManager(Configurable):'
1125 312 return True
1126 313 else:
1127 314 return False
1128 elif self._hb_channel is not None:
1129 # We didn't start the kernel with this KernelManager so we
1130 # use the heartbeat.
1131 return self._hb_channel.is_beating()
1132 315 else:
1133 # no heartbeat and not local, we can't tell if it's running,
1134 # so naively return True
1135 return True
316 # we don't have a kernel
317 return False
1136 318
1137 319
1138 320 #-----------------------------------------------------------------------------
1139 321 # ABC Registration
1140 322 #-----------------------------------------------------------------------------
1141 323
1142 ShellChannelABC.register(ShellChannel)
1143 IOPubChannelABC.register(IOPubChannel)
1144 HBChannelABC.register(HBChannel)
1145 StdInChannelABC.register(StdInChannel)
1146 324 KernelManagerABC.register(KernelManager)
325
@@ -97,8 +97,6 b' class MultiKernelManager(LoggingConfigurable):'
97 97 config=self.config, autorestart=True, log=self.log
98 98 )
99 99 km.start_kernel(**kwargs)
100 # start just the shell channel, needed for graceful restart
101 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
102 100 self._kernels[kernel_id] = km
103 101 return kernel_id
104 102
@@ -114,7 +112,6 b' class MultiKernelManager(LoggingConfigurable):'
114 112 """
115 113 k = self.get_kernel(kernel_id)
116 114 k.shutdown_kernel(now=now)
117 k.shell_channel.stop()
118 115 del self._kernels[kernel_id]
119 116
120 117 def shutdown_all(self, now=False):
General Comments 0
You need to be logged in to leave comments. Login now