diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index a77a360..baf8b43 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -15,6 +15,7 @@ import time from getpass import getpass from pprint import pprint from datetime import datetime +import warnings import json pjoin = os.path.join @@ -249,6 +250,7 @@ class Client(object): _notification_socket=None _mux_socket=None _task_socket=None + _task_scheme=None block = False outstanding=None results = None @@ -298,7 +300,6 @@ class Client(object): self._config = cfg - self._ssh = bool(sshserver or sshkey or password) if self._ssh and sshserver is None: # default to ssh via localhost @@ -360,7 +361,7 @@ class Client(object): @property def ids(self): - """Always up to date ids property.""" + """Always up-to-date ids property.""" self._flush_notifications() return self._ids @@ -370,6 +371,23 @@ class Client(object): eid = int(k) self._engines[eid] = bytes(v) # force not unicode self._ids.add(eid) + if sorted(self._engines.keys()) != range(len(self._engines)) and \ + self._task_scheme == 'pure' and self._task_socket: + self._stop_scheduling_tasks() + + def _stop_scheduling_tasks(self): + """Stop scheduling tasks because an engine has been unregistered + from a pure ZMQ scheduler. + """ + + self._task_socket.close() + self._task_socket = None + msg = "An engine has been unregistered, and we are using pure " +\ + "ZMQ task scheduling. Task farming will be disabled." + if self.outstanding: + msg += " If you were running tasks when this happened, " +\ + "some `outstanding` msg_ids may never resolve." + warnings.warn(msg, RuntimeWarning) def _build_targets(self, targets): """Turn valid target IDs or 'all' into two lists: @@ -389,6 +407,8 @@ class Client(object): def _connect(self, sshserver, ssh_kwargs): """setup all our socket connections to the controller. This is called from __init__.""" + + # Maybe allow reconnecting? 
if self._connected: return self._connected=True @@ -406,15 +426,17 @@ class Client(object): pprint(msg) msg = ss.Message(msg) content = msg.content + self._config['registration'] = dict(content) if content.status == 'ok': if content.mux: self._mux_socket = self.context.socket(zmq.PAIR) self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) connect_socket(self._mux_socket, content.mux) if content.task: + self._task_scheme, task_addr = content.task self._task_socket = self.context.socket(zmq.PAIR) self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) - connect_socket(self._task_socket, content.task) + connect_socket(self._task_socket, task_addr) if content.notification: self._notification_socket = self.context.socket(zmq.SUB) connect_socket(self._notification_socket, content.notification) @@ -457,6 +479,8 @@ class Client(object): if eid in self._ids: self._ids.remove(eid) self._engines.pop(eid) + if self._task_socket and self._task_scheme == 'pure': + self._stop_scheduling_tasks() def _extract_metadata(self, header, parent, content): md = {'msg_id' : parent['msg_id'], @@ -937,8 +961,15 @@ class Client(object): options = dict(bound=bound, block=block) if targets is None: - return self._apply_balanced(f, args, kwargs, timeout=timeout, + if self._task_socket: + return self._apply_balanced(f, args, kwargs, timeout=timeout, after=after, follow=follow, **options) + else: + msg = "Task farming is disabled" + if self._task_scheme == 'pure': + msg += " because the pure ZMQ scheduler cannot handle" + msg += " disappearing engines." 
+ raise RuntimeError(msg) else: return self._apply_direct(f, args, kwargs, targets=targets, **options) @@ -1103,12 +1134,13 @@ class Client(object): completed = [] local_results = {} - # temporarily disable local shortcut - # for msg_id in list(theids): - # if msg_id in self.results: - # completed.append(msg_id) - # local_results[msg_id] = self.results[msg_id] - # theids.remove(msg_id) + + # comment this block out to temporarily disable local shortcut: + for msg_id in list(theids): + if msg_id in self.results: + completed.append(msg_id) + local_results[msg_id] = self.results[msg_id] + theids.remove(msg_id) if theids: # some not locally cached content = dict(msg_ids=theids, status_only=status_only) diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index 26b7ea5..43aa10e 100755 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -37,7 +37,6 @@ from hub import Hub, HubFactory class ControllerFactory(HubFactory): """Configurable for setting up a Hub and Schedulers.""" - scheme = Str('pure', config=True) usethreads = Bool(False, config=True) # internal @@ -65,8 +64,8 @@ class ControllerFactory(HubFactory): # IOPub relay (in a Process) q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') - q.bind_in(self.client_addrs['iopub']) - q.bind_out(self.engine_addrs['iopub']) + q.bind_in(self.client_info['iopub']) + q.bind_out(self.engine_info['iopub']) q.setsockopt_out(zmq.SUBSCRIBE, '') q.connect_mon(self.monitor_url) q.daemon=True @@ -74,16 +73,16 @@ class ControllerFactory(HubFactory): # Multiplexer Queue (in a Process) q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') - q.bind_in(self.client_addrs['mux']) - q.bind_out(self.engine_addrs['mux']) + q.bind_in(self.client_info['mux']) + q.bind_out(self.engine_info['mux']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) # Control Queue (in a Process) q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') - 
q.bind_in(self.client_addrs['control']) - q.bind_out(self.engine_addrs['control']) + q.bind_in(self.client_info['control']) + q.bind_out(self.engine_info['control']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) @@ -91,8 +90,8 @@ class ControllerFactory(HubFactory): if self.scheme == 'pure': self.log.warn("task::using pure XREQ Task scheduler") q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') - q.bind_in(self.client_addrs['task']) - q.bind_out(self.engine_addrs['task']) + q.bind_in(self.client_info['task'][1]) + q.bind_out(self.engine_info['task']) q.connect_mon(self.monitor_url) q.daemon=True children.append(q) @@ -101,8 +100,9 @@ class ControllerFactory(HubFactory): else: self.log.info("task::using Python %s Task scheduler"%self.scheme) - sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) - q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level)) + sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification']) + kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config) + q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) q.daemon=True children.append(q) diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index 2b3a2aa..3c9b31c 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -99,6 +99,9 @@ class EngineConnector(HasTraits): class HubFactory(RegistrationFactory): """The Configurable for setting up a Hub.""" + # name of a scheduler scheme + scheme = Str('lru', config=True) + # port-pairs for monitoredqueues: hb = Instance(list, config=True) def _hb_default(self): @@ -238,7 +241,7 @@ class HubFactory(RegistrationFactory): time.sleep(.25) # build connection dicts - self.engine_addrs = { + self.engine_info = { 'control' : engine_iface%self.control[1], 
'mux': engine_iface%self.mux[1], 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), @@ -247,19 +250,19 @@ class HubFactory(RegistrationFactory): # 'monitor' : engine_iface%self.mon_port, } - self.client_addrs = { + self.client_info = { 'control' : client_iface%self.control[0], 'query': client_iface%self.query_port, 'mux': client_iface%self.mux[0], - 'task' : client_iface%self.task[0], + 'task' : (self.scheme, client_iface%self.task[0]), 'iopub' : client_iface%self.iopub[0], 'notification': client_iface%self.notifier_port } - self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs) - self.log.debug("hub::Hub client addrs: %s"%self.client_addrs) + self.log.debug("hub::Hub engine addrs: %s"%self.engine_info) + self.log.debug("hub::Hub client addrs: %s"%self.client_info) self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, registrar=reg, clientele=c, notifier=n, db=self.db, - engine_addrs=self.engine_addrs, client_addrs=self.client_addrs, + engine_info=self.engine_info, client_info=self.client_info, logname=self.log.name) @@ -279,9 +282,9 @@ class Hub(LoggingFactory): notifier: ZMQStream for broadcasting engine registration changes (PUB) db: connection to db for out of memory logging of commands NotImplemented - engine_addrs: dict of zmq connection information for engines to connect + engine_info: dict of zmq connection information for engines to connect to the queues. - client_addrs: dict of zmq connection information for engines to connect + client_info: dict of zmq connection information for engines to connect to the queues. 
""" # internal data structures: @@ -309,8 +312,8 @@ class Hub(LoggingFactory): heartmonitor=Instance(HeartMonitor) notifier=Instance(ZMQStream) db=Instance(object) - client_addrs=Dict() - engine_addrs=Dict() + client_info=Dict() + engine_info=Dict() def __init__(self, **kwargs): @@ -326,16 +329,21 @@ class Hub(LoggingFactory): clientele: ZMQStream for client connections # extra: db: ZMQStream for db connection (NotImplemented) - engine_addrs: zmq address/protocol dict for engine connections - client_addrs: zmq address/protocol dict for client connections + engine_info: zmq address/protocol dict for engine connections + client_info: zmq address/protocol dict for client connections """ super(Hub, self).__init__(**kwargs) self.registration_timeout = max(5000, 2*self.heartmonitor.period) # validate connection dicts: - validate_url_container(self.client_addrs) - validate_url_container(self.engine_addrs) + for k,v in self.client_info.iteritems(): + if k == 'task': + validate_url_container(v[1]) + else: + validate_url_container(v) + # validate_url_container(self.client_info) + validate_url_container(self.engine_info) # register our callbacks self.registrar.on_recv(self.dispatch_register_request) @@ -764,7 +772,7 @@ class Hub(LoggingFactory): """Reply with connection addresses for clients.""" self.log.info("client::client %s connected"%client_id) content = dict(status='ok') - content.update(self.client_addrs) + content.update(self.client_info) jsonable = {} for k,v in self.keytable.iteritems(): jsonable[str(k)] = v @@ -787,7 +795,7 @@ class Hub(LoggingFactory): self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) content = dict(id=eid,status='ok') - content.update(self.engine_addrs) + content.update(self.engine_info) # check if requesting available IDs: if queue in self.by_ident: try: diff --git a/IPython/zmq/parallel/ipcontrollerapp.py b/IPython/zmq/parallel/ipcontrollerapp.py index 40c7466..e5c6334 100755 --- 
a/IPython/zmq/parallel/ipcontrollerapp.py +++ b/IPython/zmq/parallel/ipcontrollerapp.py @@ -190,7 +190,7 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader): 'connections, respectively [default: random]', metavar='Scheduler.iopub_ports') paa('--scheme', - type=str, dest='ControllerFactory.scheme', + type=str, dest='HubFactory.scheme', choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], help='select the task scheduler scheme [default: Python LRU]', metavar='Scheduler.scheme') diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 5f4f687..ae2f851 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -294,7 +294,7 @@ class TaskScheduler(SessionFactory): else: self.save_unmet(msg_id, raw_msg, after, follow, timeout) - @logged + # @logged def audit_timeouts(self): """Audit all waiting tasks for expired timeouts.""" now = datetime.now() @@ -506,13 +506,13 @@ class TaskScheduler(SessionFactory): -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream ctx = zmq.Context() loop = ioloop.IOLoop() - + print (in_addr, out_addr, mon_addr, not_addr) ins = ZMQStream(ctx.socket(zmq.XREP),loop) ins.bind(in_addr) outs = ZMQStream(ctx.socket(zmq.XREP),loop) @@ -532,14 +532,11 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, mon_stream=mons, notifier_stream=nots, - scheme=scheme, loop=loop, logname=logname) + scheme=scheme, loop=loop, logname=logname, + config=config) scheduler.start() try: loop.start() except KeyboardInterrupt: print ("interrupted, exiting...", file=sys.__stderr__) - 
-if __name__ == '__main__': - iface = 'tcp://127.0.0.1:%i' - launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348) diff --git a/IPython/zmq/parallel/util.py b/IPython/zmq/parallel/util.py index 2cba71c..bf9df96 100644 --- a/IPython/zmq/parallel/util.py +++ b/IPython/zmq/parallel/util.py @@ -97,8 +97,8 @@ def disambiguate_ip_address(ip, location=None): external_ips = socket.gethostbyname_ex(socket.gethostname())[2] if location is None or location in external_ips: ip='127.0.0.1' - elif external_ips: - ip=external_ips[0] + elif location: + return location return ip def disambiguate_url(url, location=None): diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index 541faaa..3249dd2 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -342,7 +342,7 @@ class LoadBalancedView(View): TODO: allow subset of engines across which to balance. """ def __repr__(self): - return "<%s %s>"%(self.__class__.__name__, self.client._addr) + return "<%s %s>"%(self.__class__.__name__, self.client._config['url']) @property def targets(self): diff --git a/docs/source/parallelz/parallel_demos.txt b/docs/source/parallelz/parallel_demos.txt index da44f42..ee1a329 100644 --- a/docs/source/parallelz/parallel_demos.txt +++ b/docs/source/parallelz/parallel_demos.txt @@ -4,7 +4,7 @@ Parallel examples .. note:: - Performance numbers from ``IPython.kernel``, not newparallel + Performance numbers from ``IPython.kernel``, not newparallel. In this section we describe two more involved examples of using an IPython cluster to perform a parallel computation. In these examples, we will be using @@ -34,7 +34,7 @@ million digits. In both the serial and parallel calculation we will be using functions defined in the :file:`pidigits.py` file, which is available in the -:file:`docs/examples/kernel` directory of the IPython source distribution. +:file:`docs/examples/newparallel` directory of the IPython source distribution. 
These functions provide basic facilities for working with the digits of pi and can be loaded into IPython by putting :file:`pidigits.py` in your current working directory and then doing: @@ -46,7 +46,7 @@ working directory and then doing: Serial calculation ------------------ -For the serial calculation, we will use SymPy (http://www.sympy.org) to +For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to calculate 10,000 digits of pi and then look at the frequencies of the digits 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While SymPy is capable of calculating many more digits of pi, our purpose here is to @@ -110,7 +110,7 @@ calculation, we will need two top-level functions from :file:`pidigits.py`: .. literalinclude:: ../../examples/newparallel/pidigits.py :language: python - :lines: 34-49 + :lines: 41-56 We will also use the :func:`plot_two_digit_freqs` function to plot the results. The code to run this calculation in parallel is contained in @@ -215,12 +215,12 @@ simulation of the underlying asset price. In this example we use this approach to price both European and Asian (path dependent) options for various strike prices and volatilities. -The code for this example can be found in the :file:`docs/examples/kernel` +The code for this example can be found in the :file:`docs/examples/newparallel` directory of the IPython source. The function :func:`price_options` in :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using the NumPy package and is shown here: -.. literalinclude:: ../../examples/kernel/mcpricer.py +.. literalinclude:: ../../examples/newparallel/mcpricer.py :language: python To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class, @@ -233,7 +233,7 @@ be found in the file :file:`mcpricer.py`. The code in this file creates a volatilities and strike prices. The results are then plotted as a 2D contour plot using Matplotlib. -.. literalinclude:: ../../examples/kernel/mcdriver.py +.. 
literalinclude:: ../../examples/newparallel/mcdriver.py :language: python To use this code, start an IPython cluster using :command:`ipclusterz`, open