From c1c517a0c509105e5b7194380ac50b9a7232b4b6 2012-02-06 22:03:18 From: Min RK Date: 2012-02-06 22:03:18 Subject: [PATCH] Merge pull request #1372 from minrk/reuse-cleanup ipcontroller cleans up connection files unless reuse=True Connection files are not valid across sessions if reuse is False, but were previously preserved, which could cause engines to try to connect to the wrong ports if the Controller took too long to start and/or write new connection files. Also shuffles signal handling around a bit, so that a clean exit occurs, rather than calling sys.exit directly in the signal handler. --- diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 74e67ae..ead40a7 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -30,6 +30,7 @@ import stat import sys from multiprocessing import Process +from signal import signal, SIGINT, SIGABRT, SIGTERM import zmq from zmq.devices import ProcessMonitoredQueue @@ -55,7 +56,7 @@ from IPython.parallel.controller.hub import HubFactory from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler from IPython.parallel.controller.sqlitedb import SQLiteDB -from IPython.parallel.util import signal_children, split_url, disambiguate_url +from IPython.parallel.util import split_url, disambiguate_url # conditional import of MongoDB backend class @@ -151,7 +152,9 @@ class IPControllerApp(BaseParallelApplication): help="""Whether to create profile dir if it doesn't exist.""") reuse_files = Bool(False, config=True, - help='Whether to reuse existing json connection files.' + help="""Whether to reuse existing json connection files. + If False, connection files will be removed on a clean exit. + """ ) ssh_server = Unicode(u'', config=True, help="""ssh url for clients to use when connecting to the Controller @@ -192,6 +195,12 @@ class IPControllerApp(BaseParallelApplication): def _use_threads_changed(self, name, old, new): self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') + + write_connection_files = Bool(True, + help="""Whether to write connection files to disk. + True in all cases other than runs with `reuse_files=True` *after the first* + """ + ) aliases = Dict(aliases) flags = Dict(flags) @@ -256,6 +265,20 @@ class IPControllerApp(BaseParallelApplication): self.ssh_server = cfg['ssh'] assert int(ports) == c.HubFactory.regport, "regport mismatch" + def cleanup_connection_files(self): + if self.reuse_files: + self.log.debug("leaving JSON connection files for reuse") + return + self.log.debug("cleaning up JSON connection files") + for f in (self.client_json_file, self.engine_json_file): + f = os.path.join(self.profile_dir.security_dir, f) + try: + os.remove(f) + except Exception as e: + self.log.error("Failed to cleanup connection file: %s", e) + else: + self.log.debug(u"removed %s", f) + def load_secondary_config(self): """secondary config, loading from JSON and setting defaults""" if self.reuse_files: @@ -263,7 +286,11 @@ class IPControllerApp(BaseParallelApplication): self.load_config_from_json() except (AssertionError,IOError) as e: self.log.error("Could not load config from JSON: %s" % e) - self.reuse_files=False + else: + # successfully loaded config from JSON, and reuse=True + # no need to wite back the same file + self.write_connection_files = False + # switch Session.key default to secure default_secure(self.config) self.log.debug("Config changed") @@ -284,7 +311,7 @@ class IPControllerApp(BaseParallelApplication): self.log.error("Couldn't construct the Controller", exc_info=True) self.exit(1) - if not self.reuse_files: + if self.write_connection_files: # save to new json config files f = self.factory cdict = {'exec_key' : f.session.key.decode('ascii'), @@ -298,7 +325,6 @@ class IPControllerApp(BaseParallelApplication): edict['ssh'] = self.engine_ssh_server self.save_connection_dict(self.engine_json_file, edict) - # def init_schedulers(self): children = self.children mq = import_item(str(self.mq_class)) @@ -367,21 +393,31 @@ class IPControllerApp(BaseParallelApplication): kwargs['in_thread'] = True launch_scheduler(*sargs, **kwargs) + def terminate_children(self): + child_procs = [] + for child in self.children: + if isinstance(child, ProcessMonitoredQueue): + child_procs.append(child.launcher) + elif isinstance(child, Process): + child_procs.append(child) + if child_procs: + self.log.critical("terminating children...") + for child in child_procs: + try: + child.terminate() + except OSError: + # already dead + pass - def save_urls(self): - """save the registration urls to files.""" - c = self.config - - sec_dir = self.profile_dir.security_dir - cf = self.factory - - with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: - f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport)) - - with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f: - f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) - + def handle_signal(self, sig, frame): + self.log.critical("Received signal %i, shutting down", sig) + self.terminate_children() + self.loop.stop() + def init_signal(self): + for sig in (SIGINT, SIGABRT, SIGTERM): + signal(sig, self.handle_signal) + def do_import_statements(self): statements = self.import_statements for s in statements: @@ -415,15 +451,11 @@ class IPControllerApp(BaseParallelApplication): def start(self): # Start the subprocesses: self.factory.start() - child_procs = [] + # children must be started before signals are setup, + # otherwise signal-handling will fire multiple times for child in self.children: child.start() - if isinstance(child, ProcessMonitoredQueue): - child_procs.append(child.launcher) - elif isinstance(child, Process): - child_procs.append(child) - if child_procs: - signal_children(child_procs) + self.init_signal() self.write_pid_file(overwrite=True) @@ -431,6 +463,8 @@ class IPControllerApp(BaseParallelApplication): self.factory.loop.start() except KeyboardInterrupt: self.log.critical("Interrupted, Exiting...\n") + finally: + self.cleanup_connection_files() diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 7c2c13d..bf9a32e 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -722,5 +722,5 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, try: loop.start() except KeyboardInterrupt: - print ("interrupted, exiting...", file=sys.__stderr__) + scheduler.log.critical("Interrupted, exiting...")