##// END OF EJS Templates
Merge PR #413...
Merge PR #413 Adds retries and resubmit logic to IPython.parallel. closes gh-413

File last commit:

r3785:f216d709
r3882:50deb546 merge
Show More
controller.py
116 lines | 4.4 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""The IPython Controller with 0MQ
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 This is a collection of one Hub and several Schedulers.
MinRK
prep newparallel for rebase...
r3539 """
#-----------------------------------------------------------------------------
MinRK
scheduler progress
r3551 # Copyright (C) 2010 The IPython Development Team
MinRK
prep newparallel for rebase...
r3539 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 from multiprocessing import Process
MinRK
prep newparallel for rebase...
r3539
import zmq
MinRK
add default ip<x>z_config files
r3630 from zmq.devices import ProcessMonitoredQueue
MinRK
prep newparallel for rebase...
r3539 # internal:
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.importstring import import_item
MinRK
cleanup pass
r3644 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
MinRK
prep newparallel for rebase...
r3539
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.util import signal_children
MinRK
eliminate relative imports
r3642 from .hub import Hub, HubFactory
from .scheduler import launch_scheduler
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #-----------------------------------------------------------------------------
# Configurable
#-----------------------------------------------------------------------------
MinRK
general parallel code cleanup
r3556
MinRK
Refactor newparallel to use Config system...
r3604
class ControllerFactory(HubFactory):
    """Factory that builds a Hub together with its scheduler children.

    ``construct_schedulers`` is registered as a subconstructor. It creates
    one monitored queue each for iopub, mux and control traffic, plus a task
    scheduler chosen by ``self.scheme``. ``start`` then starts every child
    that was created this way.
    """

    # When true, mq_class is switched to the Thread-based monitored queue
    # (see _usethreads_changed).
    usethreads = Bool(False, config=True)

    # internal
    # Children created by construct_schedulers and started by start().
    children = List()
    # Import path of the monitored-queue class used for the relays.
    mq_class = CStr('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        """Point ``mq_class`` at the Thread or Process flavour of the queue.

        Presumably invoked by the traitlets machinery whenever ``usethreads``
        changes -- confirm against the traitlets version in use.
        """
        flavour = 'Thread' if new else 'Process'
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%flavour

    def __init__(self, **kwargs):
        """Initialise the base factory, then register scheduler construction."""
        super(ControllerFactory, self).__init__(**kwargs)
        self.subconstructors.append(self.construct_schedulers)

    def start(self):
        """Start the base factory and every child, then pass the processes on.

        A ProcessMonitoredQueue contributes its ``launcher`` attribute and a
        Process contributes itself; any other child is started but not
        collected. ``signal_children`` is only called when at least one
        process was collected.
        """
        super(ControllerFactory, self).start()
        spawned = []
        for kid in self.children:
            kid.start()
            if isinstance(kid, ProcessMonitoredQueue):
                spawned.append(kid.launcher)
                continue
            if isinstance(kid, Process):
                spawned.append(kid)
        if not spawned:
            return
        signal_children(spawned)

    def construct_schedulers(self):
        """Create the relay queues and the task scheduler as daemon children.

        Every object created here is appended to ``self.children``; nothing
        is started until ``start`` runs.
        """
        children = self.children
        queue_cls = import_item(self.mq_class)

        def adopt(child, monitored=True):
            # Shared tail of every child: optionally attach the monitor
            # socket, mark the child as a daemon and remember it.
            if monitored:
                # The monitor is always reached through self.monitor_url,
                # for thread-based queues as well.
                child.connect_mon(self.monitor_url)
            child.daemon = True
            children.append(child)

        # IOPub relay
        iopub = queue_cls(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A', 'iopub')
        iopub.bind_in(self.client_info['iopub'])
        iopub.bind_out(self.engine_info['iopub'])
        iopub.setsockopt_out(zmq.SUBSCRIBE, '')
        adopt(iopub)

        # Multiplexer queue
        mux = queue_cls(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        mux.bind_in(self.client_info['mux'])
        mux.setsockopt_in(zmq.IDENTITY, 'mux')
        mux.bind_out(self.engine_info['mux'])
        adopt(mux)

        # Control queue
        control = queue_cls(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        control.bind_in(self.client_info['control'])
        control.setsockopt_in(zmq.IDENTITY, 'control')
        control.bind_out(self.engine_info['control'])
        adopt(control)

        # Task scheduler: a plain monitored queue ('pure'), nothing at all
        # ('none'), or a separate process running launch_scheduler.
        scheme = self.scheme
        if scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            task = queue_cls(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            task.bind_in(self.client_info['task'][1])
            task.setsockopt_in(zmq.IDENTITY, 'task')
            task.bind_out(self.engine_info['task'])
            adopt(task)
            return

        if scheme == 'none':
            self.log.warn("task::using no Task scheduler")
            return

        self.log.info("task::using Python %s Task scheduler"%scheme)
        positional = (self.client_info['task'][1], self.engine_info['task'],
                      self.monitor_url, self.client_info['notification'])
        keywords = dict(scheme=scheme, logname=self.log.name, loglevel=self.log.level,
                        config=dict(self.config))
        task = Process(target=launch_scheduler, args=positional, kwargs=keywords)
        # The scheduler process receives monitor_url as an argument, so no
        # connect_mon call is made on it here.
        adopt(task, monitored=False)
MinRK
added simple cluster entry point
r3552