From c2e05b576c739528861aeb610280ad2fd1cefa2f 2013-04-24 04:47:45 From: MinRK Date: 2013-04-24 04:47:45 Subject: [PATCH] add control channel clean shutdown is back --- diff --git a/IPython/kernel/connect.py b/IPython/kernel/connect.py index cece6a9..329c7b9 100644 --- a/IPython/kernel/connect.py +++ b/IPython/kernel/connect.py @@ -46,7 +46,7 @@ from IPython.utils.traitlets import ( #----------------------------------------------------------------------------- def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0, - ip=LOCALHOST, key=b'', transport='tcp'): + control_port=0, ip=LOCALHOST, key=b'', transport='tcp'): """Generates a JSON config file, including the selection of random ports. Parameters @@ -62,7 +62,10 @@ def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, The port to use for the SUB channel. stdin_port : int, optional - The port to use for the REQ (raw input) channel. + The port to use for the ROUTER (raw input) channel. + + control_port : int, optional + The port to use for the ROUTER (control) channel. hb_port : int, optional The port to use for the hearbeat REP channel. @@ -81,8 +84,11 @@ def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, # Find open ports as necessary. 
ports = [] - ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \ - int(stdin_port <= 0) + int(hb_port <= 0) + ports_needed = int(shell_port <= 0) + \ + int(iopub_port <= 0) + \ + int(stdin_port <= 0) + \ + int(control_port <= 0) + \ + int(hb_port <= 0) if transport == 'tcp': for i in range(ports_needed): sock = socket.socket() @@ -105,12 +111,15 @@ def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, iopub_port = ports.pop(0) if stdin_port <= 0: stdin_port = ports.pop(0) + if control_port <= 0: + control_port = ports.pop(0) if hb_port <= 0: hb_port = ports.pop(0) cfg = dict( shell_port=shell_port, iopub_port=iopub_port, stdin_port=stdin_port, + control_port=control_port, hb_port=hb_port, ) cfg['ip'] = ip @@ -346,6 +355,7 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None): #----------------------------------------------------------------------------- # Mixin for classes that workw ith connection files #----------------------------------------------------------------------------- +port_names = [ "%s_port" % channel for channel in ('shell', 'stdin', 'iopub', 'hb', 'control')] class ConnectionFileMixin(HasTraits): """Mixin for configurable classes that work with connection files""" @@ -381,12 +391,29 @@ class ConnectionFileMixin(HasTraits): shell_port = Integer(0) iopub_port = Integer(0) stdin_port = Integer(0) + control_port = Integer(0) hb_port = Integer(0) + @property + def ports(self): + return [ getattr(self, name) for name in port_names ] + #-------------------------------------------------------------------------- # Connection and ipc file management #-------------------------------------------------------------------------- + def get_connection_info(self): + """return the connection info as a dict""" + return dict( + transport=self.transport, + ip=self.ip, + shell_port=self.shell_port, + iopub_port=self.iopub_port, + stdin_port=self.stdin_port, + hb_port=self.hb_port, + control_port=self.control_port, + ) + 
def cleanup_connection_file(self): """Cleanup connection file *if we wrote it* @@ -404,7 +431,7 @@ class ConnectionFileMixin(HasTraits): """Cleanup ipc files if we wrote them.""" if self.transport != 'ipc': return - for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port): + for port in self.ports: ipcfile = "%s-%i" % (self.ip, port) try: os.remove(ipcfile) @@ -415,15 +442,16 @@ class ConnectionFileMixin(HasTraits): """Write connection info to JSON dict in self.connection_file.""" if self._connection_file_written: return - self.connection_file,cfg = write_connection_file(self.connection_file, + + self.connection_file, cfg = write_connection_file(self.connection_file, transport=self.transport, ip=self.ip, key=self.session.key, stdin_port=self.stdin_port, iopub_port=self.iopub_port, - shell_port=self.shell_port, hb_port=self.hb_port) + shell_port=self.shell_port, hb_port=self.hb_port, + control_port=self.control_port, + ) # write_connection_file also sets default ports: - self.shell_port = cfg['shell_port'] - self.stdin_port = cfg['stdin_port'] - self.iopub_port = cfg['iopub_port'] - self.hb_port = cfg['hb_port'] + for name in port_names: + setattr(self, name, cfg[name]) self._connection_file_written = True @@ -434,10 +462,8 @@ class ConnectionFileMixin(HasTraits): self.transport = cfg.get('transport', 'tcp') self.ip = cfg['ip'] - self.shell_port = cfg['shell_port'] - self.stdin_port = cfg['stdin_port'] - self.iopub_port = cfg['iopub_port'] - self.hb_port = cfg['hb_port'] + for name in port_names: + setattr(self, name, cfg[name]) self.session.key = str_to_bytes(cfg['key']) diff --git a/IPython/kernel/manager.py b/IPython/kernel/manager.py index dfe635d..75f7345 100644 --- a/IPython/kernel/manager.py +++ b/IPython/kernel/manager.py @@ -46,6 +46,7 @@ _socket_types = { 'shell' : zmq.DEALER, 'iopub' : zmq.SUB, 'stdin' : zmq.DEALER, + 'control': zmq.DEALER, } class KernelManager(LoggingConfigurable, ConnectionFileMixin): @@ -80,13 +81,15 @@ class 
KernelManager(LoggingConfigurable, ConnectionFileMixin): ipython_kernel = Bool(True) # Protected traits - _launch_args = Any + _launch_args = Any() + _control_socket = Any() autorestart = Bool(False, config=True, help="""Should we autorestart the kernel if it dies.""" ) def __del__(self): + self._close_control_socket() self.cleanup_connection_file() #-------------------------------------------------------------------------- @@ -103,17 +106,6 @@ class KernelManager(LoggingConfigurable, ConnectionFileMixin): # Connection info #-------------------------------------------------------------------------- - def get_connection_info(self): - """return the connection info as a dict""" - return dict( - transport=self.transport, - ip=self.ip, - shell_port=self.shell_port, - iopub_port=self.iopub_port, - stdin_port=self.stdin_port, - hb_port=self.hb_port, - ) - def _make_url(self, channel): """Make a ZeroMQ URL for a given channel.""" transport = self.transport @@ -152,6 +144,10 @@ class KernelManager(LoggingConfigurable, ConnectionFileMixin): """return zmq Socket connected to the Heartbeat channel""" return self._create_connected_socket('hb') + def connect_control(self): + """return zmq Socket connected to the Control channel""" + return self._create_connected_socket('control') + #-------------------------------------------------------------------------- # Kernel management #-------------------------------------------------------------------------- @@ -176,6 +172,18 @@ class KernelManager(LoggingConfigurable, ConnectionFileMixin): """ return launch_kernel(kernel_cmd, **kw) + def _connect_control_socket(self): + if self._control_socket is None: + self._control_socket = self.connect_control() + + def _close_control_socket(self): + if self._control_socket is None: + return + self._control_socket.linger = 100 + self._control_socket.close() + self._control_socket = None + + def start_kernel(self, **kw): """Starts a kernel on this host in a separate process. 
@@ -207,10 +215,13 @@ class KernelManager(LoggingConfigurable, ConnectionFileMixin): ipython_kernel=self.ipython_kernel, **kw) self.start_restarter() + self._connect_control_socket() def _send_shutdown_request(self, restart=False): """TODO: send a shutdown request via control channel""" - raise NotImplementedError("Soft shutdown needs control channel") + content = dict(restart=restart) + msg = self.session.msg("shutdown_request", content=content) + self.session.send(self._control_socket, msg) def shutdown_kernel(self, now=False, restart=False): """Attempts to the stop the kernel process cleanly. @@ -230,7 +241,6 @@ class KernelManager(LoggingConfigurable, ConnectionFileMixin): Will this kernel be restarted after it is shutdown. When this is True, connection files will not be cleaned up. """ - # Stop monitoring for restarting while we shutdown. self.stop_restarter() @@ -239,10 +249,6 @@ class KernelManager(LoggingConfigurable, ConnectionFileMixin): self._kill_kernel() return - # bypass clean shutdown while - # FIXME: add control channel for clean shutdown - now = True - if now: if self.has_kernel: self._kill_kernel() @@ -250,7 +256,6 @@ class KernelManager(LoggingConfigurable, ConnectionFileMixin): # Don't send any additional kernel kill messages immediately, to give # the kernel a chance to properly execute shutdown actions. Wait for at # most 1s, checking every 0.1s. 
- # FIXME: this method is not yet implemented (need Control channel) self._send_shutdown_request(restart=restart) for i in range(10): if self.is_alive(): diff --git a/IPython/kernel/zmq/kernelapp.py b/IPython/kernel/zmq/kernelapp.py index fafc7b1..5e64edf 100644 --- a/IPython/kernel/zmq/kernelapp.py +++ b/IPython/kernel/zmq/kernelapp.py @@ -69,6 +69,7 @@ kernel_aliases.update({ 'shell' : 'IPKernelApp.shell_port', 'iopub' : 'IPKernelApp.iopub_port', 'stdin' : 'IPKernelApp.stdin_port', + 'control' : 'IPKernelApp.control_port', 'f' : 'IPKernelApp.connection_file', 'parent': 'IPKernelApp.parent', 'transport': 'IPKernelApp.transport', @@ -145,7 +146,8 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]") shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]") iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]") - stdin_port = Integer(0, config=True, help="set the stdin (DEALER) port [default: random]") + stdin_port = Integer(0, config=True, help="set the stdin (ROUTER) port [default: random]") + control_port = Integer(0, config=True, help="set the control (ROUTER) port [default: random]") connection_file = Unicode('', config=True, help="""JSON file in which to store connection info [default: kernel-.json] @@ -227,7 +229,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): if self.ip == self._ip_default() and 'ip' in cfg: # not overridden by config or cl_args self.ip = cfg['ip'] - for channel in ('hb', 'shell', 'iopub', 'stdin'): + for channel in ('hb', 'shell', 'iopub', 'stdin', 'control'): name = channel + '_port' if getattr(self, name) == 0 and name in cfg: # not overridden by config or cl_args @@ -241,7 +243,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): self.log.debug("Writing connection file: %s", cf) write_connection_file(cf, ip=self.ip, key=self.session.key, 
transport=self.transport, shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port, - iopub_port=self.iopub_port) + iopub_port=self.iopub_port, control_port=self.control_port) def cleanup_connection_file(self): cf = self.abs_connection_file @@ -257,7 +259,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): """cleanup ipc files if we wrote them""" if self.transport != 'ipc': return - for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port): + for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port, self.control_port): ipcfile = "%s-%i" % (self.ip, port) try: os.remove(ipcfile) @@ -282,15 +284,19 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): self.shell_socket = context.socket(zmq.ROUTER) self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) - self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port) + self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port) self.iopub_socket = context.socket(zmq.PUB) self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) - self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port) + self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port) self.stdin_socket = context.socket(zmq.ROUTER) self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) - self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port) + self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port) + + self.control_socket = context.socket(zmq.ROUTER) + self.control_port = self._bind_socket(self.control_socket, self.control_port) + self.log.debug("control ROUTER Channel on port: %i" % self.control_port) def init_heartbeat(self): """start the heart beating""" @@ -299,7 +305,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): hb_ctx = zmq.Context() self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port)) self.hb_port = self.heartbeat.port - 
self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port) + self.log.debug("Heartbeat REP Channel on port: %i" % self.hb_port) self.heartbeat.start() # Helper to make it easier to connect to an existing kernel. @@ -321,7 +327,8 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): self.ports = dict(shell=self.shell_port, iopub=self.iopub_port, - stdin=self.stdin_port, hb=self.hb_port) + stdin=self.stdin_port, hb=self.hb_port, + control=self.control_port) def init_session(self): """create our session object""" @@ -353,11 +360,12 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): def init_kernel(self): """Create the Kernel object itself""" shell_stream = ZMQStream(self.shell_socket) + control_stream = ZMQStream(self.control_socket) kernel_factory = import_item(str(self.kernel_class)) kernel = kernel_factory(config=self.config, session=self.session, - shell_streams=[shell_stream], + shell_streams=[shell_stream, control_stream], iopub_socket=self.iopub_socket, stdin_socket=self.stdin_socket, log=self.log,