##// END OF EJS Templates
test user_variables/expressions in message spec
test user_variables/expressions in message spec

File last commit:

r10614:00757118
r10637:7953182c
Show More
ipcontrollerapp.py
554 lines | 20.0 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 #!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Brian Granger
* MinRK
MinRK
Refactor newparallel to use Config system...
r3604 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
Refactor newparallel to use Config system...
r3604 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
MinRK
json/jsonapi cleanup...
r5429 import json
MinRK
Refactor newparallel to use Config system...
r3604 import os
MinRK
resort imports in a cleaner order
r3631 import stat
import sys
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from multiprocessing import Process
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 from signal import signal, SIGINT, SIGABRT, SIGTERM
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 import zmq
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from zmq.devices import ProcessMonitoredQueue
MinRK
Refactor newparallel to use Config system...
r3604 from zmq.log.handlers import PUBHandler
Fernando Perez
Add more informative message around json api mismatch imports.
r5417
MinRK
move ipcluster create|list to `ipython profile create|list`...
r4024 from IPython.core.profiledir import ProfileDir
MinRK
fix residual import issues with IPython.parallel reorganization
r3688
MinRK
rename clusterdir to more descriptive baseapp...
r3993 from IPython.parallel.apps.baseapp import (
MinRK
update parallel apps to use ProfileDir
r3992 BaseParallelApplication,
MinRK
cleanup aliases in parallel apps...
r4115 base_aliases,
base_flags,
MinRK
catch_config -> catch_config_error
r5214 catch_config_error,
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.utils.importstring import import_item
W. Trevor King
parallel.apps.ipcontrollerapp: Use utils.localinterfaces...
r9251 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
MinRK
Show invalid config message on TraitErrors during initialization...
r5172 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
mv IPython.zmq to IPython.kernel.zmq
r9372 from IPython.kernel.zmq.session import (
MinRK
enable HMAC message signing by default in kernels...
r4962 Session, session_aliases, session_flags, default_secure
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.parallel.controller.heartmonitor import HeartMonitor
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.parallel.controller.hub import HubFactory
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
MinRK
add DictDB to autogenerated config / help...
r9866 from IPython.parallel.controller.dictdb import DictDB
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
set unlimited HWM for all relay devices...
r10614 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
add DictDB to autogenerated config / help...
r9866 # conditional import of SQLiteDB / MongoDB backend class
real_dbs = []
try:
from IPython.parallel.controller.sqlitedb import SQLiteDB
except ImportError:
pass
else:
real_dbs.append(SQLiteDB)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
try:
from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
MinRK
add DictDB to autogenerated config / help...
r9866 pass
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 else:
MinRK
add DictDB to autogenerated config / help...
r9866 real_dbs.append(MongoDB)
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 default_config_file_name = u'ipcontroller_config.py'
MinRK
Refactor newparallel to use Config system...
r3604
_description = """Start the IPython controller for parallel computing.
The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
MinRK
move ipcluster create|list to `ipython profile create|list`...
r4024 your ipython directory and named as "profile_name". See the `profile`
Brian E. Granger
Finishing up help string work.
r4218 and `profile-dir` options for details.
MinRK
Refactor newparallel to use Config system...
r3604 """
Brian Granger
More work on adding examples to help strings.
r4216 _examples = """
ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
ipcontroller --scheme=pure # use the pure zeromq scheduler
"""
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 flags = {}
flags.update(base_flags)
flags.update({
MinRK
parallel docs, tests, default config updated to newconfig
r3990 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'Use threads instead of processes for the schedulers'),
MinRK
remove uneccesary Config objects from flags.
r3994 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the SQLiteDB backend'),
MinRK
remove uneccesary Config objects from flags.
r3994 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the MongoDB backend'),
MinRK
remove uneccesary Config objects from flags.
r3994 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the in-memory DictDB backend'),
MinRK
add NoDB for non-recording Hub...
r5892 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
"""use dummy DB backend, which doesn't store any information.
MinRK
Use NoDB by default...
r7509 This is the default as of IPython 0.13.
To enable delayed or repeated retrieval of results from the Hub,
select one of the true db backends.
"""),
MinRK
remove uneccesary Config objects from flags.
r3994 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
MinRK
enables resume of ipcontroller...
r7891 'reuse existing json connection files'),
'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
'Attempt to restore engines from a JSON file. '
'For use when resuming a crashed controller'),
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 })
MinRK
enable HMAC message signing by default in kernels...
r4962 flags.update(session_flags)
MinRK
cleanup aliases in parallel apps...
r4115 aliases = dict(
ssh = 'IPControllerApp.ssh_server',
MinRK
add ssh tunneling to Engine...
r4585 enginessh = 'IPControllerApp.engine_ssh_server',
MinRK
cleanup aliases in parallel apps...
r4115 location = 'IPControllerApp.location',
url = 'HubFactory.url',
ip = 'HubFactory.ip',
transport = 'HubFactory.transport',
port = 'HubFactory.regport',
ping = 'HeartMonitor.period',
scheme = 'TaskScheduler.scheme_name',
hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
MinRK
enable HMAC message signing by default in kernels...
r4962 aliases.update(session_aliases)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
update parallel apps to use ProfileDir
r3992 class IPControllerApp(BaseParallelApplication):
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 name = u'ipcontroller'
MinRK
Refactor newparallel to use Config system...
r3604 description = _description
Brian Granger
More work on adding examples to help strings.
r4216 examples = _examples
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 config_file_name = Unicode(default_config_file_name)
MinRK
add DictDB to autogenerated config / help...
r9866 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
MinRK
update parallel apps to use ProfileDir
r3992
# change default to True
auto_create = Bool(True, config=True,
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 help="""Whether to create profile dir if it doesn't exist.""")
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 reuse_files = Bool(False, config=True,
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 help="""Whether to reuse existing json connection files.
If False, connection files will be removed on a clean exit.
"""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
MinRK
enables resume of ipcontroller...
r7891 restore_engines = Bool(False, config=True,
help="""Reload engine state from JSON file
"""
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 ssh_server = Unicode(u'', config=True,
help="""ssh url for clients to use when connecting to the Controller
processes. It should be of the form: [user@]server[:port]. The
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 Controller's listening addresses must be accessible from the ssh server""",
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
MinRK
add ssh tunneling to Engine...
r4585 engine_ssh_server = Unicode(u'', config=True,
help="""ssh url for engines to use when connecting to the Controller
processes. It should be of the form: [user@]server[:port]. The
Controller's listening addresses must be accessible from the ssh server""",
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 location = Unicode(u'', config=True,
help="""The external IP or domain name of the Controller, used for disambiguating
engine and client connections.""",
)
import_statements = List([], config=True,
help="import statements to be run at startup. Necessary in some environments"
)
MinRK
parallel docs, tests, default config updated to newconfig
r3990 use_threads = Bool(False, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help='Use threads instead of processes for the schedulers',
MinRK
add cluster_id to parallel apps...
r4847 )
engine_json_file = Unicode('ipcontroller-engine.json', config=True,
help="JSON filename where engine connection info will be stored.")
client_json_file = Unicode('ipcontroller-client.json', config=True,
help="JSON filename where client connection info will be stored.")
def _cluster_id_changed(self, name, old, new):
super(IPControllerApp, self)._cluster_id_changed(name, old, new)
MinRK
parallel.apps cleanup per review...
r4850 self.engine_json_file = "%s-engine.json" % self.name
self.client_json_file = "%s-client.json" % self.name
MinRK
add cluster_id to parallel apps...
r4847
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
# internal
children = List()
MinRK
cleanup parallel traits...
r3988 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
parallel docs, tests, default config updated to newconfig
r3990 def _use_threads_changed(self, name, old, new):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072
write_connection_files = Bool(True,
help="""Whether to write connection files to disk.
True in all cases other than runs with `reuse_files=True` *after the first*
"""
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
cleanup aliases in parallel apps...
r4115 aliases = Dict(aliases)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 flags = Dict(flags)
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
persist connection data to disk as json
r3614 def save_connection_dict(self, fname, cdict):
"""save a connection dict to json file."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
simplify IPython.parallel connections...
r7889 url = cdict['registration']
MinRK
persist connection data to disk as json
r3614 location = cdict['location']
MinRK
use individual ports, rather than full urls in connection files
r7890
MinRK
persist connection data to disk as json
r3614 if not location:
W. Trevor King
parallel.apps.ipcontrollerapp: Use utils.localinterfaces...
r9251 if PUBLIC_IPS:
location = PUBLIC_IPS[-1]
else:
self.log.warn("Could not identify this machine's IP, assuming %s."
MinRK
use individual ports, rather than full urls in connection files
r7890 " You may need to specify '--location=<external_ip_address>' to help"
W. Trevor King
parallel.apps.ipcontrollerapp: Use utils.localinterfaces...
r9251 " IPython decide when to connect via loopback." % LOCALHOST)
location = LOCALHOST
MinRK
persist connection data to disk as json
r3614 cdict['location'] = location
MinRK
update parallel apps to use ProfileDir
r3992 fname = os.path.join(self.profile_dir.security_dir, fname)
MinRK
minor cleanup in ipcontroller/ipengine...
r5483 self.log.info("writing connection info to %s", fname)
MinRK
json/jsonapi cleanup...
r5429 with open(fname, 'w') as f:
MinRK
persist connection data to disk as json
r3614 f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
MinRK
add default ip<x>z_config files
r3630
def load_config_from_json(self):
"""load config from existing json connector files."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
enable HMAC message signing by default in kernels...
r4962 self.log.debug("loading config from JSON")
MinRK
use individual ports, rather than full urls in connection files
r7890
# load engine config
MinRK
minor cleanup in ipcontroller/ipengine...
r5483 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
self.log.info("loading connection info from %s", fname)
with open(fname) as f:
MinRK
use individual ports, rather than full urls in connection files
r7890 ecfg = json.loads(f.read())
MinRK
minor py3 fixes in IPython.parallel...
r5626 # json gives unicode, Session.key wants bytes
MinRK
use individual ports, rather than full urls in connection files
r7890 c.Session.key = ecfg['exec_key'].encode('ascii')
xport,ip = ecfg['interface'].split('://')
MinRK
add default ip<x>z_config files
r3630 c.HubFactory.engine_ip = ip
MinRK
use individual ports, rather than full urls in connection files
r7890 c.HubFactory.engine_transport = xport
self.location = ecfg['location']
MinRK
add ssh tunneling to Engine...
r4585 if not self.engine_ssh_server:
MinRK
use individual ports, rather than full urls in connection files
r7890 self.engine_ssh_server = ecfg['ssh']
MinRK
add default ip<x>z_config files
r3630 # load client config
MinRK
use individual ports, rather than full urls in connection files
r7890
MinRK
minor cleanup in ipcontroller/ipengine...
r5483 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
self.log.info("loading connection info from %s", fname)
with open(fname) as f:
MinRK
use individual ports, rather than full urls in connection files
r7890 ccfg = json.loads(f.read())
for key in ('exec_key', 'registration', 'pack', 'unpack'):
assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
xport,addr = ccfg['interface'].split('://')
MinRK
add default ip<x>z_config files
r3630 c.HubFactory.client_transport = xport
c.HubFactory.client_ip = ip
MinRK
add ssh tunneling to Engine...
r4585 if not self.ssh_server:
MinRK
use individual ports, rather than full urls in connection files
r7890 self.ssh_server = ccfg['ssh']
# load port config:
c.HubFactory.regport = ecfg['registration']
c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
c.HubFactory.control = (ccfg['control'], ecfg['control'])
c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
c.HubFactory.task = (ccfg['task'], ecfg['task'])
c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
c.HubFactory.notifier_port = ccfg['notification']
MinRK
add default ip<x>z_config files
r3630
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def cleanup_connection_files(self):
if self.reuse_files:
self.log.debug("leaving JSON connection files for reuse")
return
self.log.debug("cleaning up JSON connection files")
for f in (self.client_json_file, self.engine_json_file):
f = os.path.join(self.profile_dir.security_dir, f)
try:
os.remove(f)
except Exception as e:
self.log.error("Failed to cleanup connection file: %s", e)
else:
self.log.debug(u"removed %s", f)
MinRK
enable HMAC message signing by default in kernels...
r4962 def load_secondary_config(self):
"""secondary config, loading from JSON and setting defaults"""
if self.reuse_files:
try:
self.load_config_from_json()
except (AssertionError,IOError) as e:
self.log.error("Could not load config from JSON: %s" % e)
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 else:
# successfully loaded config from JSON, and reuse=True
# no need to wite back the same file
self.write_connection_files = False
MinRK
enable HMAC message signing by default in kernels...
r4962 # switch Session.key default to secure
default_secure(self.config)
self.log.debug("Config changed")
self.log.debug(repr(self.config))
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def init_hub(self):
c = self.config
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.do_import_statements()
MinRK
Refactor newparallel to use Config system...
r3604
try:
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.factory = HubFactory(config=c, log=self.log)
# self.start_logging()
self.factory.init_hub()
MinRK
Show invalid config message on TraitErrors during initialization...
r5172 except TraitError:
raise
except Exception:
MinRK
Refactor newparallel to use Config system...
r3604 self.log.error("Couldn't construct the Controller", exc_info=True)
self.exit(1)
MinRK
persist connection data to disk as json
r3614
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 if self.write_connection_files:
MinRK
add default ip<x>z_config files
r3630 # save to new json config files
f = self.factory
MinRK
simplify IPython.parallel connections...
r7889 base = {
'exec_key' : f.session.key.decode('ascii'),
'location' : self.location,
'pack' : f.session.packer,
'unpack' : f.session.unpacker,
}
cdict = {'ssh' : self.ssh_server}
cdict.update(f.client_info)
cdict.update(base)
MinRK
add cluster_id to parallel apps...
r4847 self.save_connection_dict(self.client_json_file, cdict)
MinRK
simplify IPython.parallel connections...
r7889
edict = {'ssh' : self.engine_ssh_server}
edict.update(f.engine_info)
edict.update(base)
MinRK
add cluster_id to parallel apps...
r4847 self.save_connection_dict(self.engine_json_file, edict)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
enables resume of ipcontroller...
r7891 fname = "engines%s.json" % self.cluster_id
self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
if self.restore_engines:
self.factory.hub._load_engine_state()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def init_schedulers(self):
children = self.children
MinRK
re-enable log forwarding and iplogger
r3989 mq = import_item(str(self.mq_class))
MinRK
persist connection data to disk as json
r3614
MinRK
use individual ports, rather than full urls in connection files
r7890 f = self.factory
MinRK
enables resume of ipcontroller...
r7891 ident = f.session.bsession
MinRK
fix --ip='*' argument in various apps...
r5170 # disambiguate url, in case of *
MinRK
use individual ports, rather than full urls in connection files
r7890 monitor_url = disambiguate_url(f.monitor_url)
MinRK
fix --ip='*' argument in various apps...
r5170 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # IOPub relay (in a Process)
MinRK
update parallel code for py3k...
r4155 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_in(f.client_url('iopub'))
MinRK
IPython.parallel py3compat
r7893 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_out(f.engine_url('iopub'))
MinRK
update parallel code for py3k...
r4155 q.setsockopt_out(zmq.SUBSCRIBE, b'')
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
MinRK
set unlimited HWM for all relay devices...
r10614
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_in(f.client_url('mux'))
MinRK
enables resume of ipcontroller...
r7891 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_out(f.engine_url('mux'))
MinRK
enables resume of ipcontroller...
r7891 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
# Control Queue (in a Process)
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_in(f.client_url('control'))
MinRK
enables resume of ipcontroller...
r7891 q.setsockopt_in(zmq.IDENTITY, b'control_in')
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_out(f.engine_url('control'))
MinRK
enables resume of ipcontroller...
r7891 q.setsockopt_out(zmq.IDENTITY, b'control_out')
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
try:
scheme = self.config.TaskScheduler.scheme_name
except AttributeError:
scheme = TaskScheduler.scheme_name.get_default_value()
# Task Queue (in a Process)
if scheme == 'pure':
MinRK
remove remaining references to deprecated XREP/XREQ names...
r7538 self.log.warn("task::using pure DEALER Task scheduler")
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # q.setsockopt_out(zmq.HWM, hub.hwm)
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_in(f.client_url('task'))
MinRK
enables resume of ipcontroller...
r7891 q.setsockopt_in(zmq.IDENTITY, b'task_in')
MinRK
use individual ports, rather than full urls in connection files
r7890 q.bind_out(f.engine_url('task'))
MinRK
enables resume of ipcontroller...
r7891 q.setsockopt_out(zmq.IDENTITY, b'task_out')
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
elif scheme == 'none':
self.log.warn("task::using no Task scheduler")
else:
self.log.info("task::using Python %s Task scheduler"%scheme)
MinRK
use individual ports, rather than full urls in connection files
r7890 sargs = (f.client_url('task'), f.engine_url('task'),
MinRK
enables resume of ipcontroller...
r7891 monitor_url, disambiguate_url(f.client_url('notification')),
disambiguate_url(f.client_url('registration')),
)
MinRK
re-enable log forwarding and iplogger
r3989 kwargs = dict(logname='scheduler', loglevel=self.log_level,
log_url = self.log_url, config=dict(self.config))
MinRK
allow true single-threaded Controller...
r4092 if 'Process' in self.mq_class:
# run the Python scheduler in a Process
q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
q.daemon=True
children.append(q)
else:
# single-threaded Controller
kwargs['in_thread'] = True
launch_scheduler(*sargs, **kwargs)
MinRK
set unlimited HWM for all relay devices...
r10614
# set unlimited HWM for all relay devices
if hasattr(zmq, 'SNDHWM'):
q = children[0]
q.setsockopt_in(zmq.RCVHWM, 0)
q.setsockopt_out(zmq.SNDHWM, 0)
for q in children[1:]:
if not hasattr(q, 'setsockopt_in'):
continue
q.setsockopt_in(zmq.SNDHWM, 0)
q.setsockopt_in(zmq.RCVHWM, 0)
q.setsockopt_out(zmq.SNDHWM, 0)
q.setsockopt_out(zmq.RCVHWM, 0)
q.setsockopt_mon(zmq.SNDHWM, 0)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def terminate_children(self):
child_procs = []
for child in self.children:
if isinstance(child, ProcessMonitoredQueue):
child_procs.append(child.launcher)
elif isinstance(child, Process):
child_procs.append(child)
if child_procs:
self.log.critical("terminating children...")
for child in child_procs:
try:
child.terminate()
except OSError:
# already dead
pass
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def handle_signal(self, sig, frame):
self.log.critical("Received signal %i, shutting down", sig)
self.terminate_children()
self.loop.stop()
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def init_signal(self):
for sig in (SIGINT, SIGABRT, SIGTERM):
signal(sig, self.handle_signal)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def do_import_statements(self):
statements = self.import_statements
MinRK
Refactor newparallel to use Config system...
r3604 for s in statements:
try:
self.log.msg("Executing statement: '%s'" % s)
exec s in globals(), locals()
except:
self.log.msg("Error running statement: %s" % s)
MinRK
re-enable log forwarding and iplogger
r3989 def forward_logging(self):
if self.log_url:
self.log.info("Forwarding logging to %s"%self.log_url)
context = zmq.Context.instance()
lsock = context.socket(zmq.PUB)
lsock.connect(self.log_url)
handler = PUBHandler(lsock)
handler.root_topic = 'controller'
handler.setLevel(self.log_level)
self.log.addHandler(handler)
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
catch_config -> catch_config_error
r5214 @catch_config_error
MinRK
ipcluster implemented with new subcommands
r3986 def initialize(self, argv=None):
super(IPControllerApp, self).initialize(argv)
MinRK
re-enable log forwarding and iplogger
r3989 self.forward_logging()
MinRK
enable HMAC message signing by default in kernels...
r4962 self.load_secondary_config()
MinRK
ipcluster implemented with new subcommands
r3986 self.init_hub()
self.init_schedulers()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def start(self):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Start the subprocesses:
MinRK
Refactor newparallel to use Config system...
r3604 self.factory.start()
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 # children must be started before signals are setup,
# otherwise signal-handling will fire multiple times
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 for child in self.children:
child.start()
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 self.init_signal()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 self.write_pid_file(overwrite=True)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 try:
self.factory.loop.start()
except KeyboardInterrupt:
self.log.critical("Interrupted, Exiting...\n")
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 finally:
self.cleanup_connection_files()
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
Refactor newparallel to use Config system...
r3604
def launch_new_instance():
"""Create and run the IPython controller"""
MinRK
prevent infinite Controllers on Windows...
r4095 if sys.platform == 'win32':
# make sure we don't get called from a multiprocessing subprocess
# this can result in infinite Controllers being started on Windows
# which doesn't have a proper fork, so multiprocessing is wonky
# this only comes up when IPython has been installed using vanilla
# setuptools, and *not* distribute.
MinRK
use multiprocessing instead of inspect to prevent spurious controllers...
r4096 import multiprocessing
p = multiprocessing.current_process()
# the main process has name 'MainProcess'
# subprocesses will have names like 'Process-1'
if p.name != 'MainProcess':
# we are a subprocess, don't start another Controller!
return
MinRK
use App.instance() in launch_new_instance (parallel apps)...
r3999 app = IPControllerApp.instance()
MinRK
ipcluster implemented with new subcommands
r3986 app.initialize()
MinRK
Refactor newparallel to use Config system...
r3604 app.start()
if __name__ == '__main__':
launch_new_instance()