##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
tweak dagdeps for new AsyncResult objects

File last commit:

r3605:2d79d3e4
r3606:9f1a03ab
Show More
controller.py
110 lines | 3.9 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""The IPython Controller with 0MQ
This is a collection of one Hub and several Schedulers.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function
import logging
from multiprocessing import Process
import zmq
# internal:
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, Str, Instance, List, Bool
from entry_point import signal_children
from scheduler import launch_scheduler
from hub import Hub, HubFactory
#-----------------------------------------------------------------------------
# Configurable
#-----------------------------------------------------------------------------
class ControllerFactory(HubFactory):
"""Configurable for setting up a Hub and Schedulers."""
scheme = Str('pure', config=True)
usethreads = Bool(False, config=True)
# internal
children = List()
mq_class = Str('zmq.devices.ProcessMonitoredQueue')
def _update_mq(self):
self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
def __init__(self, **kwargs):
super(ControllerFactory, self).__init__(**kwargs)
self.subconstructors.append(self.construct_schedulers)
self._update_mq()
self.on_trait_change(self._update_mq, 'usethreads')
def start(self):
super(ControllerFactory, self).start()
for child in self.children:
child.start()
if not self.usethreads:
signal_children([ getattr(c, 'launcher', c) for c in self.children ])
def construct_schedulers(self):
children = self.children
mq = import_item(self.mq_class)
# IOPub relay (in a Process)
q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
q.bind_in(self.client_addrs['iopub'])
q.bind_out(self.engine_addrs['iopub'])
q.setsockopt_out(zmq.SUBSCRIBE, '')
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in(self.client_addrs['mux'])
q.bind_out(self.engine_addrs['mux'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Control Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
q.bind_in(self.client_addrs['control'])
q.bind_out(self.engine_addrs['control'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Task Queue (in a Process)
if self.scheme == 'pure':
logging.warn("task::using pure XREQ Task scheduler")
q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
q.bind_in(self.client_addrs['task'])
q.bind_out(self.engine_addrs['task'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
elif self.scheme == 'none':
logging.warn("task::using no Task scheduler")
else:
logging.warn("task::using Python %s Task scheduler"%self.scheme)
sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
q.daemon=True
children.append(q)