controller.py
110 lines
| 3.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
"""The IPython Controller with 0MQ | ||||
MinRK
|
r3605 | This is a collection of one Hub and several Schedulers. | ||
MinRK
|
r3539 | """ | ||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3551 | # Copyright (C) 2010 The IPython Development Team | ||
MinRK
|
r3539 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3552 | from __future__ import print_function | ||
MinRK
|
r3603 | import logging | ||
MinRK
|
r3599 | from multiprocessing import Process | ||
MinRK
|
r3539 | |||
import zmq | ||||
# internal: | ||||
MinRK
|
r3604 | from IPython.utils.importstring import import_item | ||
from IPython.utils.traitlets import Int, Str, Instance, List, Bool | ||||
MinRK
|
r3539 | |||
MinRK
|
r3605 | from entry_point import signal_children | ||
MinRK
|
r3599 | |||
from scheduler import launch_scheduler | ||||
MinRK
|
r3604 | from hub import Hub, HubFactory | ||
MinRK
|
r3599 | |||
MinRK
|
r3605 | #----------------------------------------------------------------------------- | ||
# Configurable | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3556 | |||
MinRK
|
r3604 | |||
class ControllerFactory(HubFactory):
    """Configurable for setting up a Hub and Schedulers.

    On top of what HubFactory provides, this factory registers
    `construct_schedulers` as a subconstructor.  That method builds the
    relay/queue devices (IOPub, mux, control and, depending on `scheme`,
    a task queue or a Python task scheduler process) and collects them in
    `children`; `start` then starts each of them.
    """

    # Task scheduling scheme:
    #   'pure' -> a plain XREP/XREQ MonitoredQueue is used for tasks
    #   'none' -> no task scheduler is created
    #   other  -> forwarded as `scheme=` to launch_scheduler in a Process
    scheme = Str('pure', config=True)
    # Selects Thread- rather than Process-based MonitoredQueue devices.
    usethreads = Bool(False, config=True)

    # internal
    # Devices / processes created by construct_schedulers.
    children = List()
    # Import path of the MonitoredQueue class; kept in sync with
    # `usethreads` by _update_mq.
    mq_class = Str('zmq.devices.ProcessMonitoredQueue')

    def _update_mq(self):
        """Set `mq_class` to the Thread or Process MonitoredQueue path."""
        flavor = 'Thread' if self.usethreads else 'Process'
        self.mq_class = 'zmq.devices.%sMonitoredQueue' % flavor

    def __init__(self, **kwargs):
        """Initialise the Hub factory and hook in scheduler construction.

        Keyword arguments are passed unchanged to the parent constructor.
        """
        super(ControllerFactory, self).__init__(**kwargs)
        self.subconstructors.append(self.construct_schedulers)
        # `usethreads` may have been set via kwargs/config before the
        # change handler below is registered, so sync once by hand.
        self._update_mq()
        self.on_trait_change(self._update_mq, 'usethreads')

    def start(self):
        """Start the parent factory, then every child device/process.

        When not running with threads, the children are also handed to
        `signal_children` (imported from entry_point).
        """
        super(ControllerFactory, self).start()
        for child in self.children:
            child.start()
        if not self.usethreads:
            # Use a child's `launcher` attribute when it has one, otherwise
            # the child object itself.
            # NOTE(review): presumably `launcher` is the underlying Process
            # of a zmq device -- confirm against zmq.devices.
            signal_children([getattr(c, 'launcher', c) for c in self.children])

    def _add_monitored_queue(self, mq, in_type, out_type, in_prefix,
                             out_prefix, channel, out_sockopts=()):
        """Build one MonitoredQueue device for `channel` and register it.

        The device binds its input side to ``client_addrs[channel]``, its
        output side to ``engine_addrs[channel]``, connects its monitor
        socket (always a PUB socket) to ``monitor_url``, is flagged as a
        daemon and appended to `children`.

        Parameters
        ----------
        mq : callable
            The MonitoredQueue class to instantiate.
        in_type, out_type : int
            zmq socket types for the input and output sides.
        in_prefix, out_prefix : str
            Prefix arguments passed positionally to `mq`.
        channel : str
            Key into `client_addrs` and `engine_addrs`.
        out_sockopts : iterable of (option, value) pairs
            Socket options applied to the output side after binding it.

        Returns
        -------
        The newly created device.
        """
        q = mq(in_type, out_type, zmq.PUB, in_prefix, out_prefix)
        q.bind_in(self.client_addrs[channel])
        q.bind_out(self.engine_addrs[channel])
        for option, value in out_sockopts:
            q.setsockopt_out(option, value)
        q.connect_mon(self.monitor_url)
        q.daemon = True
        self.children.append(q)
        return q

    def construct_schedulers(self):
        """Create the relay and scheduler children (without starting them).

        Always builds the IOPub relay, the multiplexer queue and the
        control queue.  The task channel depends on `scheme`: a pure
        XREP/XREQ queue for 'pure', nothing for 'none', and otherwise a
        separate Process running `launch_scheduler` with that scheme.
        """
        mq = import_item(self.mq_class)

        # IOPub relay: the output (SUB) side subscribes to every message.
        self._add_monitored_queue(
            mq, zmq.PUB, zmq.SUB, 'N/A', 'iopub', 'iopub',
            out_sockopts=[(zmq.SUBSCRIBE, '')])

        # Multiplexer Queue
        self._add_monitored_queue(mq, zmq.XREP, zmq.XREP, 'in', 'out', 'mux')

        # Control Queue
        self._add_monitored_queue(
            mq, zmq.XREP, zmq.XREP, 'incontrol', 'outcontrol', 'control')

        # Task Queue
        if self.scheme == 'pure':
            logging.warning("task::using pure XREQ Task scheduler")
            self._add_monitored_queue(
                mq, zmq.XREP, zmq.XREQ, 'intask', 'outtask', 'task')
        elif self.scheme == 'none':
            logging.warning("task::using no Task scheduler")
        else:
            logging.warning("task::using Python %s Task scheduler", self.scheme)
            sargs = (
                self.client_addrs['task'],
                self.engine_addrs['task'],
                self.monitor_url,
                self.client_addrs['notification'],
            )
            # The Python scheduler always runs in its own Process,
            # regardless of `usethreads`.
            q = Process(target=launch_scheduler, args=sargs,
                        kwargs=dict(scheme=self.scheme))
            q.daemon = True
            self.children.append(q)
MinRK
|
r3552 | |||