##// END OF EJS Templates
Merge pull request #1372 from minrk/reuse-cleanup...
Min RK -
r6080:c1c517a0 merge
parent child Browse files
Show More
@@ -30,6 +30,7 b' import stat'
30 30 import sys
31 31
32 32 from multiprocessing import Process
33 from signal import signal, SIGINT, SIGABRT, SIGTERM
33 34
34 35 import zmq
35 36 from zmq.devices import ProcessMonitoredQueue
@@ -55,7 +56,7 b' from IPython.parallel.controller.hub import HubFactory'
55 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 57 from IPython.parallel.controller.sqlitedb import SQLiteDB
57 58
58 from IPython.parallel.util import signal_children, split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url
59 60
60 61 # conditional import of MongoDB backend class
61 62
@@ -151,7 +152,9 b' class IPControllerApp(BaseParallelApplication):'
151 152 help="""Whether to create profile dir if it doesn't exist.""")
152 153
153 154 reuse_files = Bool(False, config=True,
154 help='Whether to reuse existing json connection files.'
155 help="""Whether to reuse existing json connection files.
156 If False, connection files will be removed on a clean exit.
157 """
155 158 )
156 159 ssh_server = Unicode(u'', config=True,
157 160 help="""ssh url for clients to use when connecting to the Controller
@@ -192,6 +195,12 b' class IPControllerApp(BaseParallelApplication):'
192 195
193 196 def _use_threads_changed(self, name, old, new):
194 197 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
198
199 write_connection_files = Bool(True,
200 help="""Whether to write connection files to disk.
201 True in all cases other than runs with `reuse_files=True` *after the first*
202 """
203 )
195 204
196 205 aliases = Dict(aliases)
197 206 flags = Dict(flags)
@@ -256,6 +265,20 b' class IPControllerApp(BaseParallelApplication):'
256 265 self.ssh_server = cfg['ssh']
257 266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
258 267
268 def cleanup_connection_files(self):
269 if self.reuse_files:
270 self.log.debug("leaving JSON connection files for reuse")
271 return
272 self.log.debug("cleaning up JSON connection files")
273 for f in (self.client_json_file, self.engine_json_file):
274 f = os.path.join(self.profile_dir.security_dir, f)
275 try:
276 os.remove(f)
277 except Exception as e:
278 self.log.error("Failed to cleanup connection file: %s", e)
279 else:
280 self.log.debug(u"removed %s", f)
281
259 282 def load_secondary_config(self):
260 283 """secondary config, loading from JSON and setting defaults"""
261 284 if self.reuse_files:
@@ -263,7 +286,11 b' class IPControllerApp(BaseParallelApplication):'
263 286 self.load_config_from_json()
264 287 except (AssertionError,IOError) as e:
265 288 self.log.error("Could not load config from JSON: %s" % e)
266 self.reuse_files=False
289 else:
290 # successfully loaded config from JSON, and reuse=True
291 # no need to wite back the same file
292 self.write_connection_files = False
293
267 294 # switch Session.key default to secure
268 295 default_secure(self.config)
269 296 self.log.debug("Config changed")
@@ -284,7 +311,7 b' class IPControllerApp(BaseParallelApplication):'
284 311 self.log.error("Couldn't construct the Controller", exc_info=True)
285 312 self.exit(1)
286 313
287 if not self.reuse_files:
314 if self.write_connection_files:
288 315 # save to new json config files
289 316 f = self.factory
290 317 cdict = {'exec_key' : f.session.key.decode('ascii'),
@@ -298,7 +325,6 b' class IPControllerApp(BaseParallelApplication):'
298 325 edict['ssh'] = self.engine_ssh_server
299 326 self.save_connection_dict(self.engine_json_file, edict)
300 327
301 #
302 328 def init_schedulers(self):
303 329 children = self.children
304 330 mq = import_item(str(self.mq_class))
@@ -367,21 +393,31 b' class IPControllerApp(BaseParallelApplication):'
367 393 kwargs['in_thread'] = True
368 394 launch_scheduler(*sargs, **kwargs)
369 395
396 def terminate_children(self):
397 child_procs = []
398 for child in self.children:
399 if isinstance(child, ProcessMonitoredQueue):
400 child_procs.append(child.launcher)
401 elif isinstance(child, Process):
402 child_procs.append(child)
403 if child_procs:
404 self.log.critical("terminating children...")
405 for child in child_procs:
406 try:
407 child.terminate()
408 except OSError:
409 # already dead
410 pass
370 411
371 def save_urls(self):
372 """save the registration urls to files."""
373 c = self.config
374
375 sec_dir = self.profile_dir.security_dir
376 cf = self.factory
377
378 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
379 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
380
381 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
382 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
383
412 def handle_signal(self, sig, frame):
413 self.log.critical("Received signal %i, shutting down", sig)
414 self.terminate_children()
415 self.loop.stop()
384 416
417 def init_signal(self):
418 for sig in (SIGINT, SIGABRT, SIGTERM):
419 signal(sig, self.handle_signal)
420
385 421 def do_import_statements(self):
386 422 statements = self.import_statements
387 423 for s in statements:
@@ -415,15 +451,11 b' class IPControllerApp(BaseParallelApplication):'
415 451 def start(self):
416 452 # Start the subprocesses:
417 453 self.factory.start()
418 child_procs = []
454 # children must be started before signals are setup,
455 # otherwise signal-handling will fire multiple times
419 456 for child in self.children:
420 457 child.start()
421 if isinstance(child, ProcessMonitoredQueue):
422 child_procs.append(child.launcher)
423 elif isinstance(child, Process):
424 child_procs.append(child)
425 if child_procs:
426 signal_children(child_procs)
458 self.init_signal()
427 459
428 460 self.write_pid_file(overwrite=True)
429 461
@@ -431,6 +463,8 b' class IPControllerApp(BaseParallelApplication):'
431 463 self.factory.loop.start()
432 464 except KeyboardInterrupt:
433 465 self.log.critical("Interrupted, Exiting...\n")
466 finally:
467 self.cleanup_connection_files()
434 468
435 469
436 470
@@ -722,5 +722,5 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
722 722 try:
723 723 loop.start()
724 724 except KeyboardInterrupt:
725 print ("interrupted, exiting...", file=sys.__stderr__)
725 scheduler.log.critical("Interrupted, exiting...")
726 726
General Comments 0
You need to be logged in to leave comments. Login now