@@ -30,6 +30,7 @@ import stat
 import sys
 
 from multiprocessing import Process
+from signal import signal, SIGINT, SIGABRT, SIGTERM
 
 import zmq
 from zmq.devices import ProcessMonitoredQueue
@@ -55,7 +56,7 @@ from IPython.parallel.controller.hub import HubFactory
 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
 from IPython.parallel.controller.sqlitedb import SQLiteDB
 
-from IPython.parallel.util import
+from IPython.parallel.util import split_url, disambiguate_url
 
 # conditional import of MongoDB backend class
 
@@ -151,7 +152,9 @@ class IPControllerApp(BaseParallelApplication):
        help="""Whether to create profile dir if it doesn't exist.""")
 
    reuse_files = Bool(False, config=True,
-        help=
+        help="""Whether to reuse existing json connection files.
+        If False, connection files will be removed on a clean exit.
+        """
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
@@ -192,6 +195,12 @@ class IPControllerApp(BaseParallelApplication):
 
    def _use_threads_changed(self, name, old, new):
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
+
+    write_connection_files = Bool(True,
+        help="""Whether to write connection files to disk.
+        True in all cases other than runs with `reuse_files=True` *after the first*
+        """
+    )
 
    aliases = Dict(aliases)
    flags = Dict(flags)
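
A quick, hedged illustration of why the new `write_connection_files` trait is declared without `config=True` while `reuse_files` has it: only config-enabled traits pick up values from the config system, so `write_connection_files` stays an internal flag the app flips itself. This sketch uses the standalone `traitlets` package and a made-up `ToyApp` class, not the IPython code from the diff.

    from traitlets import Bool
    from traitlets.config import Config, Configurable

    class ToyApp(Configurable):
        # mirrors reuse_files: exposed to the config system
        reuse_files = Bool(False, config=True,
            help="Whether to reuse existing json connection files.")
        # mirrors write_connection_files: no config=True, so it is
        # internal state and cannot be set from config files or flags
        write_connection_files = Bool(True,
            help="Whether to write connection files to disk.")

    cfg = Config()
    cfg.ToyApp.reuse_files = True              # applied: trait is config=True
    cfg.ToyApp.write_connection_files = False  # not applied: not a config trait

    app = ToyApp(config=cfg)
    print(app.reuse_files)             # True, taken from config
    print(app.write_connection_files)  # still True, only code can change it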
@@ -256,6 +265,20 @@ class IPControllerApp(BaseParallelApplication):
            self.ssh_server = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"
 
+    def cleanup_connection_files(self):
+        if self.reuse_files:
+            self.log.debug("leaving JSON connection files for reuse")
+            return
+        self.log.debug("cleaning up JSON connection files")
+        for f in (self.client_json_file, self.engine_json_file):
+            f = os.path.join(self.profile_dir.security_dir, f)
+            try:
+                os.remove(f)
+            except Exception as e:
+                self.log.error("Failed to cleanup connection file: %s", e)
+            else:
+                self.log.debug(u"removed %s", f)
+
    def load_secondary_config(self):
        """secondary config, loading from JSON and setting defaults"""
        if self.reuse_files:
@@ -263,7 +286,11 @@ class IPControllerApp(BaseParallelApplication):
                self.load_config_from_json()
            except (AssertionError,IOError) as e:
                self.log.error("Could not load config from JSON: %s" % e)
-                self.reuse_files=False
+            else:
+                # successfully loaded config from JSON, and reuse=True
+                # no need to wite back the same file
+                self.write_connection_files = False
+
        # switch Session.key default to secure
        default_secure(self.config)
        self.log.debug("Config changed")
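
The hunk above leans on Python's try/except/else: the `else` branch runs only when the `try` body raised nothing, so connection-file writing is disabled only after a successful reuse load. A tiny standalone sketch of that flow, with `load_json` and the file name as stand-ins (the real code calls `load_config_from_json()` and also catches AssertionError):

    import json

    write_connection_files = True

    def load_json(path):
        with open(path) as f:
            return json.load(f)

    try:
        cfg = load_json("ipcontroller-client.json")
    except IOError as e:
        # load failed: keep writing fresh connection files
        print("Could not load config from JSON: %s" % e)
    else:
        # loaded existing connection info; no need to rewrite the same file
        write_connection_files = False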
@@ -284,7 +311,7 @@ class IPControllerApp(BaseParallelApplication):
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)
 
-        if
+        if self.write_connection_files:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : f.session.key.decode('ascii'),
@@ -298,7 +325,6 @@ class IPControllerApp(BaseParallelApplication):
                edict['ssh'] = self.engine_ssh_server
            self.save_connection_dict(self.engine_json_file, edict)
 
-    #
    def init_schedulers(self):
        children = self.children
        mq = import_item(str(self.mq_class))
@@ -367,21 +393,31 @@ class IPControllerApp(BaseParallelApplication):
                kwargs['in_thread'] = True
                launch_scheduler(*sargs, **kwargs)
 
+    def terminate_children(self):
+        child_procs = []
+        for child in self.children:
+            if isinstance(child, ProcessMonitoredQueue):
+                child_procs.append(child.launcher)
+            elif isinstance(child, Process):
+                child_procs.append(child)
+        if child_procs:
+            self.log.critical("terminating children...")
+            for child in child_procs:
+                try:
+                    child.terminate()
+                except OSError:
+                    # already dead
+                    pass
 
-    def save_urls(self):
-        """save the registration urls to files."""
-        c = self.config
-
-        sec_dir = self.profile_dir.security_dir
-        cf = self.factory
-
-        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
-            f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
-
-        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
-            f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
-
+    def handle_signal(self, sig, frame):
+        self.log.critical("Received signal %i, shutting down", sig)
+        self.terminate_children()
+        self.loop.stop()
 
+    def init_signal(self):
+        for sig in (SIGINT, SIGABRT, SIGTERM):
+            signal(sig, self.handle_signal)
+
    def do_import_statements(self):
        statements = self.import_statements
        for s in statements:
@@ -415,15 +451,11 @@ class IPControllerApp(BaseParallelApplication):
    def start(self):
        # Start the subprocesses:
        self.factory.start()
-        child_procs = []
+        # children must be started before signals are setup,
+        # otherwise signal-handling will fire multiple times
        for child in self.children:
            child.start()
-            if isinstance(child, ProcessMonitoredQueue):
-                child_procs.append(child.launcher)
-            elif isinstance(child, Process):
-                child_procs.append(child)
-        if child_procs:
-            signal_children(child_procs)
+        self.init_signal()
 
        self.write_pid_file(overwrite=True)
 
@@ -431,6 +463,8 @@ class IPControllerApp(BaseParallelApplication):
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
+        finally:
+            self.cleanup_connection_files()
 
 
 
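
Taken together, the hunks above replace pyzmq's `signal_children` helper with explicit signal handling: children start first, then SIGINT/SIGABRT/SIGTERM are routed to a handler that terminates the children and stops the loop, and connection files are cleaned up in a `finally` block. Below is a minimal standalone sketch of that pattern, not the IPython code itself; `ToyController`, `worker`, and the plain while-loop (standing in for the zmq/tornado IOLoop) are invented for illustration.

    import os
    import time
    from multiprocessing import Process
    from signal import signal, SIGINT, SIGABRT, SIGTERM

    def worker():
        while True:           # placeholder child process
            time.sleep(1)

    class ToyController(object):
        def __init__(self):
            self.children = [Process(target=worker) for _ in range(2)]
            self.running = True
            self.connection_file = "toy-connection.json"

        def terminate_children(self):
            for child in self.children:
                try:
                    child.terminate()
                except OSError:
                    pass  # already dead

        def handle_signal(self, sig, frame):
            print("Received signal %i, shutting down" % sig)
            self.terminate_children()
            self.running = False  # stands in for self.loop.stop()

        def init_signal(self):
            for sig in (SIGINT, SIGABRT, SIGTERM):
                signal(sig, self.handle_signal)

        def cleanup_connection_files(self):
            try:
                os.remove(self.connection_file)
            except OSError:
                pass

        def start(self):
            with open(self.connection_file, "w") as f:
                f.write("{}")
            # children are started before signals are set up, otherwise
            # each forked child would inherit and run the handler too
            for child in self.children:
                child.start()
            self.init_signal()
            try:
                while self.running:
                    time.sleep(0.1)
            except KeyboardInterrupt:
                print("Interrupted, exiting...")
            finally:
                self.cleanup_connection_files()

    if __name__ == '__main__':
        ToyController().start()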
@@ -722,5 +722,5 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
    try:
        loop.start()
    except KeyboardInterrupt:
-
+        scheduler.log.critical("Interrupted, exiting...")
 