##// END OF EJS Templates
Fix for %run -d on Python 3...
Fix for %run -d on Python 3 Closes gh-1421

File last commit:

r6072:2150bd36
r6487:3e1a5092
Show More
ipcontrollerapp.py
493 lines | 18.3 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 #!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Brian Granger
* MinRK
MinRK
Refactor newparallel to use Config system...
r3604 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
Refactor newparallel to use Config system...
r3604 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
MinRK
json/jsonapi cleanup...
r5429 import json
MinRK
Refactor newparallel to use Config system...
r3604 import os
MinRK
persist connection data to disk as json
r3614 import socket
MinRK
resort imports in a cleaner order
r3631 import stat
import sys
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from multiprocessing import Process
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 from signal import signal, SIGINT, SIGABRT, SIGTERM
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 import zmq
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from zmq.devices import ProcessMonitoredQueue
MinRK
Refactor newparallel to use Config system...
r3604 from zmq.log.handlers import PUBHandler
Fernando Perez
Add more informative message around json api mismatch imports.
r5417
MinRK
move ipcluster create|list to `ipython profile create|list`...
r4024 from IPython.core.profiledir import ProfileDir
MinRK
fix residual import issues with IPython.parallel reorganization
r3688
MinRK
rename clusterdir to more descriptive baseapp...
r3993 from IPython.parallel.apps.baseapp import (
MinRK
update parallel apps to use ProfileDir
r3992 BaseParallelApplication,
MinRK
cleanup aliases in parallel apps...
r4115 base_aliases,
base_flags,
MinRK
catch_config -> catch_config_error
r5214 catch_config_error,
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.utils.importstring import import_item
MinRK
Show invalid config message on TraitErrors during initialization...
r5172 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
enable HMAC message signing by default in kernels...
r4962 from IPython.zmq.session import (
Session, session_aliases, session_flags, default_secure
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.parallel.controller.heartmonitor import HeartMonitor
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.parallel.controller.hub import HubFactory
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
from IPython.parallel.controller.sqlitedb import SQLiteDB
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 from IPython.parallel.util import split_url, disambiguate_url
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # conditional import of MongoDB backend class
try:
from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
maybe_mongo = []
else:
maybe_mongo = [MongoDB]
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 default_config_file_name = u'ipcontroller_config.py'
MinRK
Refactor newparallel to use Config system...
r3604
_description = """Start the IPython controller for parallel computing.
The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
MinRK
move ipcluster create|list to `ipython profile create|list`...
r4024 your ipython directory and named as "profile_name". See the `profile`
Brian E. Granger
Finishing up help string work.
r4218 and `profile-dir` options for details.
MinRK
Refactor newparallel to use Config system...
r3604 """
Brian Granger
More work on adding examples to help strings.
r4216 _examples = """
ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
ipcontroller --scheme=pure # use the pure zeromq scheduler
"""
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 flags = {}
flags.update(base_flags)
flags.update({
MinRK
parallel docs, tests, default config updated to newconfig
r3990 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'Use threads instead of processes for the schedulers'),
MinRK
remove uneccesary Config objects from flags.
r3994 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the SQLiteDB backend'),
MinRK
remove uneccesary Config objects from flags.
r3994 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the MongoDB backend'),
MinRK
remove uneccesary Config objects from flags.
r3994 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the in-memory DictDB backend'),
MinRK
add NoDB for non-recording Hub...
r5892 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
"""use dummy DB backend, which doesn't store any information.
This can be used to prevent growth of the memory footprint of the Hub
in cases where its record-keeping is not required. Requesting results
of tasks submitted by other clients, db_queries, and task resubmission
will not be available."""),
MinRK
remove uneccesary Config objects from flags.
r3994 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
MinRK
parallel docs, tests, default config updated to newconfig
r3990 'reuse existing json connection files')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 })
MinRK
enable HMAC message signing by default in kernels...
r4962 flags.update(session_flags)
MinRK
cleanup aliases in parallel apps...
r4115 aliases = dict(
ssh = 'IPControllerApp.ssh_server',
MinRK
add ssh tunneling to Engine...
r4585 enginessh = 'IPControllerApp.engine_ssh_server',
MinRK
cleanup aliases in parallel apps...
r4115 location = 'IPControllerApp.location',
url = 'HubFactory.url',
ip = 'HubFactory.ip',
transport = 'HubFactory.transport',
port = 'HubFactory.regport',
ping = 'HeartMonitor.period',
scheme = 'TaskScheduler.scheme_name',
hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
MinRK
enable HMAC message signing by default in kernels...
r4962 aliases.update(session_aliases)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
Brian Granger
More work on adding examples to help strings.
r4216
MinRK
update parallel apps to use ProfileDir
r3992 class IPControllerApp(BaseParallelApplication):
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 name = u'ipcontroller'
MinRK
Refactor newparallel to use Config system...
r3604 description = _description
Brian Granger
More work on adding examples to help strings.
r4216 examples = _examples
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 config_file_name = Unicode(default_config_file_name)
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
MinRK
update parallel apps to use ProfileDir
r3992
# change default to True
auto_create = Bool(True, config=True,
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 help="""Whether to create profile dir if it doesn't exist.""")
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 reuse_files = Bool(False, config=True,
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 help="""Whether to reuse existing json connection files.
If False, connection files will be removed on a clean exit.
"""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
ssh_server = Unicode(u'', config=True,
help="""ssh url for clients to use when connecting to the Controller
processes. It should be of the form: [user@]server[:port]. The
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 Controller's listening addresses must be accessible from the ssh server""",
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
MinRK
add ssh tunneling to Engine...
r4585 engine_ssh_server = Unicode(u'', config=True,
help="""ssh url for engines to use when connecting to the Controller
processes. It should be of the form: [user@]server[:port]. The
Controller's listening addresses must be accessible from the ssh server""",
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 location = Unicode(u'', config=True,
help="""The external IP or domain name of the Controller, used for disambiguating
engine and client connections.""",
)
import_statements = List([], config=True,
help="import statements to be run at startup. Necessary in some environments"
)
MinRK
parallel docs, tests, default config updated to newconfig
r3990 use_threads = Bool(False, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help='Use threads instead of processes for the schedulers',
MinRK
add cluster_id to parallel apps...
r4847 )
engine_json_file = Unicode('ipcontroller-engine.json', config=True,
help="JSON filename where engine connection info will be stored.")
client_json_file = Unicode('ipcontroller-client.json', config=True,
help="JSON filename where client connection info will be stored.")
def _cluster_id_changed(self, name, old, new):
super(IPControllerApp, self)._cluster_id_changed(name, old, new)
MinRK
parallel.apps cleanup per review...
r4850 self.engine_json_file = "%s-engine.json" % self.name
self.client_json_file = "%s-client.json" % self.name
MinRK
add cluster_id to parallel apps...
r4847
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
# internal
children = List()
MinRK
cleanup parallel traits...
r3988 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
parallel docs, tests, default config updated to newconfig
r3990 def _use_threads_changed(self, name, old, new):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072
write_connection_files = Bool(True,
help="""Whether to write connection files to disk.
True in all cases other than runs with `reuse_files=True` *after the first*
"""
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
cleanup aliases in parallel apps...
r4115 aliases = Dict(aliases)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 flags = Dict(flags)
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
persist connection data to disk as json
r3614 def save_connection_dict(self, fname, cdict):
"""save a connection dict to json file."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
persist connection data to disk as json
r3614 url = cdict['url']
location = cdict['location']
if not location:
try:
proto,ip,port = split_url(url)
except AssertionError:
pass
else:
MinRK
don't allow gethostbyname(gethostname()) failure to crash ipcontroller...
r4239 try:
location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
except (socket.gaierror, IndexError):
self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
" You may need to specify '--location=<external_ip_address>' to help"
" IPython decide when to connect via loopback.")
location = '127.0.0.1'
MinRK
persist connection data to disk as json
r3614 cdict['location'] = location
MinRK
update parallel apps to use ProfileDir
r3992 fname = os.path.join(self.profile_dir.security_dir, fname)
MinRK
minor cleanup in ipcontroller/ipengine...
r5483 self.log.info("writing connection info to %s", fname)
MinRK
json/jsonapi cleanup...
r5429 with open(fname, 'w') as f:
MinRK
persist connection data to disk as json
r3614 f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
MinRK
add default ip<x>z_config files
r3630
def load_config_from_json(self):
"""load config from existing json connector files."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
enable HMAC message signing by default in kernels...
r4962 self.log.debug("loading config from JSON")
MinRK
add default ip<x>z_config files
r3630 # load from engine config
MinRK
minor cleanup in ipcontroller/ipengine...
r5483 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
self.log.info("loading connection info from %s", fname)
with open(fname) as f:
MinRK
add default ip<x>z_config files
r3630 cfg = json.loads(f.read())
MinRK
minor py3 fixes in IPython.parallel...
r5626 key = cfg['exec_key']
# json gives unicode, Session.key wants bytes
c.Session.key = key.encode('ascii')
MinRK
add default ip<x>z_config files
r3630 xport,addr = cfg['url'].split('://')
c.HubFactory.engine_transport = xport
ip,ports = addr.split(':')
c.HubFactory.engine_ip = ip
c.HubFactory.regport = int(ports)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.location = cfg['location']
MinRK
add ssh tunneling to Engine...
r4585 if not self.engine_ssh_server:
self.engine_ssh_server = cfg['ssh']
MinRK
add default ip<x>z_config files
r3630 # load client config
MinRK
minor cleanup in ipcontroller/ipengine...
r5483 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
self.log.info("loading connection info from %s", fname)
with open(fname) as f:
MinRK
add default ip<x>z_config files
r3630 cfg = json.loads(f.read())
assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
xport,addr = cfg['url'].split('://')
c.HubFactory.client_transport = xport
ip,ports = addr.split(':')
c.HubFactory.client_ip = ip
MinRK
add ssh tunneling to Engine...
r4585 if not self.ssh_server:
self.ssh_server = cfg['ssh']
MinRK
add default ip<x>z_config files
r3630 assert int(ports) == c.HubFactory.regport, "regport mismatch"
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def cleanup_connection_files(self):
if self.reuse_files:
self.log.debug("leaving JSON connection files for reuse")
return
self.log.debug("cleaning up JSON connection files")
for f in (self.client_json_file, self.engine_json_file):
f = os.path.join(self.profile_dir.security_dir, f)
try:
os.remove(f)
except Exception as e:
self.log.error("Failed to cleanup connection file: %s", e)
else:
self.log.debug(u"removed %s", f)
MinRK
enable HMAC message signing by default in kernels...
r4962 def load_secondary_config(self):
"""secondary config, loading from JSON and setting defaults"""
if self.reuse_files:
try:
self.load_config_from_json()
except (AssertionError,IOError) as e:
self.log.error("Could not load config from JSON: %s" % e)
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 else:
# successfully loaded config from JSON, and reuse=True
# no need to wite back the same file
self.write_connection_files = False
MinRK
enable HMAC message signing by default in kernels...
r4962 # switch Session.key default to secure
default_secure(self.config)
self.log.debug("Config changed")
self.log.debug(repr(self.config))
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def init_hub(self):
c = self.config
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.do_import_statements()
MinRK
Refactor newparallel to use Config system...
r3604
try:
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.factory = HubFactory(config=c, log=self.log)
# self.start_logging()
self.factory.init_hub()
MinRK
Show invalid config message on TraitErrors during initialization...
r5172 except TraitError:
raise
except Exception:
MinRK
Refactor newparallel to use Config system...
r3604 self.log.error("Couldn't construct the Controller", exc_info=True)
self.exit(1)
MinRK
persist connection data to disk as json
r3614
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 if self.write_connection_files:
MinRK
add default ip<x>z_config files
r3630 # save to new json config files
f = self.factory
Thomas Kluyver
Fixes for parallel code on Python 3.
r5287 cdict = {'exec_key' : f.session.key.decode('ascii'),
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'ssh' : self.ssh_server,
MinRK
add default ip<x>z_config files
r3630 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'location' : self.location
MinRK
add default ip<x>z_config files
r3630 }
MinRK
add cluster_id to parallel apps...
r4847 self.save_connection_dict(self.client_json_file, cdict)
MinRK
add default ip<x>z_config files
r3630 edict = cdict
edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
MinRK
add ssh tunneling to Engine...
r4585 edict['ssh'] = self.engine_ssh_server
MinRK
add cluster_id to parallel apps...
r4847 self.save_connection_dict(self.engine_json_file, edict)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
def init_schedulers(self):
children = self.children
MinRK
re-enable log forwarding and iplogger
r3989 mq = import_item(str(self.mq_class))
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 hub = self.factory
MinRK
fix --ip='*' argument in various apps...
r5170 # disambiguate url, in case of *
monitor_url = disambiguate_url(hub.monitor_url)
# maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # IOPub relay (in a Process)
MinRK
update parallel code for py3k...
r4155 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_in(hub.client_info['iopub'])
q.bind_out(hub.engine_info['iopub'])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_out(zmq.SUBSCRIBE, b'')
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_in(hub.client_info['mux'])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_in(zmq.IDENTITY, b'mux')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_out(hub.engine_info['mux'])
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
# Control Queue (in a Process)
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_in(hub.client_info['control'])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_in(zmq.IDENTITY, b'control')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_out(hub.engine_info['control'])
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
try:
scheme = self.config.TaskScheduler.scheme_name
except AttributeError:
scheme = TaskScheduler.scheme_name.get_default_value()
# Task Queue (in a Process)
if scheme == 'pure':
self.log.warn("task::using pure XREQ Task scheduler")
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # q.setsockopt_out(zmq.HWM, hub.hwm)
q.bind_in(hub.client_info['task'][1])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_in(zmq.IDENTITY, b'task')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_out(hub.engine_info['task'])
MinRK
fix --ip='*' argument in various apps...
r5170 q.connect_mon(monitor_url)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.daemon=True
children.append(q)
elif scheme == 'none':
self.log.warn("task::using no Task scheduler")
else:
self.log.info("task::using Python %s Task scheduler"%scheme)
sargs = (hub.client_info['task'][1], hub.engine_info['task'],
MinRK
fix --ip='*' argument in various apps...
r5170 monitor_url, disambiguate_url(hub.client_info['notification']))
MinRK
re-enable log forwarding and iplogger
r3989 kwargs = dict(logname='scheduler', loglevel=self.log_level,
log_url = self.log_url, config=dict(self.config))
MinRK
allow true single-threaded Controller...
r4092 if 'Process' in self.mq_class:
# run the Python scheduler in a Process
q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
q.daemon=True
children.append(q)
else:
# single-threaded Controller
kwargs['in_thread'] = True
launch_scheduler(*sargs, **kwargs)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def terminate_children(self):
child_procs = []
for child in self.children:
if isinstance(child, ProcessMonitoredQueue):
child_procs.append(child.launcher)
elif isinstance(child, Process):
child_procs.append(child)
if child_procs:
self.log.critical("terminating children...")
for child in child_procs:
try:
child.terminate()
except OSError:
# already dead
pass
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def handle_signal(self, sig, frame):
self.log.critical("Received signal %i, shutting down", sig)
self.terminate_children()
self.loop.stop()
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 def init_signal(self):
for sig in (SIGINT, SIGABRT, SIGTERM):
signal(sig, self.handle_signal)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def do_import_statements(self):
statements = self.import_statements
MinRK
Refactor newparallel to use Config system...
r3604 for s in statements:
try:
self.log.msg("Executing statement: '%s'" % s)
exec s in globals(), locals()
except:
self.log.msg("Error running statement: %s" % s)
MinRK
re-enable log forwarding and iplogger
r3989 def forward_logging(self):
if self.log_url:
self.log.info("Forwarding logging to %s"%self.log_url)
context = zmq.Context.instance()
lsock = context.socket(zmq.PUB)
lsock.connect(self.log_url)
handler = PUBHandler(lsock)
self.log.removeHandler(self._log_handler)
handler.root_topic = 'controller'
handler.setLevel(self.log_level)
self.log.addHandler(handler)
self._log_handler = handler
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
catch_config -> catch_config_error
r5214 @catch_config_error
MinRK
ipcluster implemented with new subcommands
r3986 def initialize(self, argv=None):
super(IPControllerApp, self).initialize(argv)
MinRK
re-enable log forwarding and iplogger
r3989 self.forward_logging()
MinRK
enable HMAC message signing by default in kernels...
r4962 self.load_secondary_config()
MinRK
ipcluster implemented with new subcommands
r3986 self.init_hub()
self.init_schedulers()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def start(self):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Start the subprocesses:
MinRK
Refactor newparallel to use Config system...
r3604 self.factory.start()
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 # children must be started before signals are setup,
# otherwise signal-handling will fire multiple times
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 for child in self.children:
child.start()
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 self.init_signal()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 self.write_pid_file(overwrite=True)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 try:
self.factory.loop.start()
except KeyboardInterrupt:
self.log.critical("Interrupted, Exiting...\n")
MinRK
ipcontroller cleans up connection files unless reuse=True...
r6072 finally:
self.cleanup_connection_files()
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
Refactor newparallel to use Config system...
r3604
def launch_new_instance():
"""Create and run the IPython controller"""
MinRK
prevent infinite Controllers on Windows...
r4095 if sys.platform == 'win32':
# make sure we don't get called from a multiprocessing subprocess
# this can result in infinite Controllers being started on Windows
# which doesn't have a proper fork, so multiprocessing is wonky
# this only comes up when IPython has been installed using vanilla
# setuptools, and *not* distribute.
MinRK
use multiprocessing instead of inspect to prevent spurious controllers...
r4096 import multiprocessing
p = multiprocessing.current_process()
# the main process has name 'MainProcess'
# subprocesses will have names like 'Process-1'
if p.name != 'MainProcess':
# we are a subprocess, don't start another Controller!
return
MinRK
use App.instance() in launch_new_instance (parallel apps)...
r3999 app = IPControllerApp.instance()
MinRK
ipcluster implemented with new subcommands
r3986 app.initialize()
MinRK
Refactor newparallel to use Config system...
r3604 app.start()
if __name__ == '__main__':
launch_new_instance()