diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 74e67ae..ead40a7 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -30,6 +30,7 @@ import stat import sys from multiprocessing import Process +from signal import signal, SIGINT, SIGABRT, SIGTERM import zmq from zmq.devices import ProcessMonitoredQueue @@ -55,7 +56,7 @@ from IPython.parallel.controller.hub import HubFactory from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler from IPython.parallel.controller.sqlitedb import SQLiteDB -from IPython.parallel.util import signal_children, split_url, disambiguate_url +from IPython.parallel.util import split_url, disambiguate_url # conditional import of MongoDB backend class @@ -151,7 +152,9 @@ class IPControllerApp(BaseParallelApplication): help="""Whether to create profile dir if it doesn't exist.""") reuse_files = Bool(False, config=True, - help='Whether to reuse existing json connection files.' + help="""Whether to reuse existing json connection files. + If False, connection files will be removed on a clean exit. + """ ) ssh_server = Unicode(u'', config=True, help="""ssh url for clients to use when connecting to the Controller @@ -192,6 +195,12 @@ class IPControllerApp(BaseParallelApplication): def _use_threads_changed(self, name, old, new): self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') + + write_connection_files = Bool(True, + help="""Whether to write connection files to disk. + True in all cases other than runs with `reuse_files=True` *after the first* + """ + ) aliases = Dict(aliases) flags = Dict(flags) @@ -256,6 +265,20 @@ class IPControllerApp(BaseParallelApplication): self.ssh_server = cfg['ssh'] assert int(ports) == c.HubFactory.regport, "regport mismatch" + def cleanup_connection_files(self): + if self.reuse_files: + self.log.debug("leaving JSON connection files for reuse") + return + self.log.debug("cleaning up JSON connection files") + for f in (self.client_json_file, self.engine_json_file): + f = os.path.join(self.profile_dir.security_dir, f) + try: + os.remove(f) + except Exception as e: + self.log.error("Failed to cleanup connection file: %s", e) + else: + self.log.debug(u"removed %s", f) + def load_secondary_config(self): """secondary config, loading from JSON and setting defaults""" if self.reuse_files: @@ -263,7 +286,11 @@ class IPControllerApp(BaseParallelApplication): self.load_config_from_json() except (AssertionError,IOError) as e: self.log.error("Could not load config from JSON: %s" % e) - self.reuse_files=False + else: + # successfully loaded config from JSON, and reuse=True + # no need to wite back the same file + self.write_connection_files = False + # switch Session.key default to secure default_secure(self.config) self.log.debug("Config changed") @@ -284,7 +311,7 @@ class IPControllerApp(BaseParallelApplication): self.log.error("Couldn't construct the Controller", exc_info=True) self.exit(1) - if not self.reuse_files: + if self.write_connection_files: # save to new json config files f = self.factory cdict = {'exec_key' : f.session.key.decode('ascii'), @@ -298,7 +325,6 @@ class IPControllerApp(BaseParallelApplication): edict['ssh'] = self.engine_ssh_server self.save_connection_dict(self.engine_json_file, edict) - # def init_schedulers(self): children = self.children mq = import_item(str(self.mq_class)) @@ -367,21 +393,31 @@ class IPControllerApp(BaseParallelApplication): kwargs['in_thread'] = True launch_scheduler(*sargs, **kwargs) + def terminate_children(self): + child_procs = [] + for child in self.children: + if isinstance(child, ProcessMonitoredQueue): + child_procs.append(child.launcher) + elif isinstance(child, Process): + child_procs.append(child) + if child_procs: + self.log.critical("terminating children...") + for child in child_procs: + try: + child.terminate() + except OSError: + # already dead + pass - def save_urls(self): - """save the registration urls to files.""" - c = self.config - - sec_dir = self.profile_dir.security_dir - cf = self.factory - - with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: - f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport)) - - with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f: - f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) - + def handle_signal(self, sig, frame): + self.log.critical("Received signal %i, shutting down", sig) + self.terminate_children() + self.loop.stop() + def init_signal(self): + for sig in (SIGINT, SIGABRT, SIGTERM): + signal(sig, self.handle_signal) + def do_import_statements(self): statements = self.import_statements for s in statements: @@ -415,15 +451,11 @@ class IPControllerApp(BaseParallelApplication): def start(self): # Start the subprocesses: self.factory.start() - child_procs = [] + # children must be started before signals are setup, + # otherwise signal-handling will fire multiple times for child in self.children: child.start() - if isinstance(child, ProcessMonitoredQueue): - child_procs.append(child.launcher) - elif isinstance(child, Process): - child_procs.append(child) - if child_procs: - signal_children(child_procs) + self.init_signal() self.write_pid_file(overwrite=True) @@ -431,6 +463,8 @@ class IPControllerApp(BaseParallelApplication): self.factory.loop.start() except KeyboardInterrupt: self.log.critical("Interrupted, Exiting...\n") + finally: + self.cleanup_connection_files() diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 7c2c13d..bf9a32e 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -722,5 +722,5 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, try: loop.start() except KeyboardInterrupt: - print ("interrupted, exiting...", file=sys.__stderr__) + scheduler.log.critical("Interrupted, exiting...")