##// END OF EJS Templates
Merge pull request #1372 from minrk/reuse-cleanup...
Min RK -
r6080:c1c517a0 merge
parent child Browse files
Show More
@@ -30,6 +30,7 b' import stat'
30 import sys
30 import sys
31
31
32 from multiprocessing import Process
32 from multiprocessing import Process
33 from signal import signal, SIGINT, SIGABRT, SIGTERM
33
34
34 import zmq
35 import zmq
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.devices import ProcessMonitoredQueue
@@ -55,7 +56,7 b' from IPython.parallel.controller.hub import HubFactory'
55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.sqlitedb import SQLiteDB
57 from IPython.parallel.controller.sqlitedb import SQLiteDB
57
58
58 from IPython.parallel.util import signal_children, split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url
59
60
60 # conditional import of MongoDB backend class
61 # conditional import of MongoDB backend class
61
62
@@ -151,7 +152,9 b' class IPControllerApp(BaseParallelApplication):'
151 help="""Whether to create profile dir if it doesn't exist.""")
152 help="""Whether to create profile dir if it doesn't exist.""")
152
153
153 reuse_files = Bool(False, config=True,
154 reuse_files = Bool(False, config=True,
154 help='Whether to reuse existing json connection files.'
155 help="""Whether to reuse existing json connection files.
156 If False, connection files will be removed on a clean exit.
157 """
155 )
158 )
156 ssh_server = Unicode(u'', config=True,
159 ssh_server = Unicode(u'', config=True,
157 help="""ssh url for clients to use when connecting to the Controller
160 help="""ssh url for clients to use when connecting to the Controller
@@ -193,6 +196,12 b' class IPControllerApp(BaseParallelApplication):'
193 def _use_threads_changed(self, name, old, new):
196 def _use_threads_changed(self, name, old, new):
194 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
197 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
195
198
199 write_connection_files = Bool(True,
200 help="""Whether to write connection files to disk.
201 True in all cases other than runs with `reuse_files=True` *after the first*
202 """
203 )
204
196 aliases = Dict(aliases)
205 aliases = Dict(aliases)
197 flags = Dict(flags)
206 flags = Dict(flags)
198
207
@@ -256,6 +265,20 b' class IPControllerApp(BaseParallelApplication):'
256 self.ssh_server = cfg['ssh']
265 self.ssh_server = cfg['ssh']
257 assert int(ports) == c.HubFactory.regport, "regport mismatch"
266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
258
267
268 def cleanup_connection_files(self):
269 if self.reuse_files:
270 self.log.debug("leaving JSON connection files for reuse")
271 return
272 self.log.debug("cleaning up JSON connection files")
273 for f in (self.client_json_file, self.engine_json_file):
274 f = os.path.join(self.profile_dir.security_dir, f)
275 try:
276 os.remove(f)
277 except Exception as e:
278 self.log.error("Failed to cleanup connection file: %s", e)
279 else:
280 self.log.debug(u"removed %s", f)
281
259 def load_secondary_config(self):
282 def load_secondary_config(self):
260 """secondary config, loading from JSON and setting defaults"""
283 """secondary config, loading from JSON and setting defaults"""
261 if self.reuse_files:
284 if self.reuse_files:
@@ -263,7 +286,11 b' class IPControllerApp(BaseParallelApplication):'
263 self.load_config_from_json()
286 self.load_config_from_json()
264 except (AssertionError,IOError) as e:
287 except (AssertionError,IOError) as e:
265 self.log.error("Could not load config from JSON: %s" % e)
288 self.log.error("Could not load config from JSON: %s" % e)
266 self.reuse_files=False
289 else:
290 # successfully loaded config from JSON, and reuse=True
291 # no need to wite back the same file
292 self.write_connection_files = False
293
267 # switch Session.key default to secure
294 # switch Session.key default to secure
268 default_secure(self.config)
295 default_secure(self.config)
269 self.log.debug("Config changed")
296 self.log.debug("Config changed")
@@ -284,7 +311,7 b' class IPControllerApp(BaseParallelApplication):'
284 self.log.error("Couldn't construct the Controller", exc_info=True)
311 self.log.error("Couldn't construct the Controller", exc_info=True)
285 self.exit(1)
312 self.exit(1)
286
313
287 if not self.reuse_files:
314 if self.write_connection_files:
288 # save to new json config files
315 # save to new json config files
289 f = self.factory
316 f = self.factory
290 cdict = {'exec_key' : f.session.key.decode('ascii'),
317 cdict = {'exec_key' : f.session.key.decode('ascii'),
@@ -298,7 +325,6 b' class IPControllerApp(BaseParallelApplication):'
298 edict['ssh'] = self.engine_ssh_server
325 edict['ssh'] = self.engine_ssh_server
299 self.save_connection_dict(self.engine_json_file, edict)
326 self.save_connection_dict(self.engine_json_file, edict)
300
327
301 #
302 def init_schedulers(self):
328 def init_schedulers(self):
303 children = self.children
329 children = self.children
304 mq = import_item(str(self.mq_class))
330 mq = import_item(str(self.mq_class))
@@ -367,20 +393,30 b' class IPControllerApp(BaseParallelApplication):'
367 kwargs['in_thread'] = True
393 kwargs['in_thread'] = True
368 launch_scheduler(*sargs, **kwargs)
394 launch_scheduler(*sargs, **kwargs)
369
395
396 def terminate_children(self):
397 child_procs = []
398 for child in self.children:
399 if isinstance(child, ProcessMonitoredQueue):
400 child_procs.append(child.launcher)
401 elif isinstance(child, Process):
402 child_procs.append(child)
403 if child_procs:
404 self.log.critical("terminating children...")
405 for child in child_procs:
406 try:
407 child.terminate()
408 except OSError:
409 # already dead
410 pass
370
411
371 def save_urls(self):
412 def handle_signal(self, sig, frame):
372 """save the registration urls to files."""
413 self.log.critical("Received signal %i, shutting down", sig)
373 c = self.config
414 self.terminate_children()
374
415 self.loop.stop()
375 sec_dir = self.profile_dir.security_dir
376 cf = self.factory
377
378 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
379 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
380
381 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
382 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
383
416
417 def init_signal(self):
418 for sig in (SIGINT, SIGABRT, SIGTERM):
419 signal(sig, self.handle_signal)
384
420
385 def do_import_statements(self):
421 def do_import_statements(self):
386 statements = self.import_statements
422 statements = self.import_statements
@@ -415,15 +451,11 b' class IPControllerApp(BaseParallelApplication):'
415 def start(self):
451 def start(self):
416 # Start the subprocesses:
452 # Start the subprocesses:
417 self.factory.start()
453 self.factory.start()
418 child_procs = []
454 # children must be started before signals are setup,
455 # otherwise signal-handling will fire multiple times
419 for child in self.children:
456 for child in self.children:
420 child.start()
457 child.start()
421 if isinstance(child, ProcessMonitoredQueue):
458 self.init_signal()
422 child_procs.append(child.launcher)
423 elif isinstance(child, Process):
424 child_procs.append(child)
425 if child_procs:
426 signal_children(child_procs)
427
459
428 self.write_pid_file(overwrite=True)
460 self.write_pid_file(overwrite=True)
429
461
@@ -431,6 +463,8 b' class IPControllerApp(BaseParallelApplication):'
431 self.factory.loop.start()
463 self.factory.loop.start()
432 except KeyboardInterrupt:
464 except KeyboardInterrupt:
433 self.log.critical("Interrupted, Exiting...\n")
465 self.log.critical("Interrupted, Exiting...\n")
466 finally:
467 self.cleanup_connection_files()
434
468
435
469
436
470
@@ -722,5 +722,5 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
722 try:
722 try:
723 loop.start()
723 loop.start()
724 except KeyboardInterrupt:
724 except KeyboardInterrupt:
725 print ("interrupted, exiting...", file=sys.__stderr__)
725 scheduler.log.critical("Interrupted, exiting...")
726
726
General Comments 0
You need to be logged in to leave comments. Login now