#!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import copy
import os
import logging
import socket
import stat
import sys
import uuid

from multiprocessing import Process

import zmq
from zmq.devices import ProcessMonitoredQueue
from zmq.log.handlers import PUBHandler
from zmq.utils import jsonapi as json

from IPython.config.loader import Config

from IPython.parallel import factory

from IPython.parallel.apps.clusterdir import (
    ClusterDir,
    ClusterApplication,
    base_flags
    # ClusterDirConfigLoader
)
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict

# from IPython.parallel.controller.controller import ControllerFactory
from IPython.parallel.streamsession import StreamSession
from IPython.parallel.controller.heartmonitor import HeartMonitor
from IPython.parallel.controller.hub import Hub, HubFactory
from IPython.parallel.controller.scheduler import TaskScheduler, launch_scheduler
from IPython.parallel.controller.sqlitedb import SQLiteDB
from IPython.parallel.util import signal_children, disambiguate_ip_address, split_url

# conditional import of MongoDB backend class
try:
    from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
    maybe_mongo = []
else:
    maybe_mongo = [MongoDB]

#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


_description = """Start the IPython controller for parallel computing.
The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""
#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
})
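
# Each entry in `flags` maps a boolean command-line flag onto a Config fragment.
# For example, selecting the mongodb flag is equivalent to putting
#
#   c.HubFactory.db_class = 'IPython.parallel.controller.mongodb.MongoDB'
#
# in a config file; the aliases defined on the class below play the same role
# for name=value style options.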

class IPControllerApp(ClusterApplication):

    name = u'ipcontroller'
    description = _description
    # command_line_loader = IPControllerAppConfigLoader
    default_config_file_name = default_config_file_name
    classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    auto_create_cluster_dir = Bool(True, config=True,
        help="Whether to create cluster_dir if it does not exist.")
    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files [default: False]'
    )
    secure = Bool(True, config=True,
        help='Whether to use exec_keys for extra authentication [default: True]'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )
    usethreads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # internal
    children = List()
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        # swap between Thread- and Process-based MonitoredQueues when the
        # `usethreads` trait is flipped
        self.mq_class = 'zmq.devices.%sMonitoredQueue' % ('Thread' if new else 'Process')

    aliases = Dict(dict(
        config = 'IPControllerApp.config_file',
        # file = 'IPControllerApp.url_file',
        log_level = 'IPControllerApp.log_level',
        log_url = 'IPControllerApp.log_url',
        reuse_files = 'IPControllerApp.reuse_files',
        secure = 'IPControllerApp.secure',
        ssh = 'IPControllerApp.ssh_server',
        usethreads = 'IPControllerApp.usethreads',
        import_statements = 'IPControllerApp.import_statements',
        location = 'IPControllerApp.location',
        ident = 'StreamSession.session',
        user = 'StreamSession.username',
        exec_key = 'StreamSession.keyfile',
        url = 'HubFactory.url',
        ip = 'HubFactory.ip',
        transport = 'HubFactory.transport',
        port = 'HubFactory.regport',
        ping = 'HeartMonitor.period',
        scheme = 'TaskScheduler.scheme_name',
        hwm = 'TaskScheduler.hwm',
        profile = "ClusterDir.profile",
        cluster_dir = 'ClusterDir.location',
    ))
    flags = Dict(flags)

    def save_connection_dict(self, fname, cdict):
        """save a connection dict to json file."""
        c = self.config
        url = cdict['url']
        location = cdict['location']
        if not location:
            try:
                proto, ip, port = split_url(url)
            except AssertionError:
                pass
            else:
                location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
            cdict['location'] = location
        fname = os.path.join(self.cluster_dir.security_dir, fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR)
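
    # The file written above (security_dir/ipcontroller-client.json or
    # ipcontroller-engine.json) ends up looking roughly like this; the keys are
    # the ones assembled in init_hub(), the values here are only illustrative:
    #
    #   {
    #     "exec_key": "<uuid or empty string>",
    #     "ssh": "<ssh_server or empty string>",
    #     "url": "<transport>://<ip>:<regport>",
    #     "location": "<external IP or hostname of the controller>"
    #   }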

    def load_config_from_json(self):
        """load config from existing json connector files."""
        c = self.config
        # load from engine config
        with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
            cfg = json.loads(f.read())
        key = c.StreamSession.key = cfg['exec_key']
        xport, addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip, ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        self.location = cfg['location']

        # load client config
        with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
            cfg = json.loads(f.read())
        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
        xport, addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip, ports = addr.split(':')
        c.HubFactory.client_ip = ip
        self.ssh_server = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"

    def init_hub(self):
        c = self.config

        self.do_import_statements()
        reusing = self.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError, IOError):
                reusing = False
        # check again, because reusing may have failed:
        if reusing:
            pass
        elif self.secure:
            key = str(uuid.uuid4())
            # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
            # with open(keyfile, 'w') as f:
            #     f.write(key)
            # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
            c.StreamSession.key = key
        else:
            key = c.StreamSession.key = ''

        try:
            self.factory = HubFactory(config=c, log=self.log)
            # self.start_logging()
            self.factory.init_hub()
        except:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : key,
                     'ssh' : self.ssh_server,
                     'url' : "%s://%s:%s" % (f.client_transport, f.client_ip, f.regport),
                     'location' : self.location
                     }
            self.save_connection_dict('ipcontroller-client.json', cdict)
            edict = cdict
            # engines connect on the engine-facing interface of the Hub
            edict['url'] = "%s://%s:%s" % (f.engine_transport, f.engine_ip, f.regport)
            self.save_connection_dict('ipcontroller-engine.json', edict)

    def init_schedulers(self):
        children = self.children
        mq = import_item(str(self.mq_class))

        hub = self.factory
        # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A', 'iopub')
        q.bind_in(hub.client_info['iopub'])
        q.bind_out(hub.engine_info['iopub'])
        q.setsockopt_out(zmq.SUBSCRIBE, '')
        q.connect_mon(hub.monitor_url)
        q.daemon = True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        q.bind_in(hub.client_info['mux'])
        q.setsockopt_in(zmq.IDENTITY, 'mux')
        q.bind_out(hub.engine_info['mux'])
        q.connect_mon(hub.monitor_url)
        q.daemon = True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        q.bind_in(hub.client_info['control'])
        q.setsockopt_in(zmq.IDENTITY, 'control')
        q.bind_out(hub.engine_info['control'])
        q.connect_mon(hub.monitor_url)
        q.daemon = True
        children.append(q)

        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            scheme = TaskScheduler.scheme_name.get_default_value()
        # Task Queue (in a Process)
        if scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            # q.setsockopt_out(zmq.HWM, hub.hwm)
            q.bind_in(hub.client_info['task'][1])
            q.setsockopt_in(zmq.IDENTITY, 'task')
            q.bind_out(hub.engine_info['task'])
            q.connect_mon(hub.monitor_url)
            q.daemon = True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")
        else:
            self.log.info("task::using Python %s Task scheduler" % scheme)
            sargs = (hub.client_info['task'][1], hub.engine_info['task'],
                     hub.monitor_url, hub.client_info['notification'])
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                          log_url=self.log_url, config=dict(self.config))
            q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
            q.daemon = True
            children.append(q)
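
    # Note on the wiring above: clients connect to the Hub's client_info
    # addresses and engines to the engine_info addresses; each MonitoredQueue
    # also relays a copy of its traffic to hub.monitor_url so the Hub can track
    # message state.  Only the task channel is pluggable: 'pure' uses a plain
    # MonitoredQueue, 'none' disables it, and any other scheme launches the
    # Python TaskScheduler in its own Process.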

    def save_urls(self):
        """save the registration urls to files."""
        c = self.config

        sec_dir = self.cluster_dir.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s" % (cf.engine_transport, cf.engine_ip, cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s" % (cf.client_transport, cf.client_ip, cf.regport))

    def do_import_statements(self):
        statements = self.import_statements
        for s in statements:
            try:
                self.log.info("Executing statement: '%s'" % s)
                exec s in globals(), locals()
            except:
                self.log.error("Error running statement: %s" % s)
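
    # import_statements is a list of plain source strings, so a config file can
    # pre-import modules on the controller, e.g. (hypothetical values):
    #
    #   c.IPControllerApp.import_statements = ['import numpy',
    #                                          'from os.path import join']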

    def forward_logging(self):
        if self.log_url:
            self.log.info("Forwarding logging to %s" % self.log_url)
            context = zmq.Context.instance()
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            handler = PUBHandler(lsock)
            self.log.removeHandler(self._log_handler)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)
            self._log_handler = handler
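
    # Minimal sketch of a matching log consumer (assumes PUBHandler's
    # [topic, message] multipart frames and the 'controller' root topic set
    # above; standalone snippet, not part of this application):
    #
    #   import zmq
    #   ctx = zmq.Context.instance()
    #   sub = ctx.socket(zmq.SUB)
    #   sub.bind('tcp://127.0.0.1:20202')        # the url passed as log_url
    #   sub.setsockopt(zmq.SUBSCRIBE, 'controller')
    #   while True:
    #       topic, msg = sub.recv_multipart()
    #       print topic, msg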

    def initialize(self, argv=None):
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.init_hub()
        self.init_schedulers()

    def start(self):
        # Start the subprocesses:
        self.factory.start()
        child_procs = []
        for child in self.children:
            child.start()
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            signal_children(child_procs)

        self.write_pid_file(overwrite=True)

        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")


def launch_new_instance():
    """Create and run the IPython controller"""
    app = IPControllerApp()
    app.initialize()
    app.start()


if __name__ == '__main__':
    launch_new_instance()
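
# When installed, this module is typically exposed as an `ipcontroller` console
# script whose entry point simply calls launch_new_instance(), mirroring the
# __main__ block above (packaging details are assumed, not defined here).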