controller.py
116 lines
| 4.4 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
"""The IPython Controller with 0MQ | ||||
MinRK
|
r3605 | This is a collection of one Hub and several Schedulers. | ||
MinRK
|
r3539 | """ | ||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3551 | # Copyright (C) 2010 The IPython Development Team | ||
MinRK
|
r3539 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3552 | from __future__ import print_function | ||
MinRK
|
r3599 | from multiprocessing import Process | ||
MinRK
|
r3539 | |||
import zmq | ||||
MinRK
|
r3630 | from zmq.devices import ProcessMonitoredQueue | ||
MinRK
|
r3539 | # internal: | ||
MinRK
|
r3604 | from IPython.utils.importstring import import_item | ||
MinRK
|
r3644 | from IPython.utils.traitlets import Int, CStr, Instance, List, Bool | ||
MinRK
|
r3539 | |||
MinRK
|
r3673 | from IPython.parallel.util import signal_children | ||
MinRK
|
r3642 | from .hub import Hub, HubFactory | ||
from .scheduler import launch_scheduler | ||||
MinRK
|
r3599 | |||
MinRK
|
r3605 | #----------------------------------------------------------------------------- | ||
# Configurable | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3556 | |||
MinRK
|
r3604 | |||
class ControllerFactory(HubFactory):
    """Configurable that assembles a Hub together with its Schedulers.

    In addition to what ``HubFactory`` sets up, this factory registers
    ``construct_schedulers`` as a subconstructor.  That method fills
    ``children`` with an IOPub relay, a multiplexer queue, a control queue
    and, depending on ``scheme``, a task scheduler.  ``start`` launches
    every child that was constructed.
    """

    # Configurable flag; switching it re-targets ``mq_class``
    # (see ``_usethreads_changed``).
    usethreads = Bool(False, config=True)

    # internal
    # Devices/processes built by ``construct_schedulers``, launched by ``start``.
    children = List()
    # Import path of the monitored-queue class, resolved via ``import_item``.
    mq_class = CStr('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        """Point ``mq_class`` at the Thread or the Process monitored queue.

        Presumably invoked by traitlets when ``usethreads`` changes (the
        ``_<name>_changed`` naming convention); the hook-up itself is not
        visible in this file.  Only ``new`` is consulted.
        """
        if new:
            flavor = 'Thread'
        else:
            flavor = 'Process'
        self.mq_class = 'zmq.devices.%sMonitoredQueue' % flavor

    def __init__(self, **kwargs):
        """Initialise the parent factory, then schedule scheduler construction."""
        super(ControllerFactory, self).__init__(**kwargs)
        self.subconstructors.append(self.construct_schedulers)

    def start(self):
        """Start the parent factory and every child.

        Children that are backed by an OS process are collected and passed
        to ``signal_children`` (imported from ``IPython.parallel.util``; its
        behaviour is not visible here).
        """
        super(ControllerFactory, self).start()

        launched = []
        for kid in self.children:
            kid.start()
            if isinstance(kid, ProcessMonitoredQueue):
                # The queue device exposes its process as ``launcher``.
                launched.append(kid.launcher)
            elif isinstance(kid, Process):
                launched.append(kid)

        if not launched:
            return
        signal_children(launched)

    def construct_schedulers(self):
        """Build the relay/queue devices and the task scheduler.

        Every object created here is marked as a daemon and appended to
        ``children``; nothing is started until ``start`` is called.
        """
        registry = self.children
        queue_cls = import_item(self.mq_class)

        def enlist(device):
            # Shared tail of every monitored-queue setup: attach the
            # monitor socket, mark as daemon, remember for ``start``.
            device.connect_mon(self.monitor_url)
            device.daemon = True
            registry.append(device)

        # IOPub relay
        iopub_relay = queue_cls(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A', 'iopub')
        iopub_relay.bind_in(self.client_info['iopub'])
        iopub_relay.bind_out(self.engine_info['iopub'])
        iopub_relay.setsockopt_out(zmq.SUBSCRIBE, '')
        enlist(iopub_relay)

        # Multiplexer queue, then Control queue: identical wiring, only the
        # channel name and the monitor prefixes differ.
        router_specs = (
            ('mux', 'in', 'out'),
            ('control', 'incontrol', 'outcontrol'),
        )
        for channel, in_prefix, out_prefix in router_specs:
            router = queue_cls(zmq.XREP, zmq.XREP, zmq.PUB, in_prefix, out_prefix)
            router.bind_in(self.client_info[channel])
            router.setsockopt_in(zmq.IDENTITY, channel)
            router.bind_out(self.engine_info[channel])
            enlist(router)

        # Task queue / scheduler, selected by ``scheme``.
        scheme = self.scheme

        if scheme == 'none':
            self.log.warn("task::using no Task scheduler")
            return

        if scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            task_queue = queue_cls(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            # NOTE(review): ``client_info['task']`` is indexed with [1];
            # presumably a (scheme, url) pair -- confirm against HubFactory.
            task_queue.bind_in(self.client_info['task'][1])
            task_queue.setsockopt_in(zmq.IDENTITY, 'task')
            task_queue.bind_out(self.engine_info['task'])
            enlist(task_queue)
            return

        # Any other scheme: run ``launch_scheduler`` in its own process.
        self.log.info("task::using Python %s Task scheduler" % scheme)
        sched_args = (
            self.client_info['task'][1],
            self.engine_info['task'],
            self.monitor_url,
            self.client_info['notification'],
        )
        sched_kwargs = dict(
            scheme=scheme,
            logname=self.log.name,
            loglevel=self.log.level,
            config=dict(self.config),
        )
        scheduler = Process(target=launch_scheduler, args=sched_args,
                            kwargs=sched_kwargs)
        scheduler.daemon = True
        registry.append(scheduler)
MinRK
|
r3552 | |||