"""KernelStarter class that intercepts Control Queue messages, and handles process management.""" from zmq.eventloop import ioloop from streamsession import StreamSession class KernelStarter(object): """Object for resetting/killing the Kernel.""" def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs): self.session = session self.upstream = upstream self.downstream = downstream self.kernel_args = kernel_args self.kernel_kwargs = kernel_kwargs self.handlers = {} for method in 'shutdown_request shutdown_reply'.split(): self.handlers[method] = getattr(self, method) def start(self): self.upstream.on_recv(self.dispatch_request) self.downstream.on_recv(self.dispatch_reply) #-------------------------------------------------------------------------- # Dispatch methods #-------------------------------------------------------------------------- def dispatch_request(self, raw_msg): idents, msg = self.session.feed_identities() try: msg = self.session.unpack_message(msg, content=False) except: print ("bad msg: %s"%msg) msgtype = msg['msg_type'] handler = self.handlers.get(msgtype, None) if handler is None: self.downstream.send_multipart(raw_msg, copy=False) else: handler(msg) def dispatch_reply(self, raw_msg): idents, msg = self.session.feed_identities() try: msg = self.session.unpack_message(msg, content=False) except: print ("bad msg: %s"%msg) msgtype = msg['msg_type'] handler = self.handlers.get(msgtype, None) if handler is None: self.upstream.send_multipart(raw_msg, copy=False) else: handler(msg) #-------------------------------------------------------------------------- # Handlers #-------------------------------------------------------------------------- def shutdown_request(self, msg): """""" self.downstream.send_multipart(msg) #-------------------------------------------------------------------------- # Kernel process management methods, from KernelManager: #-------------------------------------------------------------------------- def _check_local(addr): if isinstance(addr, tuple): addr = addr[0] return addr in LOCAL_IPS def start_kernel(self, **kw): """Starts a kernel process and configures the manager to use it. If random ports (port=0) are being used, this method must be called before the channels are created. Parameters: ----------- ipython : bool, optional (default True) Whether to use an IPython kernel instead of a plain Python kernel. """ self.kernel = Process(target=make_kernel, args=self.kernel_args, kwargs=self.kernel_kwargs) def shutdown_kernel(self, restart=False): """ Attempts to the stop the kernel process cleanly. If the kernel cannot be stopped, it is killed, if possible. """ # FIXME: Shutdown does not work on Windows due to ZMQ errors! if sys.platform == 'win32': self.kill_kernel() return # Don't send any additional kernel kill messages immediately, to give # the kernel a chance to properly execute shutdown actions. Wait for at # most 1s, checking every 0.1s. self.xreq_channel.shutdown(restart=restart) for i in range(10): if self.is_alive: time.sleep(0.1) else: break else: # OK, we've waited long enough. if self.has_kernel: self.kill_kernel() def restart_kernel(self, now=False): """Restarts a kernel with the same arguments that were used to launch it. If the old kernel was launched with random ports, the same ports will be used for the new kernel. Parameters ---------- now : bool, optional If True, the kernel is forcefully restarted *immediately*, without having a chance to do any cleanup action. Otherwise the kernel is given 1s to clean up before a forceful restart is issued. In all cases the kernel is restarted, the only difference is whether it is given a chance to perform a clean shutdown or not. """ if self._launch_args is None: raise RuntimeError("Cannot restart the kernel. " "No previous call to 'start_kernel'.") else: if self.has_kernel: if now: self.kill_kernel() else: self.shutdown_kernel(restart=True) self.start_kernel(**self._launch_args) # FIXME: Messages get dropped in Windows due to probable ZMQ bug # unless there is some delay here. if sys.platform == 'win32': time.sleep(0.2) @property def has_kernel(self): """Returns whether a kernel process has been specified for the kernel manager. """ return self.kernel is not None def kill_kernel(self): """ Kill the running kernel. """ if self.has_kernel: # Pause the heart beat channel if it exists. if self._hb_channel is not None: self._hb_channel.pause() # Attempt to kill the kernel. try: self.kernel.kill() except OSError, e: # In Windows, we will get an Access Denied error if the process # has already terminated. Ignore it. if not (sys.platform == 'win32' and e.winerror == 5): raise self.kernel = None else: raise RuntimeError("Cannot kill kernel. No kernel is running!") def interrupt_kernel(self): """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is well supported on all platforms. """ if self.has_kernel: if sys.platform == 'win32': from parentpoller import ParentPollerWindows as Poller Poller.send_interrupt(self.kernel.win32_interrupt_event) else: self.kernel.send_signal(signal.SIGINT) else: raise RuntimeError("Cannot interrupt kernel. No kernel is running!") def signal_kernel(self, signum): """ Sends a signal to the kernel. Note that since only SIGTERM is supported on Windows, this function is only useful on Unix systems. """ if self.has_kernel: self.kernel.send_signal(signum) else: raise RuntimeError("Cannot signal kernel. No kernel is running!") @property def is_alive(self): """Is the kernel process still running?""" # FIXME: not using a heartbeat means this method is broken for any # remote kernel, it's only capable of handling local kernels. if self.has_kernel: if self.kernel.poll() is None: return True else: return False else: # We didn't start the kernel with this KernelManager so we don't # know if it is running. We should use a heartbeat for this case. return True def make_starter(up_addr, down_addr, *args, **kwargs): """entry point function for launching a kernelstarter in a subprocess""" loop = ioloop.IOLoop.instance() ctx = zmq.Context() session = StreamSession() upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop) upstream.connect(up_addr) downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop) downstream.connect(down_addr) starter = KernelStarter(session, upstream, downstream, *args, **kwargs) starter.start() loop.start()