#!/usr/bin/env python
"""The IPython Controller with 0MQ

This is a collection of one Hub and several Schedulers.
"""
#-----------------------------------------------------------------------------
#  Copyright (C) 2010  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function

import logging
from multiprocessing import Process

import zmq
from zmq.devices import ProcessMonitoredQueue
# internal:
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, CStr, Instance, List, Bool

from .entry_point import signal_children
from .hub import Hub, HubFactory
from .scheduler import launch_scheduler

#-----------------------------------------------------------------------------
# Configurable
#-----------------------------------------------------------------------------


class ControllerFactory(HubFactory):
    """Configurable for setting up a Hub and Schedulers."""

    # When True, the monitored queues are built from the Thread-based device
    # class instead of the Process-based one (see `_usethreads_changed`).
    usethreads = Bool(False, config=True)
    # pure-zmq downstream HWM
    hwm = Int(0, config=True)

    # internal
    # Devices / processes created by `construct_schedulers`, started by `start`.
    children = List()
    # Dotted import path of the MonitoredQueue class used for the relays.
    mq_class = CStr('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        """Keep `mq_class` in sync with `usethreads`.

        NOTE(review): presumably invoked by traitlets through the
        `_<trait>_changed` naming convention -- confirm against the
        traitlets version in use.  Only `new` is consulted.
        """
        variant = 'Thread' if new else 'Process'
        self.mq_class = 'zmq.devices.%sMonitoredQueue' % variant

    def __init__(self, **kwargs):
        """Initialise the factory and register scheduler construction.

        `construct_schedulers` is appended to `self.subconstructors`, a
        list provided by the base class.
        """
        super(ControllerFactory, self).__init__(**kwargs)
        self.subconstructors.append(self.construct_schedulers)

    def start(self):
        """Start the base factory, then every child in `self.children`.

        The process handle of each started child is collected (the
        `launcher` attribute for a `ProcessMonitoredQueue`, the object
        itself for a plain `Process`) and, if any were collected, the list
        is handed to `signal_children`.  Children of any other type are
        started but not collected.
        """
        super(ControllerFactory, self).start()
        child_procs = []
        for child in self.children:
            child.start()
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            signal_children(child_procs)

    def construct_schedulers(self):
        """Create the relay/queue devices and append them to `self.children`.

        Builds, in order: the IOPub relay, the multiplexer queue, the
        control queue and -- depending on `self.scheme` -- either a pure-zmq
        task queue ('pure'), nothing ('none'), or a `launch_scheduler`
        process for any other scheme.  Nothing is started here; see `start`.
        """
        children = self.children
        mq = import_item(self.mq_class)

        # Thread-based devices report to an inproc monitor endpoint;
        # process-based devices use the configured monitor URL.
        maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url

        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A', 'iopub')
        q.bind_in(self.client_info['iopub'])
        q.bind_out(self.engine_info['iopub'])
        # Empty prefix subscribes to everything.  Passed as bytes: identical
        # to '' on Python 2, and the type pyzmq expects on Python 3.
        q.setsockopt_out(zmq.SUBSCRIBE, b'')
        q.connect_mon(maybe_inproc)
        q.daemon = True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        q.bind_in(self.client_info['mux'])
        q.bind_out(self.engine_info['mux'])
        q.connect_mon(maybe_inproc)
        q.daemon = True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        q.bind_in(self.client_info['control'])
        q.bind_out(self.engine_info['control'])
        q.connect_mon(maybe_inproc)
        q.daemon = True
        children.append(q)

        # Task Queue (in a Process)
        # NOTE(review): client_info['task'] is indexed with [1] below;
        # presumably it is a (scheme, url) pair -- confirm in HubFactory.
        if self.scheme == 'pure':
            self.log.warning("task::using pure XREQ Task scheduler")
            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            q.setsockopt_out(zmq.HWM, self.hwm)
            q.bind_in(self.client_info['task'][1])
            q.bind_out(self.engine_info['task'])
            q.connect_mon(maybe_inproc)
            q.daemon = True
            children.append(q)
        elif self.scheme == 'none':
            self.log.warning("task::using no Task scheduler")
        else:
            self.log.info("task::using Python %s Task scheduler", self.scheme)
            # The Python scheduler always runs in its own Process, so it is
            # given monitor_url rather than the inproc endpoint.
            sargs = (
                self.client_info['task'][1],
                self.engine_info['task'],
                self.monitor_url,
                self.client_info['notification'],
            )
            kwargs = dict(
                scheme=self.scheme,
                logname=self.log.name,
                loglevel=self.log.level,
                config=self.config,
            )
            q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
            q.daemon = True
            children.append(q)