#!/usr/bin/env python """The IPython Controller with 0MQ This is the master object that handles connections from engines and clients, and monitors traffic through the various queues. """ #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- from __future__ import print_function import os import sys import time import logging from multiprocessing import Process import zmq from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream # from zmq.devices import ProcessMonitoredQueue # internal: from IPython.utils.importstring import import_item from IPython.utils.traitlets import Int, Str, Instance, List, Bool from IPython.zmq.entry_point import bind_port from entry_point import (make_base_argument_parser, select_random_ports, split_ports, connect_logger, parse_url, signal_children, generate_exec_key, local_logger) import streamsession as session import heartmonitor from scheduler import launch_scheduler from hub import Hub, HubFactory from dictdb import DictDB try: import pymongo except ImportError: MongoDB=None else: from mongodb import MongoDB #------------------------------------------------------------------------- # Entry Point #------------------------------------------------------------------------- def make_argument_parser(): """Make an argument parser""" parser = make_base_argument_parser() parser.add_argument('--client', type=int, metavar='PORT', default=0, help='set the XREP port for clients [default: random]') parser.add_argument('--notice', type=int, metavar='PORT', default=0, help='set the PUB socket for registration notification [default: random]') parser.add_argument('--hb', type=str, metavar='PORTS', help='set the 2 ports for heartbeats [default: random]') parser.add_argument('--ping', type=int, default=100, help='set the heartbeat period in ms [default: 100]') parser.add_argument('--monitor', type=int, metavar='PORT', default=0, help='set the SUB port for queue monitoring [default: random]') parser.add_argument('--mux', type=str, metavar='PORTS', help='set the XREP ports for the MUX queue [default: random]') parser.add_argument('--task', type=str, metavar='PORTS', help='set the XREP/XREQ ports for the task queue [default: random]') parser.add_argument('--control', type=str, metavar='PORTS', help='set the XREP ports for the control queue [default: random]') parser.add_argument('--iopub', type=str, metavar='PORTS', help='set the PUB/SUB ports for the iopub relay [default: random]') parser.add_argument('--scheduler', type=str, default='lru', choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], help='select the task scheduler [default: Python LRU]') parser.add_argument('--mongodb', action='store_true', help='Use MongoDB task storage [default: in-memory]') parser.add_argument('--session', type=str, default=None, help='Manually specify the session id.') return parser class ControllerFactory(HubFactory): """Configurable for setting up a Hub and Schedulers.""" scheme = Str('pure', config=True) usethreads = Bool(False, config=True) # internal children = List() mq_class = Str('zmq.devices.ProcessMonitoredQueue') def _update_mq(self): self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process') def __init__(self, **kwargs): super(ControllerFactory, self).__init__(**kwargs) self.subconstructors.append(self.construct_schedulers) self._update_mq() self.on_trait_change(self._update_mq, 'usethreads') def start(self): super(ControllerFactory, self).start() for child in self.children: child.start() if not self.usethreads: signal_children([ getattr(c, 'launcher', c) for c in self.children ]) def construct_schedulers(self): children = self.children mq = import_item(self.mq_class) # IOPub relay (in a Process) q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') q.bind_in(self.client_addrs['iopub']) q.bind_out(self.engine_addrs['iopub']) q.setsockopt_out(zmq.SUBSCRIBE, '') q.connect_mon(self.monitor_url) q.daemon=True children.append(q) # Multiplexer Queue (in a Process) q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') q.bind_in(self.client_addrs['mux']) q.bind_out(self.engine_addrs['mux']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) # Control Queue (in a Process) q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') q.bind_in(self.client_addrs['control']) q.bind_out(self.engine_addrs['control']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) # Task Queue (in a Process) if self.scheme == 'pure': logging.warn("task::using pure XREQ Task scheduler") q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') q.bind_in(self.client_addrs['task']) q.bind_out(self.engine_addrs['task']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) elif self.scheme == 'none': logging.warn("task::using no Task scheduler") else: logging.warn("task::using Python %s Task scheduler"%self.scheme) sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme)) q.daemon=True children.append(q) def main(argv=None): """DO NOT USE ME ANYMORE""" parser = make_argument_parser() args = parser.parse_args(argv) parse_url(args) iface="%s://%s"%(args.transport,args.ip)+':%i' random_ports = 0 if args.hb: hb = split_ports(args.hb, 2) else: hb = select_random_ports(2) if args.mux: mux = split_ports(args.mux, 2) else: mux = None random_ports += 2 if args.iopub: iopub = split_ports(args.iopub, 2) else: iopub = None random_ports += 2 if args.task: task = split_ports(args.task, 2) else: task = None random_ports += 2 if args.control: control = split_ports(args.control, 2) else: control = None random_ports += 2 ctx = zmq.Context() loop = ioloop.IOLoop.instance() # Registrar socket reg = ZMQStream(ctx.socket(zmq.XREP), loop) regport = bind_port(reg, args.ip, args.regport) ### Engine connections ### # heartbeat hpub = ctx.socket(zmq.PUB) bind_port(hpub, args.ip, hb[0]) hrep = ctx.socket(zmq.XREP) bind_port(hrep, args.ip, hb[1]) hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping) hmon.start() ### Client connections ### # Clientele socket c = ZMQStream(ctx.socket(zmq.XREP), loop) cport = bind_port(c, args.ip, args.client) # Notifier socket n = ZMQStream(ctx.socket(zmq.PUB), loop) nport = bind_port(n, args.ip, args.notice) ### Key File ### if args.execkey and not os.path.isfile(args.execkey): generate_exec_key(args.execkey) thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey, session=args.session) ### build and launch the queues ### # monitor socket sub = ctx.socket(zmq.SUB) sub.setsockopt(zmq.SUBSCRIBE, "") monport = bind_port(sub, args.ip, args.monitor) sub = ZMQStream(sub, loop) ports = select_random_ports(random_ports) children = [] # IOPub relay (in a Process) if not iopub: iopub = (ports.pop(),ports.pop()) q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A') q.bind_in(iface%iopub[1]) q.bind_out(iface%iopub[0]) q.setsockopt_in(zmq.SUBSCRIBE, '') q.connect_mon(iface%monport) q.daemon=True q.start() children.append(q.launcher) # Multiplexer Queue (in a Process) if not mux: mux = (ports.pop(),ports.pop()) q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') q.bind_in(iface%mux[0]) q.bind_out(iface%mux[1]) q.connect_mon(iface%monport) q.daemon=True q.start() children.append(q.launcher) # Control Queue (in a Process) if not control: control = (ports.pop(),ports.pop()) q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') q.bind_in(iface%control[0]) q.bind_out(iface%control[1]) q.connect_mon(iface%monport) q.daemon=True q.start() children.append(q.launcher) # Task Queue (in a Process) if not task: task = (ports.pop(),ports.pop()) if args.scheduler == 'pure': q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') q.bind_in(iface%task[0]) q.bind_out(iface%task[1]) q.connect_mon(iface%monport) q.daemon=True q.start() children.append(q.launcher) else: log_addr = iface%args.logport if args.logport else None sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport, log_addr, args.loglevel, args.scheduler) print (sargs) q = Process(target=launch_scheduler, args=sargs) q.daemon=True q.start() children.append(q) if args.mongodb: from mongodb import MongoDB db = MongoDB(thesession.session) else: db = DictDB() time.sleep(.25) # build connection dicts engine_addrs = { 'control' : iface%control[1], 'mux': iface%mux[1], 'heartbeat': (iface%hb[0], iface%hb[1]), 'task' : iface%task[1], 'iopub' : iface%iopub[1], 'monitor' : iface%monport, } client_addrs = { 'control' : iface%control[0], 'query': iface%cport, 'mux': iface%mux[0], 'task' : iface%task[0], 'iopub' : iface%iopub[0], 'notification': iface%nport } # setup logging if args.logport: connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) else: local_logger(args.loglevel) # register relay of signals to the children signal_children(children) hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon, registrar=reg, clientele=c, notifier=n, db=db, engine_addrs=engine_addrs, client_addrs=client_addrs) dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) dc.start() try: loop.start() except KeyboardInterrupt: print ("interrupted, exiting...", file=sys.__stderr__) if __name__ == '__main__': main()