Show More
@@ -30,6 +30,7 b' import stat' | |||
|
30 | 30 | import sys |
|
31 | 31 | |
|
32 | 32 | from multiprocessing import Process |
|
33 | from signal import signal, SIGINT, SIGABRT, SIGTERM | |
|
33 | 34 | |
|
34 | 35 | import zmq |
|
35 | 36 | from zmq.devices import ProcessMonitoredQueue |
@@ -55,7 +56,7 b' from IPython.parallel.controller.hub import HubFactory' | |||
|
55 | 56 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler |
|
56 | 57 | from IPython.parallel.controller.sqlitedb import SQLiteDB |
|
57 | 58 | |
|
58 |
from IPython.parallel.util import |
|
|
59 | from IPython.parallel.util import split_url, disambiguate_url | |
|
59 | 60 | |
|
60 | 61 | # conditional import of MongoDB backend class |
|
61 | 62 | |
@@ -151,7 +152,9 b' class IPControllerApp(BaseParallelApplication):' | |||
|
151 | 152 | help="""Whether to create profile dir if it doesn't exist.""") |
|
152 | 153 | |
|
153 | 154 | reuse_files = Bool(False, config=True, |
|
154 |
help= |
|
|
155 | help="""Whether to reuse existing json connection files. | |
|
156 | If False, connection files will be removed on a clean exit. | |
|
157 | """ | |
|
155 | 158 | ) |
|
156 | 159 | ssh_server = Unicode(u'', config=True, |
|
157 | 160 | help="""ssh url for clients to use when connecting to the Controller |
@@ -193,6 +196,12 b' class IPControllerApp(BaseParallelApplication):' | |||
|
193 | 196 | def _use_threads_changed(self, name, old, new): |
|
194 | 197 | self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') |
|
195 | 198 | |
|
199 | write_connection_files = Bool(True, | |
|
200 | help="""Whether to write connection files to disk. | |
|
201 | True in all cases other than runs with `reuse_files=True` *after the first* | |
|
202 | """ | |
|
203 | ) | |
|
204 | ||
|
196 | 205 | aliases = Dict(aliases) |
|
197 | 206 | flags = Dict(flags) |
|
198 | 207 | |
@@ -256,6 +265,20 b' class IPControllerApp(BaseParallelApplication):' | |||
|
256 | 265 | self.ssh_server = cfg['ssh'] |
|
257 | 266 | assert int(ports) == c.HubFactory.regport, "regport mismatch" |
|
258 | 267 | |
|
268 | def cleanup_connection_files(self): | |
|
269 | if self.reuse_files: | |
|
270 | self.log.debug("leaving JSON connection files for reuse") | |
|
271 | return | |
|
272 | self.log.debug("cleaning up JSON connection files") | |
|
273 | for f in (self.client_json_file, self.engine_json_file): | |
|
274 | f = os.path.join(self.profile_dir.security_dir, f) | |
|
275 | try: | |
|
276 | os.remove(f) | |
|
277 | except Exception as e: | |
|
278 | self.log.error("Failed to cleanup connection file: %s", e) | |
|
279 | else: | |
|
280 | self.log.debug(u"removed %s", f) | |
|
281 | ||
|
259 | 282 | def load_secondary_config(self): |
|
260 | 283 | """secondary config, loading from JSON and setting defaults""" |
|
261 | 284 | if self.reuse_files: |
@@ -263,7 +286,11 b' class IPControllerApp(BaseParallelApplication):' | |||
|
263 | 286 | self.load_config_from_json() |
|
264 | 287 | except (AssertionError,IOError) as e: |
|
265 | 288 | self.log.error("Could not load config from JSON: %s" % e) |
|
266 | self.reuse_files=False | |
|
289 | else: | |
|
290 | # successfully loaded config from JSON, and reuse=True | |
|
291 | # no need to wite back the same file | |
|
292 | self.write_connection_files = False | |
|
293 | ||
|
267 | 294 | # switch Session.key default to secure |
|
268 | 295 | default_secure(self.config) |
|
269 | 296 | self.log.debug("Config changed") |
@@ -284,7 +311,7 b' class IPControllerApp(BaseParallelApplication):' | |||
|
284 | 311 | self.log.error("Couldn't construct the Controller", exc_info=True) |
|
285 | 312 | self.exit(1) |
|
286 | 313 | |
|
287 |
if |
|
|
314 | if self.write_connection_files: | |
|
288 | 315 | # save to new json config files |
|
289 | 316 | f = self.factory |
|
290 | 317 | cdict = {'exec_key' : f.session.key.decode('ascii'), |
@@ -298,7 +325,6 b' class IPControllerApp(BaseParallelApplication):' | |||
|
298 | 325 | edict['ssh'] = self.engine_ssh_server |
|
299 | 326 | self.save_connection_dict(self.engine_json_file, edict) |
|
300 | 327 | |
|
301 | # | |
|
302 | 328 | def init_schedulers(self): |
|
303 | 329 | children = self.children |
|
304 | 330 | mq = import_item(str(self.mq_class)) |
@@ -367,20 +393,30 b' class IPControllerApp(BaseParallelApplication):' | |||
|
367 | 393 | kwargs['in_thread'] = True |
|
368 | 394 | launch_scheduler(*sargs, **kwargs) |
|
369 | 395 | |
|
396 | def terminate_children(self): | |
|
397 | child_procs = [] | |
|
398 | for child in self.children: | |
|
399 | if isinstance(child, ProcessMonitoredQueue): | |
|
400 | child_procs.append(child.launcher) | |
|
401 | elif isinstance(child, Process): | |
|
402 | child_procs.append(child) | |
|
403 | if child_procs: | |
|
404 | self.log.critical("terminating children...") | |
|
405 | for child in child_procs: | |
|
406 | try: | |
|
407 | child.terminate() | |
|
408 | except OSError: | |
|
409 | # already dead | |
|
410 | pass | |
|
370 | 411 | |
|
371 | def save_urls(self): | |
|
372 | """save the registration urls to files.""" | |
|
373 | c = self.config | |
|
374 | ||
|
375 | sec_dir = self.profile_dir.security_dir | |
|
376 | cf = self.factory | |
|
377 | ||
|
378 | with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: | |
|
379 | f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport)) | |
|
380 | ||
|
381 | with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f: | |
|
382 | f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) | |
|
412 | def handle_signal(self, sig, frame): | |
|
413 | self.log.critical("Received signal %i, shutting down", sig) | |
|
414 | self.terminate_children() | |
|
415 | self.loop.stop() | |
|
383 | 416 |
|
|
417 | def init_signal(self): | |
|
418 | for sig in (SIGINT, SIGABRT, SIGTERM): | |
|
419 | signal(sig, self.handle_signal) | |
|
384 | 420 | |
|
385 | 421 | def do_import_statements(self): |
|
386 | 422 | statements = self.import_statements |
@@ -415,15 +451,11 b' class IPControllerApp(BaseParallelApplication):' | |||
|
415 | 451 | def start(self): |
|
416 | 452 | # Start the subprocesses: |
|
417 | 453 | self.factory.start() |
|
418 | child_procs = [] | |
|
454 | # children must be started before signals are setup, | |
|
455 | # otherwise signal-handling will fire multiple times | |
|
419 | 456 | for child in self.children: |
|
420 | 457 | child.start() |
|
421 | if isinstance(child, ProcessMonitoredQueue): | |
|
422 | child_procs.append(child.launcher) | |
|
423 | elif isinstance(child, Process): | |
|
424 | child_procs.append(child) | |
|
425 | if child_procs: | |
|
426 | signal_children(child_procs) | |
|
458 | self.init_signal() | |
|
427 | 459 | |
|
428 | 460 | self.write_pid_file(overwrite=True) |
|
429 | 461 | |
@@ -431,6 +463,8 b' class IPControllerApp(BaseParallelApplication):' | |||
|
431 | 463 | self.factory.loop.start() |
|
432 | 464 | except KeyboardInterrupt: |
|
433 | 465 | self.log.critical("Interrupted, Exiting...\n") |
|
466 | finally: | |
|
467 | self.cleanup_connection_files() | |
|
434 | 468 | |
|
435 | 469 | |
|
436 | 470 |
General Comments 0
You need to be logged in to leave comments.
Login now