##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
tweak dagdeps for new AsyncResult objects

File last commit:

r3605:2d79d3e4
r3606:9f1a03ab
Show More
controller.py
110 lines | 3.9 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""The IPython Controller with 0MQ
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 This is a collection of one Hub and several Schedulers.
MinRK
prep newparallel for rebase...
r3539 """
#-----------------------------------------------------------------------------
MinRK
scheduler progress
r3551 # Copyright (C) 2010 The IPython Development Team
MinRK
prep newparallel for rebase...
r3539 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import logging
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 from multiprocessing import Process
MinRK
prep newparallel for rebase...
r3539
import zmq
# internal:
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, Str, Instance, List, Bool
MinRK
prep newparallel for rebase...
r3539
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 from entry_point import signal_children
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
from scheduler import launch_scheduler
MinRK
Refactor newparallel to use Config system...
r3604 from hub import Hub, HubFactory
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #-----------------------------------------------------------------------------
# Configurable
#-----------------------------------------------------------------------------
MinRK
general parallel code cleanup
r3556
MinRK
Refactor newparallel to use Config system...
r3604
class ControllerFactory(HubFactory):
"""Configurable for setting up a Hub and Schedulers."""
scheme = Str('pure', config=True)
usethreads = Bool(False, config=True)
# internal
children = List()
mq_class = Str('zmq.devices.ProcessMonitoredQueue')
def _update_mq(self):
self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
def __init__(self, **kwargs):
super(ControllerFactory, self).__init__(**kwargs)
self.subconstructors.append(self.construct_schedulers)
self._update_mq()
self.on_trait_change(self._update_mq, 'usethreads')
def start(self):
super(ControllerFactory, self).start()
for child in self.children:
child.start()
if not self.usethreads:
signal_children([ getattr(c, 'launcher', c) for c in self.children ])
def construct_schedulers(self):
children = self.children
mq = import_item(self.mq_class)
# IOPub relay (in a Process)
q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
q.bind_in(self.client_addrs['iopub'])
q.bind_out(self.engine_addrs['iopub'])
q.setsockopt_out(zmq.SUBSCRIBE, '')
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in(self.client_addrs['mux'])
q.bind_out(self.engine_addrs['mux'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Control Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
q.bind_in(self.client_addrs['control'])
q.bind_out(self.engine_addrs['control'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Task Queue (in a Process)
if self.scheme == 'pure':
logging.warn("task::using pure XREQ Task scheduler")
q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
q.bind_in(self.client_addrs['task'])
q.bind_out(self.engine_addrs['task'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
elif self.scheme == 'none':
logging.warn("task::using no Task scheduler")
else:
logging.warn("task::using Python %s Task scheduler"%self.scheme)
sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
q.daemon=True
children.append(q)
MinRK
added simple cluster entry point
r3552