#!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.

Authors:

* Brian Granger
* MinRK

"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

from __future__ import with_statement

import json
import os
import stat
import sys

from multiprocessing import Process
from signal import signal, SIGINT, SIGABRT, SIGTERM

import zmq
from zmq.devices import ProcessMonitoredQueue
from zmq.log.handlers import PUBHandler

from IPython.core.profiledir import ProfileDir

from IPython.parallel.apps.baseapp import (
    BaseParallelApplication,
    base_aliases,
    base_flags,
    catch_config_error,
)
from IPython.utils.importstring import import_item
from IPython.utils.localinterfaces import localhost, public_ips
from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError

from IPython.kernel.zmq.session import (
    Session, session_aliases, session_flags,
)
from IPython.parallel.controller.heartmonitor import HeartMonitor
from IPython.parallel.controller.hub import HubFactory
from IPython.parallel.controller.scheduler import TaskScheduler, launch_scheduler
from IPython.parallel.controller.dictdb import DictDB

from IPython.parallel.util import split_url, disambiguate_url, set_hwm

# conditional import of SQLiteDB / MongoDB backend class
real_dbs = []

try:
    from IPython.parallel.controller.sqlitedb import SQLiteDB
except ImportError:
    pass
else:
    real_dbs.append(SQLiteDB)

try:
    from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
    pass
else:
    real_dbs.append(MongoDB)

#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------

_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "profile_name". See the `profile`
and `profile-dir` options for details.
"""

_examples = """
ipcontroller --ip=192.168.0.1 --port=1000  # listen on ip, port for engines
ipcontroller --scheme=pure                 # use the pure zeromq scheduler
"""


#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
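# Each entry below maps a command-line flag to a (config-overrides, help-text)
# pair, the form consumed by the traitlets Application flag machinery; e.g.
# `ipcontroller --nodb` applies the HubFactory.db_class override defined here.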
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ({'IPControllerApp' : {'use_threads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
    'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
                    """use dummy DB backend, which doesn't store any information.

                    This is the default as of IPython 0.13.

                    To enable delayed or repeated retrieval of results from the Hub,
                    select one of the true db backends.
                    """),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                    'reuse existing json connection files'),
    'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
                    'Attempt to restore engines from a JSON file. '
                    'For use when resuming a crashed controller'),
})

flags.update(session_flags)

aliases = dict(
    ssh = 'IPControllerApp.ssh_server',
    enginessh = 'IPControllerApp.engine_ssh_server',
    location = 'IPControllerApp.location',
    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',
    ping = 'HeartMonitor.period',
    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
aliases.update(session_aliases)


class IPControllerApp(BaseParallelApplication):

    name = u'ipcontroller'
    description = _description
    examples = _examples
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help="""Whether to reuse existing json connection files.
        If False, connection files will be removed on a clean exit.
        """
    )
    restore_engines = Bool(False, config=True,
        help="""Reload engine state from JSON file
        """
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    engine_ssh_server = Unicode(u'', config=True,
        help="""ssh url for engines to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    engine_json_file = Unicode('ipcontroller-engine.json', config=True,
        help="JSON filename where engine connection info will be stored.")
    client_json_file = Unicode('ipcontroller-client.json', config=True,
        help="JSON filename where client connection info will be stored.")

    def _cluster_id_changed(self, name, old, new):
        super(IPControllerApp, self)._cluster_id_changed(name, old, new)
        self.engine_json_file = "%s-engine.json" % self.name
        self.client_json_file = "%s-client.json" % self.name

    # internal
    children = List()
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _use_threads_changed(self, name, old, new):
        self.mq_class = 'zmq.devices.%sMonitoredQueue' % ('Thread' if new else 'Process')

    write_connection_files = Bool(True,
        help="""Whether to write connection files to disk.
        True in all cases other than runs with `reuse_files=True` *after the first*
        """
    )

    aliases = Dict(aliases)
    flags = Dict(flags)

    def save_connection_dict(self, fname, cdict):
        """save a connection dict to json file."""
        c = self.config
        url = cdict['registration']
        location = cdict['location']

        if not location:
            if public_ips():
                location = public_ips()[-1]
            else:
                self.log.warn("Could not identify this machine's IP, assuming %s."
                    " You may need to specify '--location=<external_ip_address>' to help"
                    " IPython decide when to connect via loopback." % localhost())
                location = localhost()
        cdict['location'] = location

        fname = os.path.join(self.profile_dir.security_dir, fname)
        self.log.info("writing connection info to %s", fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR)

    def load_config_from_json(self):
        """load config from existing json connector files."""
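        # Connection files are plain JSON written by save_connection_dict().
        # Illustrative (not exhaustive) client-file contents, assuming default
        # transport and serialization settings:
        #   {
        #     "interface": "tcp://127.0.0.1", "location": "...", "ssh": "",
        #     "registration": <port>, "control": <port>, "mux": <port>,
        #     "task": <port>, "iopub": <port>, "notification": <port>,
        #     "key": "<session key>", "signature_scheme": "hmac-sha256",
        #     "pack": "json", "unpack": "json"
        #   }
        # The engine file has the same shape, with engine-facing ports plus
        # hb_ping/hb_pong entries for the heartbeat channel.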
        c = self.config
        self.log.debug("loading config from JSON")

        # load engine config

        fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
        self.log.info("loading connection info from %s", fname)
        with open(fname) as f:
            ecfg = json.loads(f.read())

        # json gives unicode, Session.key wants bytes
        c.Session.key = ecfg['key'].encode('ascii')

        xport, ip = ecfg['interface'].split('://')
        c.HubFactory.engine_ip = ip
        c.HubFactory.engine_transport = xport

        self.location = ecfg['location']
        if not self.engine_ssh_server:
            self.engine_ssh_server = ecfg['ssh']

        # load client config

        fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
        self.log.info("loading connection info from %s", fname)
        with open(fname) as f:
            ccfg = json.loads(f.read())

        for key in ('key', 'registration', 'pack', 'unpack', 'signature_scheme'):
            assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key

        xport, addr = ccfg['interface'].split('://')
        c.HubFactory.client_transport = xport
        # use the address parsed from the *client* interface, not the engine ip
        c.HubFactory.client_ip = addr
        if not self.ssh_server:
            self.ssh_server = ccfg['ssh']

        # load port config:
        c.HubFactory.regport = ecfg['registration']
        c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
        c.HubFactory.control = (ccfg['control'], ecfg['control'])
        c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
        c.HubFactory.task = (ccfg['task'], ecfg['task'])
        c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
        c.HubFactory.notifier_port = ccfg['notification']

    def cleanup_connection_files(self):
        if self.reuse_files:
            self.log.debug("leaving JSON connection files for reuse")
            return
        self.log.debug("cleaning up JSON connection files")
        for f in (self.client_json_file, self.engine_json_file):
            f = os.path.join(self.profile_dir.security_dir, f)
            try:
                os.remove(f)
            except Exception as e:
                self.log.error("Failed to cleanup connection file: %s", e)
            else:
                self.log.debug(u"removed %s", f)

    def load_secondary_config(self):
        """secondary config, loading from JSON and setting defaults"""
        if self.reuse_files:
            try:
                self.load_config_from_json()
            except (AssertionError, IOError) as e:
                self.log.error("Could not load config from JSON: %s" % e)
            else:
                # successfully loaded config from JSON, and reuse=True
                # no need to write back the same file
                self.write_connection_files = False

        self.log.debug("Config changed")
        self.log.debug(repr(self.config))

    def init_hub(self):
        c = self.config

        self.do_import_statements()

        try:
            self.factory = HubFactory(config=c, log=self.log)
            # self.start_logging()
            self.factory.init_hub()
        except TraitError:
            raise
        except Exception:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if self.write_connection_files:
            # save to new json config files
            f = self.factory
            base = {
                'key': f.session.key.decode('ascii'),
                'location': self.location,
                'pack': f.session.packer,
                'unpack': f.session.unpacker,
                'signature_scheme': f.session.signature_scheme,
            }

            cdict = {'ssh': self.ssh_server}
            cdict.update(f.client_info)
            cdict.update(base)
            self.save_connection_dict(self.client_json_file, cdict)

            edict = {'ssh': self.engine_ssh_server}
            edict.update(f.engine_info)
            edict.update(base)
            self.save_connection_dict(self.engine_json_file, edict)

        fname = "engines%s.json" % self.cluster_id
        self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
        if self.restore_engines:
            self.factory.hub._load_engine_state()

        # load key into config so other sessions in this process (TaskScheduler)
        # have the same value
        self.config.Session.key = self.factory.session.key

    def init_schedulers(self):
        children = self.children
        mq = import_item(str(self.mq_class))

        f = self.factory
        ident = f.session.bsession
        # disambiguate url, in case of *
        monitor_url = disambiguate_url(f.monitor_url)
        # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
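
        # Each relay below is a (Process|Thread)MonitoredQueue: it binds a
        # client-facing socket and an engine-facing socket, forwards traffic
        # between them, and publishes a copy of every message on a monitor
        # socket connected to monitor_url so the Hub can watch the traffic.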
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A', b'iopub')
        q.bind_in(f.client_url('iopub'))
        q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
        q.bind_out(f.engine_url('iopub'))
        q.setsockopt_out(zmq.SUBSCRIBE, b'')
        q.connect_mon(monitor_url)
        q.daemon = True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
        q.bind_in(f.client_url('mux'))
        q.setsockopt_in(zmq.IDENTITY, b'mux_in')
        q.bind_out(f.engine_url('mux'))
        q.setsockopt_out(zmq.IDENTITY, b'mux_out')
        q.connect_mon(monitor_url)
        q.daemon = True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
        q.bind_in(f.client_url('control'))
        q.setsockopt_in(zmq.IDENTITY, b'control_in')
        q.bind_out(f.engine_url('control'))
        q.setsockopt_out(zmq.IDENTITY, b'control_out')
        q.connect_mon(monitor_url)
        q.daemon = True
        children.append(q)
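
        # Pick the task-distribution scheme: 'pure' wires a plain DEALER relay
        # (zeromq's own round-robin distribution, no load-balancing logic),
        # 'none' starts no task scheduler at all, and any other scheme launches
        # the Python TaskScheduler.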
        if 'TaskScheduler.scheme_name' in self.config:
            scheme = self.config.TaskScheduler.scheme_name
        else:
            scheme = TaskScheduler.scheme_name.get_default_value()

        # Task Queue (in a Process)
        if scheme == 'pure':
            self.log.warn("task::using pure DEALER Task scheduler")
            q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
            # q.setsockopt_out(zmq.HWM, hub.hwm)
            q.bind_in(f.client_url('task'))
            q.setsockopt_in(zmq.IDENTITY, b'task_in')
            q.bind_out(f.engine_url('task'))
            q.setsockopt_out(zmq.IDENTITY, b'task_out')
            q.connect_mon(monitor_url)
            q.daemon = True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")
        else:
            self.log.info("task::using Python %s Task scheduler" % scheme)
            sargs = (f.client_url('task'), f.engine_url('task'),
                     monitor_url, disambiguate_url(f.client_url('notification')),
                     disambiguate_url(f.client_url('registration')),
                     )
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                          log_url=self.log_url, config=dict(self.config))
            if 'Process' in self.mq_class:
                # run the Python scheduler in a Process
                q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
                q.daemon = True
                children.append(q)
            else:
                # single-threaded Controller
                kwargs['in_thread'] = True
                launch_scheduler(*sargs, **kwargs)

        # set unlimited HWM for all relay devices
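        # SNDHWM/RCVHWM only exist with libzmq >= 3, where the single HWM
        # option was split in two; a high-water mark of 0 means unlimited.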
        if hasattr(zmq, 'SNDHWM'):
            q = children[0]
            q.setsockopt_in(zmq.RCVHWM, 0)
            q.setsockopt_out(zmq.SNDHWM, 0)

            for q in children[1:]:
                if not hasattr(q, 'setsockopt_in'):
                    continue
                q.setsockopt_in(zmq.SNDHWM, 0)
                q.setsockopt_in(zmq.RCVHWM, 0)
                q.setsockopt_out(zmq.SNDHWM, 0)
                q.setsockopt_out(zmq.RCVHWM, 0)
                q.setsockopt_mon(zmq.SNDHWM, 0)

    def terminate_children(self):
        child_procs = []
        for child in self.children:
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            self.log.critical("terminating children...")
            for child in child_procs:
                try:
                    child.terminate()
                except OSError:
                    # already dead
                    pass

    def handle_signal(self, sig, frame):
        self.log.critical("Received signal %i, shutting down", sig)
        self.terminate_children()
        self.loop.stop()

    def init_signal(self):
        for sig in (SIGINT, SIGABRT, SIGTERM):
            signal(sig, self.handle_signal)

    def do_import_statements(self):
        statements = self.import_statements
        for s in statements:
            try:
                self.log.info("Executing statement: '%s'" % s)
                exec(s, globals(), locals())
            except:
                self.log.error("Error running statement: %s" % s)

    def forward_logging(self):
        if self.log_url:
            self.log.info("Forwarding logging to %s" % self.log_url)
            context = zmq.Context.instance()
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            handler = PUBHandler(lsock)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    @catch_config_error
    def initialize(self, argv=None):
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.load_secondary_config()
        self.init_hub()
        self.init_schedulers()

    def start(self):
        # Start the subprocesses:
        self.factory.start()
        # children must be started before signals are setup,
        # otherwise signal-handling will fire multiple times
        for child in self.children:
            child.start()
        self.init_signal()

        self.write_pid_file(overwrite=True)

        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
        finally:
            self.cleanup_connection_files()


def launch_new_instance(*args, **kwargs):
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # make sure we don't get called from a multiprocessing subprocess
        # this can result in infinite Controllers being started on Windows
        # which doesn't have a proper fork, so multiprocessing is wonky
        # this only comes up when IPython has been installed using vanilla
        # setuptools, and *not* distribute.
        import multiprocessing
        p = multiprocessing.current_process()
        # the main process has name 'MainProcess'
        # subprocesses will have names like 'Process-1'
        if p.name != 'MainProcess':
            # we are a subprocess, don't start another Controller!
            return

    return IPControllerApp.launch_instance(*args, **kwargs)


if __name__ == '__main__':
    launch_new_instance()