diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 6ab6e08..3b6a931 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -90,8 +90,7 @@ def validate_string_dict(dct): #----------------------------------------------------------------------------- class ZMQSocketChannel(Thread): - """The base class for the channels that use ZMQ sockets. - """ + """The base class for the channels that use ZMQ sockets.""" context = None session = None socket = None @@ -101,7 +100,7 @@ class ZMQSocketChannel(Thread): _exiting = False def __init__(self, context, session, address): - """Create a channel + """Create a channel. Parameters ---------- @@ -147,7 +146,7 @@ class ZMQSocketChannel(Thread): break def stop(self): - """Stop the channel's activity. + """Stop the channel's event loop and join its thread. This calls :method:`Thread.join` and returns when the thread terminates. :class:`RuntimeError` will be raised if @@ -157,7 +156,9 @@ class ZMQSocketChannel(Thread): @property def address(self): - """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555'). + """Get the channel's address as a zmq url string. + + These URLS have the form: 'tcp://127.0.0.1:5555'. """ return self._address @@ -176,9 +177,9 @@ class ZMQSocketChannel(Thread): self.ioloop.add_callback(thread_send) def _handle_recv(self, msg): - """callback for stream.on_recv - - unpacks message, and calls handlers with it. + """Callback for stream.on_recv. + + Unpacks message, and calls handlers with it. """ ident,smsg = self.session.feed_identities(msg) self.call_handlers(self.session.unserialize(smsg)) @@ -186,8 +187,7 @@ class ZMQSocketChannel(Thread): class ShellChannel(ZMQSocketChannel): - """The DEALER channel for issues request/replies to the kernel. - """ + """The shell channel for issuing request/replies to the kernel.""" command_queue = None # flag for whether execute requests should be allowed to call raw_input: @@ -212,6 +212,7 @@ class ShellChannel(ZMQSocketChannel): pass def stop(self): + """Stop the channel's event loop and join its thread.""" self.ioloop.stop() super(ShellChannel, self).stop() @@ -314,7 +315,7 @@ class ShellChannel(ZMQSocketChannel): return msg['header']['msg_id'] def object_info(self, oname, detail_level=0): - """Get metadata information about an object. + """Get metadata information about an object in the kernel's namespace. Parameters ---------- @@ -333,7 +334,7 @@ class ShellChannel(ZMQSocketChannel): return msg['header']['msg_id'] def history(self, raw=True, output=False, hist_access_type='range', **kwargs): - """Get entries from the history list. + """Get entries from the kernel's history list. Parameters ---------- @@ -396,7 +397,9 @@ class ShellChannel(ZMQSocketChannel): class IOPubChannel(ZMQSocketChannel): - """The SUB channel which listens for messages that the kernel publishes. + """The iopub channel which listens for messages that the kernel publishes. + + This channel is where all output is published to frontends. """ def __init__(self, context, session, address): @@ -418,6 +421,7 @@ class IOPubChannel(ZMQSocketChannel): pass def stop(self): + """Stop the channel's event loop and join its thread.""" self.ioloop.stop() super(IOPubChannel, self).stop() @@ -432,7 +436,7 @@ class IOPubChannel(ZMQSocketChannel): raise NotImplementedError('call_handlers must be defined in a subclass.') def flush(self, timeout=1.0): - """Immediately processes all pending messages on the SUB channel. + """Immediately processes all pending messages on the iopub channel. Callers should use this method to ensure that :method:`call_handlers` has been called for all messages that have been received on the @@ -462,7 +466,7 @@ class IOPubChannel(ZMQSocketChannel): class StdInChannel(ZMQSocketChannel): - """A reply channel to handle raw_input requests that the kernel makes.""" + """The stdin channel to handle raw_input requests that the kernel makes.""" msg_queue = None @@ -482,9 +486,9 @@ class StdInChannel(ZMQSocketChannel): self.socket.close() except: pass - def stop(self): + """Stop the channel's event loop and join its thread.""" self.ioloop.stop() super(StdInChannel, self).stop() @@ -538,7 +542,7 @@ class HBChannel(ZMQSocketChannel): self.poller.register(self.socket, zmq.POLLIN) def _poll(self, start_time): - """poll for heartbeat replies until we reach self.time_to_dead + """poll for heartbeat replies until we reach self.time_to_dead. Ignores interrupts, and returns the result of poll(), which will be an empty list if no messages arrived before the timeout, @@ -627,6 +631,7 @@ class HBChannel(ZMQSocketChannel): return False def stop(self): + """Stop the channel's event loop and join its thread.""" self._running = False super(HBChannel, self).stop() @@ -646,15 +651,21 @@ class HBChannel(ZMQSocketChannel): #----------------------------------------------------------------------------- class KernelManager(Configurable): - """ Manages a kernel for a frontend. + """Manages a single kernel on this host along with its channels. - The SUB channel is for the frontend to receive messages published by the - kernel. + There are four channels associated with each kernel: - The REQ channel is for the frontend to make requests of the kernel. + * shell: for request/reply calls to the kernel. + * iopub: for the kernel to publish results to frontends. + * hb: for monitoring the kernel's heartbeat. + * stdin: for frontends to reply to raw_input calls in the kernel. - The REP channel is for the kernel to request stdin (raw_input) from the - frontend. + The usage of the channels that this class manages is optional. It is + entirely possible to connect to the kernels directly using ZeroMQ + sockets. These channels are useful primarily for talking to a kernel + whose :class:`KernelManager` is in the same process. + + This version manages kernels started using Popen. """ # The PyZMQ Context to use for communication with the kernel. context = Instance(zmq.Context) @@ -708,9 +719,10 @@ class KernelManager(Configurable): """Starts the channels for this kernel. This will create the channels if they do not exist and then start - them. If port numbers of 0 are being used (random ports) then you - must first call :method:`start_kernel`. If the channels have been - stopped and you call this, :class:`RuntimeError` will be raised. + them (their activity runs in a thread). If port numbers of 0 are + being used (random ports) then you must first call + :method:`start_kernel`. If the channels have been stopped and you + call this, :class:`RuntimeError` will be raised. """ if shell: self.shell_channel.start() @@ -726,6 +738,8 @@ class KernelManager(Configurable): def stop_channels(self): """Stops all the running channels for this kernel. + + This stops their event loops and joins their threads. """ if self.shell_channel.is_alive(): self.shell_channel.stop() @@ -743,7 +757,13 @@ class KernelManager(Configurable): self.stdin_channel.is_alive() or self.hb_channel.is_alive()) def _make_url(self, port): - """make a zmq url with a port""" + """Make a zmq url with a port. + + There are two cases that this handles: + + * tcp: tcp://ip:port + * ipc: ipc://ip-port + """ if self.transport == 'tcp': return "tcp://%s:%i" % (self.ip, port) else: @@ -751,42 +771,37 @@ class KernelManager(Configurable): @property def shell_channel(self): - """Get the REQ socket channel object to make requests of the kernel.""" + """Get the shell channel object for this kernel.""" if self._shell_channel is None: - self._shell_channel = self.shell_channel_class(self.context, - self.session, - self._make_url(self.shell_port), + self._shell_channel = self.shell_channel_class( + self.context, self.session, self._make_url(self.shell_port) ) return self._shell_channel @property def iopub_channel(self): - """Get the SUB socket channel object.""" + """Get the iopub channel object for this kernel.""" if self._iopub_channel is None: - self._iopub_channel = self.iopub_channel_class(self.context, - self.session, - self._make_url(self.iopub_port), + self._iopub_channel = self.iopub_channel_class( + self.context, self.session, self._make_url(self.iopub_port) ) return self._iopub_channel @property def stdin_channel(self): - """Get the REP socket channel object to handle stdin (raw_input).""" + """Get the stdin channel object for this kernel.""" if self._stdin_channel is None: - self._stdin_channel = self.stdin_channel_class(self.context, - self.session, - self._make_url(self.stdin_port), + self._stdin_channel = self.stdin_channel_class( + self.context, self.session, self._make_url(self.stdin_port) ) return self._stdin_channel @property def hb_channel(self): - """Get the heartbeat socket channel object to check that the - kernel is alive.""" + """Get the hb channel object for this kernel.""" if self._hb_channel is None: - self._hb_channel = self.hb_channel_class(self.context, - self.session, - self._make_url(self.hb_port), + self._hb_channel = self.hb_channel_class( + self.context, self.session, self._make_url(self.hb_port) ) return self._hb_channel @@ -795,7 +810,7 @@ class KernelManager(Configurable): #-------------------------------------------------------------------------- def cleanup_connection_file(self): - """cleanup connection file *if we wrote it* + """Cleanup connection file *if we wrote it* Will not raise if the connection file was already removed somehow. """ @@ -810,7 +825,7 @@ class KernelManager(Configurable): self.cleanup_ipc_files() def cleanup_ipc_files(self): - """cleanup ipc files if we wrote them""" + """Cleanup ipc files if we wrote them.""" if self.transport != 'ipc': return for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port): @@ -821,7 +836,7 @@ class KernelManager(Configurable): pass def load_connection_file(self): - """load connection info from JSON dict in self.connection_file""" + """Load connection info from JSON dict in self.connection_file.""" with open(self.connection_file) as f: cfg = json.loads(f.read()) @@ -836,7 +851,7 @@ class KernelManager(Configurable): self.session.key = str_to_bytes(cfg['key']) def write_connection_file(self): - """write connection info to JSON dict in self.connection_file""" + """Write connection info to JSON dict in self.connection_file.""" if self._connection_file_written: return self.connection_file,cfg = write_connection_file(self.connection_file, @@ -856,7 +871,7 @@ class KernelManager(Configurable): #-------------------------------------------------------------------------- def start_kernel(self, **kw): - """Starts a kernel process and configures the manager to use it. + """Starts a kernel on this host in a separate process. If random ports (port=0) are being used, this method must be called before the channels are created. @@ -869,7 +884,8 @@ class KernelManager(Configurable): it should not be necessary to use this parameter. **kw : optional - See respective options for IPython and Python kernels. + keyword arguments that are passed down into the launcher + callable. """ if self.transport == 'tcp' and self.ip not in LOCAL_IPS: raise RuntimeError("Can only launch a kernel on a local interface. " @@ -888,9 +904,22 @@ class KernelManager(Configurable): self.kernel = launch_kernel(fname=self.connection_file, **kw) def shutdown_kernel(self, now=False, restart=False): - """ Attempts to the stop the kernel process cleanly. + """Attempts to the stop the kernel process cleanly. + + This attempts to shutdown the kernels cleanly by: + + 1. Sending it a shutdown message over the shell channel. + 2. If that fails, the kernel is shutdown forcibly by sending it + a signal. - If the kernel cannot be stopped and the kernel is local, it is killed. + Parameters: + ----------- + now : bool + Should the kernel be forcible killed *now*. This skips the + first, nice shutdown attempt. + restart: bool + Will this kernel be restarted after it is shutdown. When this + is True, connection files will not be cleaned up. """ # FIXME: Shutdown does not work on Windows due to ZMQ errors! if sys.platform == 'win32': @@ -928,7 +957,7 @@ class KernelManager(Configurable): """Restarts a kernel with the arguments that were used to launch it. If the old kernel was launched with random ports, the same ports will be - used for the new kernel. + used for the new kernel. The same connection file is used again. Parameters ---------- @@ -941,7 +970,7 @@ class KernelManager(Configurable): it is given a chance to perform a clean shutdown or not. **kw : optional - Any options specified here will replace those used to launch the + Any options specified here will overwrite those used to launch the kernel. """ if self._launch_args is None: @@ -962,15 +991,13 @@ class KernelManager(Configurable): @property def has_kernel(self): - """Returns whether a kernel process has been specified for the kernel - manager. - """ + """Has a kernel been started that we are managing.""" return self.kernel is not None def kill_kernel(self): - """ Kill the running kernel. + """Kill the running kernel. - This method blocks until the kernel process has terminated. + This is a private method, callers should use shutdown_kernel(now=True). """ if self.has_kernel: # Pause the heart beat channel if it exists. @@ -1001,7 +1028,7 @@ class KernelManager(Configurable): raise RuntimeError("Cannot kill kernel. No kernel is running!") def interrupt_kernel(self): - """ Interrupts the kernel. + """Interrupts the kernel by sending it a signal. Unlike ``signal_kernel``, this operation is well supported on all platforms. @@ -1016,7 +1043,7 @@ class KernelManager(Configurable): raise RuntimeError("Cannot interrupt kernel. No kernel is running!") def signal_kernel(self, signum): - """ Sends a signal to the kernel. + """Sends a signal to the kernel. Note that since only SIGTERM is supported on Windows, this function is only useful on Unix systems. @@ -1053,3 +1080,4 @@ IOPubChannelABC.register(IOPubChannel) HBChannelABC.register(HBChannel) StdInChannelABC.register(StdInChannel) KernelManagerABC.register(KernelManager) +