#!/usr/bin/env python """The IPython Controller with 0MQ This is a collection of one Hub and several Schedulers. """ #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- from __future__ import print_function import logging from multiprocessing import Process import zmq # internal: from IPython.utils.importstring import import_item from IPython.utils.traitlets import Int, Str, Instance, List, Bool from entry_point import signal_children from scheduler import launch_scheduler from hub import Hub, HubFactory #----------------------------------------------------------------------------- # Configurable #----------------------------------------------------------------------------- class ControllerFactory(HubFactory): """Configurable for setting up a Hub and Schedulers.""" scheme = Str('pure', config=True) usethreads = Bool(False, config=True) # internal children = List() mq_class = Str('zmq.devices.ProcessMonitoredQueue') def _update_mq(self): self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process') def __init__(self, **kwargs): super(ControllerFactory, self).__init__(**kwargs) self.subconstructors.append(self.construct_schedulers) self._update_mq() self.on_trait_change(self._update_mq, 'usethreads') def start(self): super(ControllerFactory, self).start() for child in self.children: child.start() if not self.usethreads: signal_children([ getattr(c, 'launcher', c) for c in self.children ]) def construct_schedulers(self): children = self.children mq = import_item(self.mq_class) # IOPub relay (in a Process) q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') q.bind_in(self.client_addrs['iopub']) q.bind_out(self.engine_addrs['iopub']) q.setsockopt_out(zmq.SUBSCRIBE, '') q.connect_mon(self.monitor_url) q.daemon=True children.append(q) # Multiplexer Queue (in a Process) q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') q.bind_in(self.client_addrs['mux']) q.bind_out(self.engine_addrs['mux']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) # Control Queue (in a Process) q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') q.bind_in(self.client_addrs['control']) q.bind_out(self.engine_addrs['control']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) # Task Queue (in a Process) if self.scheme == 'pure': logging.warn("task::using pure XREQ Task scheduler") q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') q.bind_in(self.client_addrs['task']) q.bind_out(self.engine_addrs['task']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) elif self.scheme == 'none': logging.warn("task::using no Task scheduler") else: logging.warn("task::using Python %s Task scheduler"%self.scheme) sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme)) q.daemon=True children.append(q)