##// END OF EJS Templates
Refactor newparallel to use Config system...
Refactor newparallel to use Config system. This is working, but incomplete.

File last commit:

r3604:2c044319
r3604:2c044319
Show More
controller.py
345 lines | 11.7 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""The IPython Controller with 0MQ
MinRK
general parallel code cleanup
r3556 This is the master object that handles connections from engines and clients,
and monitors traffic through the various queues.
MinRK
prep newparallel for rebase...
r3539 """
#-----------------------------------------------------------------------------
MinRK
scheduler progress
r3551 # Copyright (C) 2010 The IPython Development Team
MinRK
prep newparallel for rebase...
r3539 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
added exec_key and fixed client.shutdown
r3575 import os
MinRK
Refactor newparallel to use Config system...
r3604 import sys
MinRK
Clients can now shutdown the controller.
r3580 import time
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import logging
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 from multiprocessing import Process
MinRK
prep newparallel for rebase...
r3539
import zmq
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
MinRK
Refactor newparallel to use Config system...
r3604 # from zmq.devices import ProcessMonitoredQueue
MinRK
prep newparallel for rebase...
r3539
# internal:
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, Str, Instance, List, Bool
MinRK
added zmq controller/engine entry points
r3550 from IPython.zmq.entry_point import bind_port
MinRK
prep newparallel for rebase...
r3539
MinRK
added simple cluster entry point
r3552 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 connect_logger, parse_url, signal_children, generate_exec_key,
local_logger)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
import streamsession as session
import heartmonitor
from scheduler import launch_scheduler
MinRK
Refactor newparallel to use Config system...
r3604 from hub import Hub, HubFactory
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
Started DB backend with mongoDB support.
r3579 from dictdb import DictDB
try:
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 import pymongo
MinRK
Started DB backend with mongoDB support.
r3579 except ImportError:
MongoDB=None
else:
from mongodb import MongoDB
MinRK
general parallel code cleanup
r3556 #-------------------------------------------------------------------------
MinRK
added zmq controller/engine entry points
r3550 # Entry Point
MinRK
general parallel code cleanup
r3556 #-------------------------------------------------------------------------
MinRK
added simple cluster entry point
def make_argument_parser():
    """Build the command-line parser for the controller.

    Starts from the parser returned by ``make_base_argument_parser()`` and
    registers the controller-specific options on it, in the order they
    appear in the table below.

    Returns the parser with the extra options added.
    """
    parser = make_base_argument_parser()

    # (flag, keyword arguments for add_argument), in registration order.
    controller_options = (
        ('--client', dict(
            type=int, metavar='PORT', default=0,
            help='set the XREP port for clients [default: random]')),
        ('--notice', dict(
            type=int, metavar='PORT', default=0,
            help='set the PUB socket for registration notification [default: random]')),
        ('--hb', dict(
            type=str, metavar='PORTS',
            help='set the 2 ports for heartbeats [default: random]')),
        ('--ping', dict(
            type=int, default=100,
            help='set the heartbeat period in ms [default: 100]')),
        ('--monitor', dict(
            type=int, metavar='PORT', default=0,
            help='set the SUB port for queue monitoring [default: random]')),
        ('--mux', dict(
            type=str, metavar='PORTS',
            help='set the XREP ports for the MUX queue [default: random]')),
        ('--task', dict(
            type=str, metavar='PORTS',
            help='set the XREP/XREQ ports for the task queue [default: random]')),
        ('--control', dict(
            type=str, metavar='PORTS',
            help='set the XREP ports for the control queue [default: random]')),
        ('--iopub', dict(
            type=str, metavar='PORTS',
            help='set the PUB/SUB ports for the iopub relay [default: random]')),
        ('--scheduler', dict(
            type=str, default='lru',
            choices=['pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'],
            help='select the task scheduler [default: Python LRU]')),
        ('--mongodb', dict(
            action='store_true',
            help='Use MongoDB task storage [default: in-memory]')),
        ('--session', dict(
            type=str, default=None,
            help='Manually specify the session id.')),
    )
    for flag, options in controller_options:
        parser.add_argument(flag, **options)

    return parser
MinRK
Refactor newparallel to use Config system...
r3604
class ControllerFactory(HubFactory):
    """Configurable for setting up a Hub and Schedulers."""

    # Task scheduling scheme: construct_schedulers() special-cases 'pure' and
    # 'none'; any other value is handed to launch_scheduler as ``scheme``.
    scheme = Str('pure', config=True)
    # Chooses between the Thread (True) and Process (False) MonitoredQueue.
    usethreads = Bool(False, config=True)

    # internal
    children = List()
    mq_class = Str('zmq.devices.ProcessMonitoredQueue')

    def _update_mq(self):
        """Point ``mq_class`` at the device class matching ``usethreads``."""
        if self.usethreads:
            flavor = 'Thread'
        else:
            flavor = 'Process'
        self.mq_class = 'zmq.devices.%sMonitoredQueue' % flavor

    def __init__(self, **kwargs):
        """Register scheduler construction and keep ``mq_class`` in sync."""
        super(ControllerFactory, self).__init__(**kwargs)
        self.subconstructors.append(self.construct_schedulers)
        # Set the initial value, then follow later changes to `usethreads`.
        self._update_mq()
        self.on_trait_change(self._update_mq, 'usethreads')

    def start(self):
        """Start the parent factory, then every child queue/scheduler."""
        super(ControllerFactory, self).start()
        for child in self.children:
            child.start()
        if self.usethreads:
            return
        # Relay signals to the children; a child exposing a `launcher`
        # attribute is represented by that attribute instead of itself.
        targets = []
        for child in self.children:
            targets.append(getattr(child, 'launcher', child))
        signal_children(targets)

    def construct_schedulers(self):
        """Create the queue devices and task scheduler, appending to ``children``.

        Nothing is started here; ``start()`` does that.
        """
        children = self.children
        queue_cls = import_item(self.mq_class)

        def add_queue(name, in_type, out_type, in_prefix, out_prefix,
                      subscribe_out=False):
            # One monitored queue: bound to the client address on the in
            # side, the engine address on the out side, and connected to the
            # monitor URL.
            device = queue_cls(in_type, out_type, zmq.PUB, in_prefix, out_prefix)
            device.bind_in(self.client_addrs[name])
            device.bind_out(self.engine_addrs[name])
            if subscribe_out:
                device.setsockopt_out(zmq.SUBSCRIBE, '')
            device.connect_mon(self.monitor_url)
            device.daemon = True
            children.append(device)

        # IOPub relay
        add_queue('iopub', zmq.PUB, zmq.SUB, 'N/A', 'iopub', subscribe_out=True)

        # Multiplexer Queue
        add_queue('mux', zmq.XREP, zmq.XREP, 'in', 'out')

        # Control Queue
        add_queue('control', zmq.XREP, zmq.XREP, 'incontrol', 'outcontrol')

        # Task Queue
        if self.scheme == 'pure':
            logging.warn("task::using pure XREQ Task scheduler")
            add_queue('task', zmq.XREP, zmq.XREQ, 'intask', 'outtask')
        elif self.scheme == 'none':
            logging.warn("task::using no Task scheduler")
        else:
            logging.warn("task::using Python %s Task scheduler" % self.scheme)
            scheduler_args = (
                self.client_addrs['task'],
                self.engine_addrs['task'],
                self.monitor_url,
                self.client_addrs['notification'],
            )
            scheduler = Process(target=launch_scheduler, args=scheduler_args,
                                kwargs=dict(scheme=self.scheme))
            scheduler.daemon = True
            children.append(scheduler)
MinRK
added zmq controller/engine entry points
r3550
Fernando Perez
Allow argv and namespace control to be passed to engines/controller.
def main(argv=None):
    """DO NOT USE ME ANYMORE

    Legacy argument-driven entry point that wires up the controller by hand:
    it binds the registration, heartbeat, client, notification and monitor
    sockets, launches the iopub/mux/control/task queues in child processes,
    builds the engine/client address dicts, creates the Hub and runs the
    IO loop until interrupted.

    Parameters
    ----------
    argv : list of str, optional
        Passed unchanged to ``parser.parse_args``.
    """
    # The module-level import of ProcessMonitoredQueue is commented out, so
    # import it locally; without this the first queue below raises NameError.
    from zmq.devices import ProcessMonitoredQueue

    parser = make_argument_parser()

    args = parser.parse_args(argv)
    parse_url(args)

    # URL template; the trailing '%i' is filled in with a port number.
    iface = "%s://%s" % (args.transport, args.ip) + ':%i'

    # Count how many ports must be picked at random for the queues whose
    # ports were not given on the command line.
    random_ports = 0
    if args.hb:
        hb = split_ports(args.hb, 2)
    else:
        hb = select_random_ports(2)
    if args.mux:
        mux = split_ports(args.mux, 2)
    else:
        mux = None
        random_ports += 2
    if args.iopub:
        iopub = split_ports(args.iopub, 2)
    else:
        iopub = None
        random_ports += 2
    if args.task:
        task = split_ports(args.task, 2)
    else:
        task = None
        random_ports += 2
    if args.control:
        control = split_ports(args.control, 2)
    else:
        control = None
        random_ports += 2

    ctx = zmq.Context()
    loop = ioloop.IOLoop.instance()

    # Registrar socket
    reg = ZMQStream(ctx.socket(zmq.XREP), loop)
    bind_port(reg, args.ip, args.regport)

    ### Engine connections ###

    # heartbeat
    hpub = ctx.socket(zmq.PUB)
    bind_port(hpub, args.ip, hb[0])
    hrep = ctx.socket(zmq.XREP)
    bind_port(hrep, args.ip, hb[1])

    hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub, loop),
                                     ZMQStream(hrep, loop), args.ping)
    hmon.start()

    ### Client connections ###
    # Clientele socket
    c = ZMQStream(ctx.socket(zmq.XREP), loop)
    cport = bind_port(c, args.ip, args.client)
    # Notifier socket
    n = ZMQStream(ctx.socket(zmq.PUB), loop)
    nport = bind_port(n, args.ip, args.notice)

    ### Key File ###
    # Only generate a key when a key file was requested and does not exist yet.
    if args.execkey and not os.path.isfile(args.execkey):
        generate_exec_key(args.execkey)

    thesession = session.StreamSession(username=args.ident or "controller",
                                       keyfile=args.execkey,
                                       session=args.session)

    ### build and launch the queues ###

    # monitor socket
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, "")
    monport = bind_port(sub, args.ip, args.monitor)
    sub = ZMQStream(sub, loop)

    ports = select_random_ports(random_ports)
    children = []

    # IOPub relay (in a Process)
    if not iopub:
        iopub = (ports.pop(), ports.pop())
    q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
    # Note the index order: the in side binds iopub[1], the out side iopub[0].
    q.bind_in(iface % iopub[1])
    q.bind_out(iface % iopub[0])
    q.setsockopt_in(zmq.SUBSCRIBE, '')
    q.connect_mon(iface % monport)
    q.daemon = True
    q.start()
    children.append(q.launcher)

    # Multiplexer Queue (in a Process)
    if not mux:
        mux = (ports.pop(), ports.pop())
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in(iface % mux[0])
    q.bind_out(iface % mux[1])
    q.connect_mon(iface % monport)
    q.daemon = True
    q.start()
    children.append(q.launcher)

    # Control Queue (in a Process)
    if not control:
        control = (ports.pop(), ports.pop())
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
    q.bind_in(iface % control[0])
    q.bind_out(iface % control[1])
    q.connect_mon(iface % monport)
    q.daemon = True
    q.start()
    children.append(q.launcher)

    # Task Queue (in a Process)
    if not task:
        task = (ports.pop(), ports.pop())
    if args.scheduler == 'pure':
        q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
        q.bind_in(iface % task[0])
        q.bind_out(iface % task[1])
        q.connect_mon(iface % monport)
        q.daemon = True
        q.start()
        children.append(q.launcher)
    else:
        log_addr = iface % args.logport if args.logport else None
        sargs = (iface % task[0], iface % task[1], iface % monport, iface % nport,
                 log_addr, args.loglevel, args.scheduler)
        print(sargs)
        q = Process(target=launch_scheduler, args=sargs)
        q.daemon = True
        q.start()
        children.append(q)

    if args.mongodb:
        from mongodb import MongoDB
        db = MongoDB(thesession.session)
    else:
        db = DictDB()
    # NOTE(review): presumably gives the child processes time to come up
    # before the Hub is built -- confirm.
    time.sleep(.25)

    # build connection dicts
    engine_addrs = {
        'control': iface % control[1],
        'mux': iface % mux[1],
        'heartbeat': (iface % hb[0], iface % hb[1]),
        'task': iface % task[1],
        'iopub': iface % iopub[1],
        'monitor': iface % monport,
        }

    client_addrs = {
        'control': iface % control[0],
        'query': iface % cport,
        'mux': iface % mux[0],
        'task': iface % task[0],
        'iopub': iface % iopub[0],
        'notification': iface % nport
        }

    # setup logging
    if args.logport:
        connect_logger(ctx, iface % args.logport, root="controller",
                       loglevel=args.loglevel)
    else:
        local_logger(args.loglevel)

    # register relay of signals to the children
    signal_children(children)
    hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
              registrar=reg, clientele=c, notifier=n, db=db,
              engine_addrs=engine_addrs, client_addrs=client_addrs)

    # Announce startup shortly after the loop begins running.
    dc = ioloop.DelayedCallback(lambda: print("Controller started..."), 100, loop)
    dc.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print("interrupted, exiting...", file=sys.__stderr__)
MinRK
Moved parallel test files to parallel subpackages...
r3573
MinRK
added simple cluster entry point
r3552
# Script entry point: run main() when this file is executed directly.
if __name__ == '__main__':
    main()