##// END OF EJS Templates
Renaming the special methods of the formatters....
Renaming the special methods of the formatters. The IPython formatters use special methods to compute the format of objects. These special methods previously had names like "__html__"; with this commit they have been renamed to the "_repr_html_" form. I have also added a Javascript formatter and fixed a bug in getfigs in the pylab tools.

File last commit:

r3785:f216d709
r3878:43a27cb1
Show More
controller.py
116 lines | 4.4 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""The IPython Controller with 0MQ
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 This is a collection of one Hub and several Schedulers.
MinRK
prep newparallel for rebase...
r3539 """
#-----------------------------------------------------------------------------
MinRK
scheduler progress
r3551 # Copyright (C) 2010 The IPython Development Team
MinRK
prep newparallel for rebase...
r3539 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 from multiprocessing import Process
MinRK
prep newparallel for rebase...
r3539
import zmq
MinRK
add default ip<x>z_config files
r3630 from zmq.devices import ProcessMonitoredQueue
MinRK
prep newparallel for rebase...
r3539 # internal:
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.importstring import import_item
MinRK
cleanup pass
r3644 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
MinRK
prep newparallel for rebase...
r3539
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.util import signal_children
MinRK
eliminate relative imports
r3642 from .hub import Hub, HubFactory
from .scheduler import launch_scheduler
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 #-----------------------------------------------------------------------------
# Configurable
#-----------------------------------------------------------------------------
MinRK
general parallel code cleanup
r3556
MinRK
Refactor newparallel to use Config system...
r3604
class ControllerFactory(HubFactory):
    """Configurable for setting up a Hub and Schedulers.

    In addition to what ``HubFactory`` provides, this factory builds the
    relay/queue devices and the task scheduler in ``construct_schedulers``,
    collects them in ``children``, and starts them in ``start``.
    """

    # Selects the Thread (True) or Process (False) MonitoredQueue variant;
    # see _usethreads_changed.
    usethreads = Bool(False, config=True)

    # internal
    # Devices and processes created by construct_schedulers.
    children = List()
    # Dotted import path of the MonitoredQueue class used for the devices.
    mq_class = CStr('zmq.devices.ProcessMonitoredQueue')

    def _usethreads_changed(self, name, old, new):
        """Point ``mq_class`` at the Thread or Process MonitoredQueue.

        NOTE(review): presumably called by the traitlets machinery whenever
        ``usethreads`` changes -- confirm against IPython.utils.traitlets.
        """
        flavor = 'Thread' if new else 'Process'
        self.mq_class = 'zmq.devices.%sMonitoredQueue' % flavor

    def __init__(self, **kwargs):
        """Initialise the parent factory and register scheduler construction."""
        super(ControllerFactory, self).__init__(**kwargs)
        # construct_schedulers is queued as a sub-constructor rather than
        # being called directly here.
        self.subconstructors.append(self.construct_schedulers)

    def start(self):
        """Start the parent factory, then every child device/process.

        Children backed by an OS process are collected and handed to
        ``signal_children`` (presumably so signals reach them -- confirm in
        IPython.parallel.util).
        """
        super(ControllerFactory, self).start()
        procs = []
        for kid in self.children:
            kid.start()
            if isinstance(kid, ProcessMonitoredQueue):
                # The device wraps its process; the process is its launcher.
                procs.append(kid.launcher)
                continue
            if isinstance(kid, Process):
                procs.append(kid)
        if not procs:
            return
        signal_children(procs)

    def construct_schedulers(self):
        """Create the IOPub relay, the mux/control queues and the task scheduler.

        Every created object is marked as a daemon and appended to
        ``self.children``; nothing is started here.
        """
        devices = self.children
        queue_cls = import_item(self.mq_class)
        clients = self.client_info
        engines = self.engine_info
        # All devices connect their monitor socket to monitor_url (the
        # 'inproc://monitor' alternative for threads is not used).
        monitor = self.monitor_url

        # IOPub relay (Process or Thread, depending on mq_class)
        iopub_relay = queue_cls(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A', 'iopub')
        iopub_relay.bind_in(clients['iopub'])
        iopub_relay.bind_out(engines['iopub'])
        iopub_relay.setsockopt_out(zmq.SUBSCRIBE, '')
        iopub_relay.connect_mon(monitor)
        iopub_relay.daemon = True
        devices.append(iopub_relay)

        # Multiplexer queue, then control queue: identical wiring, differing
        # only in channel name and the monitor prefixes.
        for channel, in_prefix, out_prefix in (
                ('mux', 'in', 'out'),
                ('control', 'incontrol', 'outcontrol')):
            router = queue_cls(zmq.XREP, zmq.XREP, zmq.PUB,
                               in_prefix, out_prefix)
            router.bind_in(clients[channel])
            router.setsockopt_in(zmq.IDENTITY, channel)
            router.bind_out(engines[channel])
            router.connect_mon(monitor)
            router.daemon = True
            devices.append(router)

        # Task queue / scheduler, chosen by self.scheme
        scheme = self.scheme
        if scheme == 'none':
            self.log.warn("task::using no Task scheduler")
        elif scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            task_queue = queue_cls(zmq.XREP, zmq.XREQ, zmq.PUB,
                                   'intask', 'outtask')
            # NOTE(review): element [1] of client_info['task'] is presumably
            # the address to bind -- confirm against HubFactory.
            task_queue.bind_in(clients['task'][1])
            task_queue.setsockopt_in(zmq.IDENTITY, 'task')
            task_queue.bind_out(engines['task'])
            task_queue.connect_mon(monitor)
            task_queue.daemon = True
            devices.append(task_queue)
        else:
            self.log.info("task::using Python %s Task scheduler" % scheme)
            scheduler_args = (
                clients['task'][1],
                engines['task'],
                monitor,
                clients['notification'],
            )
            # The config object is passed to the scheduler process as a
            # plain dict.
            scheduler_kwargs = dict(
                scheme=scheme,
                logname=self.log.name,
                loglevel=self.log.level,
                config=dict(self.config),
            )
            scheduler = Process(target=launch_scheduler,
                                args=scheduler_args,
                                kwargs=scheduler_kwargs)
            scheduler.daemon = True
            devices.append(scheduler)
MinRK
added simple cluster entry point
r3552