From e90463bace52cbdc9e2988bc2889e21757677057 2011-04-08 00:38:26 From: MinRK Date: 2011-04-08 00:38:26 Subject: [PATCH] update API after sagedays29 tests, docs updated to match * Client no longer has high-level methods (only in Views) * module functions can be pushed * clients can have a connection timeout * dependencies have separate switches for success/failure, not just success_only * add `with view.temp_flags(**flags):` for temporary flags Also updated some docs and examples --- diff --git a/IPython/utils/pickleutil.py b/IPython/utils/pickleutil.py index 2a92e18..669df9d 100644 --- a/IPython/utils/pickleutil.py +++ b/IPython/utils/pickleutil.py @@ -15,10 +15,9 @@ __docformat__ = "restructuredtext en" # Imports #------------------------------------------------------------------------------- -from types import FunctionType import copy - -from IPython.zmq.parallel.dependency import dependent +import sys +from types import FunctionType import codeutil @@ -67,12 +66,22 @@ class CannedFunction(CannedObject): self._checkType(f) self.code = f.func_code self.defaults = f.func_defaults + self.module = f.__module__ or '__main__' self.__name__ = f.__name__ def _checkType(self, obj): assert isinstance(obj, FunctionType), "Not a function type" def getObject(self, g=None): + # try to load function back into its module: + if not self.module.startswith('__'): + try: + __import__(self.module) + except ImportError: + pass + else: + g = sys.modules[self.module].__dict__ + if g is None: g = globals() newFunc = FunctionType(self.code, g, self.__name__, self.defaults) @@ -82,8 +91,9 @@ class CannedFunction(CannedObject): # Functions #------------------------------------------------------------------------------- - def can(obj): + # import here to prevent module-level circular imports + from IPython.zmq.parallel.dependency import dependent if isinstance(obj, dependent): keys = ('f','df') return CannedObject(obj, keys=keys) diff --git a/IPython/zmq/parallel/asyncresult.py b/IPython/zmq/parallel/asyncresult.py index b1ba39b..c2f283e 100644 --- a/IPython/zmq/parallel/asyncresult.py +++ b/IPython/zmq/parallel/asyncresult.py @@ -12,6 +12,8 @@ import time +from zmq import MessageTracker + from IPython.external.decorator import decorator from . import error @@ -19,6 +21,9 @@ from . 
import error # Classes #----------------------------------------------------------------------------- +# global empty tracker that's always done: +finished_tracker = MessageTracker() + @decorator def check_ready(f, self, *args, **kwargs): """Call spin() to sync state prior to calling the method.""" @@ -36,18 +41,26 @@ class AsyncResult(object): msg_ids = None _targets = None _tracker = None + _single_result = False def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None): - self._client = client if isinstance(msg_ids, basestring): + # always a list msg_ids = [msg_ids] + if tracker is None: + # default to always done + tracker = finished_tracker + self._client = client self.msg_ids = msg_ids self._fname=fname self._targets = targets self._tracker = tracker self._ready = False self._success = None - self._single_result = len(msg_ids) == 1 + if len(msg_ids) == 1: + self._single_result = not isinstance(targets, (list, tuple)) + else: + self._single_result = False def __repr__(self): if self._ready: @@ -99,7 +112,7 @@ class AsyncResult(object): """ if self._ready: return - self._ready = self._client.barrier(self.msg_ids, timeout) + self._ready = self._client.wait(self.msg_ids, timeout) if self._ready: try: results = map(self._client.results.get, self.msg_ids) @@ -149,10 +162,9 @@ class AsyncResult(object): return dict(zip(engine_ids,results)) @property - @check_ready def result(self): """result property wrapper for `get(timeout=0)`.""" - return self._result + return self.get() # abbreviated alias: r = result @@ -169,7 +181,7 @@ class AsyncResult(object): @property def result_dict(self): """result property as a dict.""" - return self.get_dict(0) + return self.get_dict() def __dict__(self): return self.get_dict(0) @@ -181,11 +193,17 @@ class AsyncResult(object): @property def sent(self): - """check whether my messages have been sent""" - if self._tracker is None: - return True - else: - return self._tracker.done + """check whether my messages have been sent.""" + return self._tracker.done + + def wait_for_send(self, timeout=-1): + """wait for pyzmq send to complete. + + This is necessary when sending arrays that you intend to edit in-place. + `timeout` is in seconds, and will raise TimeoutError if it is reached + before the send completes. + """ + return self._tracker.wait(timeout) #------------------------------------- # dict-access @@ -285,7 +303,7 @@ class AsyncHubResult(AsyncResult): if self._ready: return local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids) - local_ready = self._client.barrier(local_ids, timeout) + local_ready = self._client.wait(local_ids, timeout) if local_ready: remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids) if not remote_ids: diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index 0d7e5a4..2045266 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -1,4 +1,4 @@ -"""A semi-synchronous Client for the ZMQ controller""" +"""A semi-synchronous Client for the ZMQ cluster""" #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team # @@ -31,57 +31,26 @@ from IPython.external.decorator import decorator from IPython.external.ssh import tunnel from . import error -from . import map as Map from . import util from . 
import streamsession as ss from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult from .clusterdir import ClusterDir, ClusterDirError from .dependency import Dependency, depend, require, dependent from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction -from .util import ReverseDict, validate_url, disambiguate_url from .view import DirectView, LoadBalancedView #-------------------------------------------------------------------------- -# helpers for implementing old MEC API via client.apply -#-------------------------------------------------------------------------- - -def _push(user_ns, **ns): - """helper method for implementing `client.push` via `client.apply`""" - user_ns.update(ns) - -def _pull(user_ns, keys): - """helper method for implementing `client.pull` via `client.apply`""" - if isinstance(keys, (list,tuple, set)): - for key in keys: - if not user_ns.has_key(key): - raise NameError("name '%s' is not defined"%key) - return map(user_ns.get, keys) - else: - if not user_ns.has_key(keys): - raise NameError("name '%s' is not defined"%keys) - return user_ns.get(keys) - -def _clear(user_ns): - """helper method for implementing `client.clear` via `client.apply`""" - user_ns.clear() - -def _execute(user_ns, code): - """helper method for implementing `client.execute` via `client.apply`""" - exec code in user_ns - - -#-------------------------------------------------------------------------- # Decorators for Client methods #-------------------------------------------------------------------------- @decorator -def spinfirst(f, self, *args, **kwargs): +def spin_first(f, self, *args, **kwargs): """Call spin() to sync state prior to calling the method.""" self.spin() return f(self, *args, **kwargs) @decorator -def defaultblock(f, self, *args, **kwargs): +def default_block(f, self, *args, **kwargs): """Default to self.block; preserve self.block.""" block = kwargs.get('block',None) block = self.block if block is None else block @@ -151,7 +120,7 @@ class Metadata(dict): class Client(HasTraits): - """A semi-synchronous client to the IPython ZMQ controller + """A semi-synchronous client to the IPython ZMQ cluster Parameters ---------- @@ -193,11 +162,11 @@ class Client(HasTraits): flag for whether to use paramiko instead of shell ssh for tunneling. [default: True on win32, False else] - #------- exec authentication args ------- - # If even localhost is untrusted, you can have some protection against - # unauthorized execution by using a key. Messages are still sent - # as cleartext, so if someone can snoop your loopback traffic this will - # not help against malicious attacks. + ------- exec authentication args ------- + If even localhost is untrusted, you can have some protection against + unauthorized execution by using a key. Messages are still sent + as cleartext, so if someone can snoop your loopback traffic this will + not help against malicious attacks. exec_key : str an authentication key or file containing a key @@ -207,7 +176,7 @@ class Client(HasTraits): Attributes ---------- - ids : set of int engine IDs + ids : list of int engine IDs requesting the ids attribute always synchronizes the registration state. To request ids without synchronization, use semi-private _ids attributes. 
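Since this patch moves the high-level execution API onto Views (per the commit message), a minimal sketch of the resulting workflow may help; it assumes a cluster is already running under the named profile, and the zero-argument lambda is illustrative:

    # hedged sketch: Views do the work, the Client connects, waits and queries
    from IPython.zmq.parallel.client import Client

    rc = Client(profile='default')     # connect using an existing cluster profile
    print(rc.ids)                      # always-up-to-date list of engine ids

    dv = rc[:]                         # DirectView on all engines via Client.__getitem__
    ar = dv.apply_async(lambda : 42)   # submit without blocking; returns an AsyncResult
    rc.wait(ar.msg_ids)                # Client.wait() replaces the old barrier()
    print(ar.get())                    # one result per engine, e.g. [42, 42]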
@@ -234,15 +203,18 @@ class Client(HasTraits): flushes incoming results and registration state changes control methods spin, and requesting `ids` also ensures up to date - barrier + wait wait on one or more msg_ids execution methods apply legacy: execute, run + data movement + push, pull, scatter, gather + query methods - queue_status, get_result, purge + queue_status, get_result, purge, result_status control methods abort, shutdown @@ -264,23 +236,25 @@ class Client(HasTraits): _ssh=Bool(False) _context = Instance('zmq.Context') _config = Dict() - _engines=Instance(ReverseDict, (), {}) + _engines=Instance(util.ReverseDict, (), {}) # _hub_socket=Instance('zmq.Socket') _query_socket=Instance('zmq.Socket') _control_socket=Instance('zmq.Socket') _iopub_socket=Instance('zmq.Socket') _notification_socket=Instance('zmq.Socket') - _apply_socket=Instance('zmq.Socket') - _mux_ident=Str() - _task_ident=Str() + _mux_socket=Instance('zmq.Socket') + _task_socket=Instance('zmq.Socket') _task_scheme=Str() _balanced_views=Dict() _direct_views=Dict() _closed = False + _ignored_control_replies=Int(0) + _ignored_hub_replies=Int(0) def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None, context=None, username=None, debug=False, exec_key=None, sshserver=None, sshkey=None, password=None, paramiko=None, + timeout=10 ): super(Client, self).__init__(debug=debug, profile=profile) if context is None: @@ -292,11 +266,11 @@ class Client(HasTraits): if self._cd is not None: if url_or_file is None: url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') - assert url_or_file is not None, "I can't find enough information to connect to a controller!"\ + assert url_or_file is not None, "I can't find enough information to connect to a hub!"\ " Please specify at least one of url_or_file or profile." 
try: - validate_url(url_or_file) + util.validate_url(url_or_file) except AssertionError: if not os.path.exists(url_or_file): if self._cd: @@ -316,7 +290,7 @@ class Client(HasTraits): sshserver=cfg['ssh'] url = cfg['url'] location = cfg.setdefault('location', None) - cfg['url'] = disambiguate_url(cfg['url'], location) + cfg['url'] = util.disambiguate_url(cfg['url'], location) url = cfg['url'] self._config = cfg @@ -351,10 +325,11 @@ class Client(HasTraits): self._notification_handlers = {'registration_notification' : self._register_engine, 'unregistration_notification' : self._unregister_engine, + 'shutdown_notification' : lambda msg: self.close(), } self._queue_handlers = {'execute_reply' : self._handle_execute_reply, 'apply_reply' : self._handle_apply_reply} - self._connect(sshserver, ssh_kwargs) + self._connect(sshserver, ssh_kwargs, timeout) def __del__(self): """cleanup sockets, but _not_ context.""" @@ -378,22 +353,6 @@ class Client(HasTraits): pass self._cd = None - @property - def ids(self): - """Always up-to-date ids property.""" - self._flush_notifications() - # always copy: - return list(self._ids) - - def close(self): - if self._closed: - return - snames = filter(lambda n: n.endswith('socket'), dir(self)) - for socket in map(lambda name: getattr(self, name), snames): - if isinstance(socket, zmq.Socket) and not socket.closed: - socket.close() - self._closed = True - def _update_engines(self, engines): """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" for k,v in engines.iteritems(): @@ -402,16 +361,15 @@ class Client(HasTraits): self._ids.append(eid) self._ids = sorted(self._ids) if sorted(self._engines.keys()) != range(len(self._engines)) and \ - self._task_scheme == 'pure' and self._task_ident: + self._task_scheme == 'pure' and self._task_socket: self._stop_scheduling_tasks() def _stop_scheduling_tasks(self): """Stop scheduling tasks because an engine has been unregistered from a pure ZMQ scheduler. """ - self._task_ident = '' - # self._task_socket.close() - # self._task_socket = None + self._task_socket.close() + self._task_socket = None msg = "An engine has been unregistered, and we are using pure " +\ "ZMQ task scheduling. Task farming will be disabled." if self.outstanding: @@ -434,8 +392,8 @@ class Client(HasTraits): targets = [targets] return [self._engines[t] for t in targets], list(targets) - def _connect(self, sshserver, ssh_kwargs): - """setup all our socket connections to the controller. This is called from + def _connect(self, sshserver, ssh_kwargs, timeout): + """setup all our socket connections to the cluster. This is called from __init__.""" # Maybe allow reconnecting? 
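The new 'shutdown_notification' handler registered above makes every connected client close itself when the Hub announces shutdown; the matching `hub` flag on `Client.shutdown` appears in a later hunk of this patch. A hedged sketch, with an illustrative profile name:

    # hedged sketch: tearing down the whole cluster from one client
    from IPython.zmq.parallel.client import Client

    rc = Client(profile='default')
    # hub=True (renamed from `controller` in this patch) also stops the Hub;
    # other clients then close themselves on the shutdown notification
    rc.shutdown(hub=True, block=True)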
@@ -444,13 +402,16 @@ class Client(HasTraits): self._connected=True def connect_socket(s, url): - url = disambiguate_url(url, self._config['location']) + url = util.disambiguate_url(url, self._config['location']) if self._ssh: return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) else: return s.connect(url) self.session.send(self._query_socket, 'connection_request') + r,w,x = zmq.select([self._query_socket],[],[], timeout) + if not r: + raise error.TimeoutError("Hub connection request timed out") idents,msg = self.session.recv(self._query_socket,mode=0) if self.debug: pprint(msg) @@ -458,18 +419,15 @@ class Client(HasTraits): content = msg.content self._config['registration'] = dict(content) if content.status == 'ok': - self._apply_socket = self._context.socket(zmq.XREP) - self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session) if content.mux: - # self._mux_socket = self._context.socket(zmq.XREQ) - self._mux_ident = 'mux' - connect_socket(self._apply_socket, content.mux) + self._mux_socket = self._context.socket(zmq.XREQ) + self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) + connect_socket(self._mux_socket, content.mux) if content.task: self._task_scheme, task_addr = content.task - # self._task_socket = self._context.socket(zmq.XREQ) - # self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) - connect_socket(self._apply_socket, task_addr) - self._task_ident = 'task' + self._task_socket = self._context.socket(zmq.XREQ) + self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) + connect_socket(self._task_socket, task_addr) if content.notification: self._notification_socket = self._context.socket(zmq.SUB) connect_socket(self._notification_socket, content.notification) @@ -488,8 +446,6 @@ class Client(HasTraits): self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) connect_socket(self._iopub_socket, content.iopub) self._update_engines(dict(content.engines)) - # give XREP apply_socket some time to connect - time.sleep(0.25) else: self._connected = False raise Exception("Failed to connect!") @@ -499,7 +455,7 @@ class Client(HasTraits): #-------------------------------------------------------------------------- def _unwrap_exception(self, content): - """unwrap exception, and remap engineid to int.""" + """unwrap exception, and remap engine_id to int.""" e = error.unwrap_exception(content) # print e.traceback if e.engine_info: @@ -545,7 +501,7 @@ class Client(HasTraits): self._handle_stranded_msgs(eid, uuid) - if self._task_ident and self._task_scheme == 'pure': + if self._task_socket and self._task_scheme == 'pure': self._stop_scheduling_tasks() def _handle_stranded_msgs(self, eid, uuid): @@ -622,7 +578,7 @@ class Client(HasTraits): if content['status'] == 'ok': self.results[msg_id] = util.unserialize_object(msg['buffers'])[0] elif content['status'] == 'aborted': - self.results[msg_id] = error.AbortedTask(msg_id) + self.results[msg_id] = error.TaskAborted(msg_id) elif content['status'] == 'resubmitted': # TODO: handle resubmission pass @@ -665,12 +621,26 @@ class Client(HasTraits): in the ZMQ queue. 
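The `_connect` hunk above polls the query socket with `zmq.select`, so `Client.__init__` now accepts a `timeout` (in seconds, default 10) and raises `error.TimeoutError` when the Hub never answers the connection request. A hedged sketch, with an illustrative profile name:

    # hedged sketch: the new connection timeout on Client.__init__
    from IPython.zmq.parallel.client import Client
    from IPython.zmq.parallel.error import TimeoutError

    try:
        rc = Client(profile='default', timeout=2)   # give the Hub 2 seconds to reply
    except TimeoutError:
        # raised by _connect() when the connection_request goes unanswered
        print("Hub did not respond -- is ipcontrollerz running for this profile?")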
Currently: ignore them.""" + if self._ignored_control_replies <= 0: + return msg = self.session.recv(sock, mode=zmq.NOBLOCK) while msg is not None: + self._ignored_control_replies -= 1 if self.debug: pprint(msg) msg = self.session.recv(sock, mode=zmq.NOBLOCK) + def _flush_ignored_control(self): + """flush ignored control replies""" + while self._ignored_control_replies > 0: + self.session.recv(self._control_socket) + self._ignored_control_replies -= 1 + + def _flush_ignored_hub_replies(self): + msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK) + while msg is not None: + msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK) + def _flush_iopub(self, sock): """Flush replies from the iopub channel waiting in the ZMQ queue. @@ -718,26 +688,46 @@ class Client(HasTraits): if not isinstance(key, (int, slice, tuple, list, xrange)): raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key))) else: - return self.view(key, balanced=False) + return self._get_view(key, balanced=False) #-------------------------------------------------------------------------- # Begin public methods #-------------------------------------------------------------------------- + @property + def ids(self): + """Always up-to-date ids property.""" + self._flush_notifications() + # always copy: + return list(self._ids) + + def close(self): + if self._closed: + return + snames = filter(lambda n: n.endswith('socket'), dir(self)) + for socket in map(lambda name: getattr(self, name), snames): + if isinstance(socket, zmq.Socket) and not socket.closed: + socket.close() + self._closed = True + def spin(self): """Flush any registration notifications and execution results waiting in the ZMQ queue. """ if self._notification_socket: self._flush_notifications() - if self._apply_socket: - self._flush_results(self._apply_socket) + if self._mux_socket: + self._flush_results(self._mux_socket) + if self._task_socket: + self._flush_results(self._task_socket) if self._control_socket: self._flush_control(self._control_socket) if self._iopub_socket: self._flush_iopub(self._iopub_socket) + if self._query_socket: + self._flush_ignored_hub_replies() - def barrier(self, jobs=None, timeout=-1): + def wait(self, jobs=None, timeout=-1): """waits on one or more `jobs`, for up to `timeout` seconds. Parameters @@ -786,8 +776,8 @@ class Client(HasTraits): # Control methods #-------------------------------------------------------------------------- - @spinfirst - @defaultblock + @spin_first + @default_block def clear(self, targets=None, block=None): """Clear the namespace in target(s).""" targets = self._build_targets(targets)[0] @@ -795,18 +785,21 @@ class Client(HasTraits): self.session.send(self._control_socket, 'clear_request', content={}, ident=t) error = False if self.block: + self._flush_ignored_control() for i in range(len(targets)): idents,msg = self.session.recv(self._control_socket,0) if self.debug: pprint(msg) if msg['content']['status'] != 'ok': error = self._unwrap_exception(msg['content']) + else: + self._ignored_control_replies += len(targets) if error: raise error - @spinfirst - @defaultblock + @spin_first + @default_block def abort(self, jobs=None, targets=None, block=None): """Abort specific jobs from the execution queues of target(s). 
@@ -839,35 +832,41 @@ class Client(HasTraits): content=content, ident=t) error = False if self.block: + self._flush_ignored_control() for i in range(len(targets)): idents,msg = self.session.recv(self._control_socket,0) if self.debug: pprint(msg) if msg['content']['status'] != 'ok': error = self._unwrap_exception(msg['content']) + else: + self._ignored_control_replies += len(targets) if error: raise error - @spinfirst - @defaultblock - def shutdown(self, targets=None, restart=False, controller=False, block=None): - """Terminates one or more engine processes, optionally including the controller.""" - if controller: + @spin_first + @default_block + def shutdown(self, targets=None, restart=False, hub=False, block=None): + """Terminates one or more engine processes, optionally including the hub.""" + if hub: targets = 'all' targets = self._build_targets(targets)[0] for t in targets: self.session.send(self._control_socket, 'shutdown_request', content={'restart':restart},ident=t) error = False - if block or controller: + if block or hub: + self._flush_ignored_control() for i in range(len(targets)): - idents,msg = self.session.recv(self._control_socket,0) + idents,msg = self.session.recv(self._control_socket, 0) if self.debug: pprint(msg) if msg['content']['status'] != 'ok': error = self._unwrap_exception(msg['content']) + else: + self._ignored_control_replies += len(targets) - if controller: + if hub: time.sleep(0.25) self.session.send(self._query_socket, 'shutdown_request') idents,msg = self.session.recv(self._query_socket, 0) @@ -883,8 +882,8 @@ class Client(HasTraits): # Execution methods #-------------------------------------------------------------------------- - @defaultblock - def execute(self, code, targets='all', block=None): + @default_block + def _execute(self, code, targets='all', block=None): """Executes `code` on `targets` in blocking or nonblocking manner. ``execute`` is always `bound` (affects engine namespace) @@ -901,33 +900,7 @@ class Client(HasTraits): whether or not to wait until done to return default: self.block """ - result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False) - if not block: - return result - - def run(self, filename, targets='all', block=None): - """Execute contents of `filename` on engine(s). - - This simply reads the contents of the file and calls `execute`. 
- - Parameters - ---------- - - filename : str - The path to the file - targets : int/str/list of ints/strs - the engines on which to execute - default : all - block : bool - whether or not to wait until done - default: self.block - - """ - with open(filename, 'r') as f: - # add newline in case of trailing indented whitespace - # which will cause SyntaxError - code = f.read()+'\n' - return self.execute(code, targets=targets, block=block) + return self[targets].execute(code, block=block) def _maybe_raise(self, result): """wrapper for maybe raising an exception if apply failed.""" @@ -936,287 +909,113 @@ class Client(HasTraits): return result - def _build_dependency(self, dep): - """helper for building jsonable dependencies from various input forms""" - if isinstance(dep, Dependency): - return dep.as_dict() - elif isinstance(dep, AsyncResult): - return dep.msg_ids - elif dep is None: - return [] - else: - # pass to Dependency constructor - return list(Dependency(dep)) - - @defaultblock - def apply(self, f, args=None, kwargs=None, bound=False, block=None, - targets=None, balanced=None, - after=None, follow=None, timeout=None, - track=False): - """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. + def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False, + ident=None): + """construct and send an apply message via a socket. - This is the central execution command for the client. - - Parameters - ---------- - - f : function - The fuction to be called remotely - args : tuple/list - The positional arguments passed to `f` - kwargs : dict - The keyword arguments passed to `f` - bound : bool (default: False) - Whether to pass the Engine(s) Namespace as the first argument to `f`. - block : bool (default: self.block) - Whether to wait for the result, or return immediately. - False: - returns AsyncResult - True: - returns actual result(s) of f(*args, **kwargs) - if multiple targets: - list of results, matching `targets` - track : bool - whether to track non-copying sends. - [default False] - - targets : int,list of ints, 'all', None - Specify the destination of the job. - if None: - Submit via Task queue for load-balancing. - if 'all': - Run on all active engines - if list: - Run on each specified engine - if int: - Run on single engine - Note: - that if `balanced=True`, and `targets` is specified, - then the load-balancing will be limited to balancing - among `targets`. - - balanced : bool, default None - whether to load-balance. This will default to True - if targets is unspecified, or False if targets is specified. - - If `balanced` and `targets` are both specified, the task will - be assigne to *one* of the targets by the scheduler. - - The following arguments are only used when balanced is True: - - after : Dependency or collection of msg_ids - Only for load-balanced execution (targets=None) - Specify a list of msg_ids as a time-based dependency. - This job will only be run *after* the dependencies - have been met. - - follow : Dependency or collection of msg_ids - Only for load-balanced execution (targets=None) - Specify a list of msg_ids as a location-based dependency. - This job will only be run on an engine where this dependency - is met. - - timeout : float/int or None - Only for load-balanced execution (targets=None) - Specify an amount of time (in seconds) for the scheduler to - wait for dependencies to be met before failing with a - DependencyTimeout. 
- - Returns - ------- - - if block is False: - return AsyncResult wrapping msg_ids - output of AsyncResult.get() is identical to that of `apply(...block=True)` - else: - if single target (or balanced): - return result of `f(*args, **kwargs)` - else: - return list of results, matching `targets` + This is the principal method with which all engine execution is performed by views. """ + assert not self._closed, "cannot use me anymore, I'm closed!" # defaults: - block = block if block is not None else self.block args = args if args is not None else [] kwargs = kwargs if kwargs is not None else {} + subheader = subheader if subheader is not None else {} - if not self._ids: - # flush notification socket if no engines yet - any_ids = self.ids - if not any_ids: - raise error.NoEnginesRegistered("Can't execute without any connected engines.") - - if balanced is None: - if targets is None: - # default to balanced if targets unspecified - balanced = True - else: - # otherwise default to multiplexing - balanced = False - - if targets is None and balanced is False: - # default to all if *not* balanced, and targets is unspecified - targets = 'all' - - # enforce types of f,args,kwrags + # validate arguments if not callable(f): raise TypeError("f must be callable, not %s"%type(f)) if not isinstance(args, (tuple, list)): raise TypeError("args must be tuple or list, not %s"%type(args)) if not isinstance(kwargs, dict): raise TypeError("kwargs must be dict, not %s"%type(kwargs)) + if not isinstance(subheader, dict): + raise TypeError("subheader must be dict, not %s"%type(subheader)) - options = dict(bound=bound, block=block, targets=targets, track=track) - - if balanced: - return self._apply_balanced(f, args, kwargs, timeout=timeout, - after=after, follow=follow, **options) - elif follow or after or timeout: - msg = "follow, after, and timeout args are only used for" - msg += " load-balanced execution." - raise ValueError(msg) - else: - return self._apply_direct(f, args, kwargs, **options) - - def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None, - after=None, follow=None, timeout=None, track=None): - """call f(*args, **kwargs) remotely in a load-balanced manner. - - This is a private method, see `apply` for details. - Not to be called directly! - """ - - loc = locals() - for name in ('bound', 'block', 'track'): - assert loc[name] is not None, "kwarg %r must be specified!"%name - - if not self._task_ident: - msg = "Task farming is disabled" - if self._task_scheme == 'pure': - msg += " because the pure ZMQ scheduler cannot handle" - msg += " disappearing engines." 
- raise RuntimeError(msg) - - if self._task_scheme == 'pure': - # pure zmq scheme doesn't support dependencies - msg = "Pure ZMQ scheduler doesn't support dependencies" - if (follow or after): - # hard fail on DAG dependencies - raise RuntimeError(msg) - if isinstance(f, dependent): - # soft warn on functional dependencies - warnings.warn(msg, RuntimeWarning) - - # defaults: - args = args if args is not None else [] - kwargs = kwargs if kwargs is not None else {} - - if targets: - idents,_ = self._build_targets(targets) - else: - idents = [] + if not self._ids: + # flush notification socket if no engines yet + any_ids = self.ids + if not any_ids: + raise error.NoEnginesRegistered("Can't execute without any connected engines.") + # enforce types of f,args,kwargs - after = self._build_dependency(after) - follow = self._build_dependency(follow) - subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) bufs = util.pack_apply_message(f,args,kwargs) - content = dict(bound=bound) - msg = self.session.send(self._apply_socket, "apply_request", ident=self._task_ident, - content=content, buffers=bufs, subheader=subheader, track=track) + msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, + subheader=subheader, track=track) + msg_id = msg['msg_id'] self.outstanding.add(msg_id) + if ident: + # possibly routed to a specific engine + if isinstance(ident, list): + ident = ident[-1] + if ident in self._engines.values(): + # save for later, in case of engine death + self._outstanding_dict[ident].add(msg_id) self.history.append(msg_id) self.metadata[msg_id]['submitted'] = datetime.now() - tracker = None if track is False else msg['tracker'] - ar = AsyncResult(self, [msg_id], fname=f.__name__, targets=targets, tracker=tracker) - if block: - try: - return ar.get() - except KeyboardInterrupt: - return ar - else: - return ar - - def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None, - track=None): - """Then underlying method for applying functions to specific engines - via the MUX queue. - - This is a private method, see `apply` for details. - Not to be called directly! 
- """ - - if not self._mux_ident: - msg = "Multiplexing is disabled" - raise RuntimeError(msg) - loc = locals() - for name in ('bound', 'block', 'targets', 'track'): - assert loc[name] is not None, "kwarg %r must be specified!"%name - - idents,targets = self._build_targets(targets) - - subheader = {} - content = dict(bound=bound) - bufs = util.pack_apply_message(f,args,kwargs) - - msg_ids = [] - trackers = [] - for ident in idents: - msg = self.session.send(self._apply_socket, "apply_request", - content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader, - track=track) - if track: - trackers.append(msg['tracker']) - msg_id = msg['msg_id'] - self.outstanding.add(msg_id) - self._outstanding_dict[ident].add(msg_id) - self.history.append(msg_id) - msg_ids.append(msg_id) - - tracker = None if track is False else zmq.MessageTracker(*trackers) - ar = AsyncResult(self, msg_ids, fname=f.__name__, targets=targets, tracker=tracker) - - if block: - try: - return ar.get() - except KeyboardInterrupt: - return ar - else: - return ar - + return msg + #-------------------------------------------------------------------------- # construct a View object #-------------------------------------------------------------------------- - @defaultblock - def remote(self, bound=False, block=None, targets=None, balanced=None): - """Decorator for making a RemoteFunction""" - return remote(self, bound=bound, targets=targets, block=block, balanced=balanced) - - @defaultblock - def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None): - """Decorator for making a ParallelFunction""" - return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced) - def _cache_view(self, targets, balanced): """save views, so subsequent requests don't create new objects.""" if balanced: + # validate whether we can run + if not self._task_socket: + msg = "Task farming is disabled" + if self._task_scheme == 'pure': + msg += " because the pure ZMQ scheduler cannot handle" + msg += " disappearing engines." + raise RuntimeError(msg) + socket = self._task_socket view_class = LoadBalancedView view_cache = self._balanced_views else: + socket = self._mux_socket view_class = DirectView view_cache = self._direct_views # use str, since often targets will be a list key = str(targets) if key not in view_cache: - view_cache[key] = view_class(client=self, targets=targets) + view_cache[key] = view_class(client=self, socket=socket, targets=targets) return view_cache[key] - def view(self, targets=None, balanced=None): + def load_balanced_view(self, targets=None): + """construct a LoadBalancedView object. + + If no arguments are specified, create a LoadBalancedView + using all engines. + + Parameters + ---------- + + targets: list,slice,int,etc. [default: use all engines] + The subset of engines across which to load-balance + """ + return self._get_view(targets, balanced=True) + + def direct_view(self, targets='all'): + """construct a DirectView object. + + If no targets are specified, create a DirectView + using all engines. + + Parameters + ---------- + + targets: list,slice,int,etc. [default: use all engines] + The engines to use for the View + """ + return self._get_view(targets, balanced=False) + + def _get_view(self, targets, balanced): """Method for constructing View objects.
If no arguments are specified, create a LoadBalancedView @@ -1234,9 +1033,7 @@ class Client(HasTraits): """ - balanced = (targets is None) if balanced is None else balanced - - if targets is None: + if targets in (None,'all'): if balanced: return self._cache_view(None,True) else: @@ -1261,20 +1058,20 @@ class Client(HasTraits): raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) #-------------------------------------------------------------------------- - # Data movement + # Data movement (TO BE REMOVED) #-------------------------------------------------------------------------- - @defaultblock - def push(self, ns, targets='all', block=None, track=False): + @default_block + def _push(self, ns, targets='all', block=None, track=False): """Push the contents of `ns` into the namespace on `target`""" if not isinstance(ns, dict): raise TypeError("Must be a dict, not %s"%type(ns)) - result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track) + result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track) if not block: return result - @defaultblock - def pull(self, keys, targets='all', block=None): + @default_block + def _pull(self, keys, targets='all', block=None): """Pull objects from `target`'s namespace by `keys`""" if isinstance(keys, basestring): pass @@ -1284,64 +1081,15 @@ class Client(HasTraits): raise TypeError("keys must be str, not type %r"%type(key)) else: raise TypeError("keys must be strs, not %r"%keys) - result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False) + result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False) return result - @defaultblock - def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None, track=False): - """ - Partition a Python sequence and send the partitions to a set of engines. - """ - targets = self._build_targets(targets)[-1] - mapObject = Map.dists[dist]() - nparts = len(targets) - msg_ids = [] - trackers = [] - for index, engineid in enumerate(targets): - partition = mapObject.getPartition(seq, index, nparts) - if flatten and len(partition) == 1: - r = self.push({key: partition[0]}, targets=engineid, block=False, track=track) - else: - r = self.push({key: partition}, targets=engineid, block=False, track=track) - msg_ids.extend(r.msg_ids) - if track: - trackers.append(r._tracker) - - if track: - tracker = zmq.MessageTracker(*trackers) - else: - tracker = None - - r = AsyncResult(self, msg_ids, fname='scatter', targets=targets, tracker=tracker) - if block: - r.wait() - else: - return r - - @defaultblock - def gather(self, key, dist='b', targets='all', block=None): - """ - Gather a partitioned sequence on a set of engines as a single local seq. 
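With the explicit constructors added above, views are requested from the client rather than built around it; per the commit message, data movement also lives on the views now. A hedged sketch -- the `push`/`pull` signatures are assumptions, since view.py itself is not part of this hunk:

    # hedged sketch: the new view constructors
    from IPython.zmq.parallel.client import Client

    rc = Client(profile='default')

    dv = rc.direct_view()            # DirectView on all engines
    lv = rc.load_balanced_view()     # LoadBalancedView across all engines

    # data movement moved from Client to DirectView (signatures assumed)
    dv.push(dict(a=5), block=True)
    print(dv.pull('a', block=True))  # one value per engine, e.g. [5, 5]

    ar = lv.apply_async(lambda : 10) # load-balanced submission
    print(ar.get())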
- """ - - targets = self._build_targets(targets)[-1] - mapObject = Map.dists[dist]() - msg_ids = [] - for index, engineid in enumerate(targets): - msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids) - - r = AsyncMapResult(self, msg_ids, mapObject, fname='gather') - if block: - return r.get() - else: - return r - #-------------------------------------------------------------------------- # Query methods #-------------------------------------------------------------------------- - @spinfirst - @defaultblock + @spin_first + @default_block def get_result(self, indices_or_msg_ids=None, block=None): """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object. @@ -1406,7 +1154,7 @@ class Client(HasTraits): return ar - @spinfirst + @spin_first def result_status(self, msg_ids, status_only=True): """Check on the status of the result(s) of the apply request with `msg_ids`. @@ -1505,7 +1253,7 @@ class Client(HasTraits): error.collect_exceptions(failures, "result_status") return content - @spinfirst + @spin_first def queue_status(self, targets='all', verbose=False): """Fetch the status of engine queues. @@ -1518,8 +1266,8 @@ class Client(HasTraits): verbose : bool Whether to return lengths only, or lists of ids for each element """ - targets = self._build_targets(targets)[1] - content = dict(targets=targets, verbose=verbose) + engine_ids = self._build_targets(targets)[1] + content = dict(targets=engine_ids, verbose=verbose) self.session.send(self._query_socket, "queue_request", content=content) idents,msg = self.session.recv(self._query_socket, 0) if self.debug: @@ -1528,11 +1276,15 @@ class Client(HasTraits): status = content.pop('status') if status != 'ok': raise self._unwrap_exception(content) - return util.rekey(content) + content = util.rekey(content) + if isinstance(targets, int): + return content[targets] + else: + return content - @spinfirst + @spin_first def purge_results(self, jobs=[], targets=[]): - """Tell the controller to forget results. + """Tell the Hub to forget results. Individual results can be purged by msg_id, or the entire history of specific targets can be purged. @@ -1540,11 +1292,11 @@ class Client(HasTraits): Parameters ---------- - jobs : str or list of strs or AsyncResult objects + jobs : str or list of str or AsyncResult objects the msg_ids whose results should be forgotten. targets : int/str/list of ints/strs The targets, by uuid or int_id, whose entire history is to be purged. - Use `targets='all'` to scrub everything from the controller's memory. + Use `targets='all'` to scrub everything from the Hub's memory. default : None """ diff --git a/IPython/zmq/parallel/dependency.py b/IPython/zmq/parallel/dependency.py index 39fc03e..9216709 100644 --- a/IPython/zmq/parallel/dependency.py +++ b/IPython/zmq/parallel/dependency.py @@ -6,11 +6,9 @@ # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- -from IPython.external.decorator import decorator - from .asyncresult import AsyncResult from .error import UnmetDependency - +from .util import interactive class depend(object): """Dependency decorator, for use with tasks. 
@@ -54,6 +52,8 @@ class dependent(object): self.dkwargs = dkwargs def __call__(self, *args, **kwargs): + # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'): + # self.df.func_globals = self.f.func_globals if self.df(*self.dargs, **self.dkwargs) is False: raise UnmetDependency() return self.f(*args, **kwargs) @@ -62,13 +62,18 @@ class dependent(object): def __name__(self): return self.func_name +@interactive def _require(*names): """Helper for @require decorator.""" + from IPython.zmq.parallel.error import UnmetDependency + user_ns = globals() for name in names: + if name in user_ns: + continue try: - __import__(name) + exec 'import %s'%name in user_ns except ImportError: - return False + raise UnmetDependency(name) return True def require(*names): @@ -96,54 +101,73 @@ class Dependency(set): all : bool [default True] Whether the dependency should be considered met when *all* depending tasks have completed or only when *any* have been completed. - success_only : bool [default True] - Whether to consider only successes for Dependencies, or consider failures as well. - If `all=success_only=True`, then this task will fail with an ImpossibleDependency + success : bool [default True] + Whether to consider successes as fulfilling dependencies. + failure : bool [default False] + Whether to consider failures as fulfilling dependencies. + + If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency as soon as the first depended-upon task fails. """ all=True - success_only=True + success=True + failure=True - def __init__(self, dependencies=[], all=True, success_only=True): + def __init__(self, dependencies=[], all=True, success=True, failure=False): if isinstance(dependencies, dict): # load from dict all = dependencies.get('all', True) - success_only = dependencies.get('success_only', success_only) + success = dependencies.get('success', success) + failure = dependencies.get('failure', failure) dependencies = dependencies.get('dependencies', []) ids = [] - if isinstance(dependencies, AsyncResult): - ids.extend(AsyncResult.msg_ids) - else: - for d in dependencies: - if isinstance(d, basestring): - ids.append(d) - elif isinstance(d, AsyncResult): - ids.extend(d.msg_ids) - else: - raise TypeError("invalid dependency type: %r"%type(d)) + + # extract ids from various sources: + if isinstance(dependencies, (basestring, AsyncResult)): + dependencies = [dependencies] + for d in dependencies: + if isinstance(d, basestring): + ids.append(d) + elif isinstance(d, AsyncResult): + ids.extend(d.msg_ids) + else: + raise TypeError("invalid dependency type: %r"%type(d)) + set.__init__(self, ids) self.all = all - self.success_only=success_only + if not (success or failure): + raise ValueError("Must depend on at least one of successes or failures!") + self.success=success + self.failure = failure def check(self, completed, failed=None): - if failed is not None and not self.success_only: - completed = completed.union(failed) + """check whether our dependencies have been met.""" if len(self) == 0: return True + against = set() + if self.success: + against = completed + if failed is not None and self.failure: + against = against.union(failed) if self.all: - return self.issubset(completed) + return self.issubset(against) else: - return not self.isdisjoint(completed) + return not self.isdisjoint(against) - def unreachable(self, failed): - if len(self) == 0 or len(failed) == 0 or not self.success_only: + def unreachable(self, completed, failed=None): + """return 
whether this dependency has become impossible.""" + if len(self) == 0: return False - # print self, self.success_only, self.all, failed + against = set() + if not self.success: + against = completed + if failed is not None and not self.failure: + against = against.union(failed) if self.all: - return not self.isdisjoint(failed) + return not self.isdisjoint(against) else: - return self.issubset(failed) + return self.issubset(against) def as_dict(self): @@ -151,9 +175,10 @@ class Dependency(set): return dict( dependencies=list(self), all=self.all, - success_only=self.success_only, + success=self.success, + failure=self.failure ) - + __all__ = ['depend', 'require', 'dependent', 'Dependency'] diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index 9c45d66..27070e3 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -890,13 +890,9 @@ class Hub(LoggingFactory): def shutdown_request(self, client_id, msg): """handle shutdown request.""" - # s = self.context.socket(zmq.XREQ) - # s.connect(self.client_connections['mux']) - # time.sleep(0.1) - # for eid,ec in self.engines.iteritems(): - # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue) - # time.sleep(1) self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) + # also notify other clients of shutdown + self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'}) dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) dc.start() diff --git a/IPython/zmq/parallel/remotefunction.py b/IPython/zmq/parallel/remotefunction.py index 7c32bd4..66f1f8d 100644 --- a/IPython/zmq/parallel/remotefunction.py +++ b/IPython/zmq/parallel/remotefunction.py @@ -1,4 +1,4 @@ -"""Remote Functions and decorators for the client.""" +"""Remote Functions and decorators for Views.""" #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team # @@ -22,33 +22,33 @@ from .asyncresult import AsyncMapResult #----------------------------------------------------------------------------- @testdec.skip_doctest -def remote(client, bound=False, block=None, targets=None, balanced=None): +def remote(view, block=None, **flags): """Turn a function into a remote function. This method can be used for map: - In [1]: @remote(client,block=True) + In [1]: @remote(view,block=True) ...: def func(a): ...: pass """ def remote_function(f): - return RemoteFunction(client, f, bound, block, targets, balanced) + return RemoteFunction(view, f, block=block, **flags) return remote_function @testdec.skip_doctest -def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None): +def parallel(view, dist='b', block=None, **flags): """Turn a function into a parallel remote function. 
This method can be used for map: - In [1]: @parallel(client,block=True) + In [1]: @parallel(view, block=True) ...: def func(a): ...: pass """ def parallel_function(f): - return ParallelFunction(client, f, dist, bound, block, targets, balanced) + return ParallelFunction(view, f, dist=dist, block=block, **flags) return parallel_function #-------------------------------------------------------------------------- @@ -61,44 +61,32 @@ class RemoteFunction(object): Parameters ---------- - client : Client instance - The client to be used to connect to engines + view : View instance + The view to be used for execution f : callable The function to be wrapped into a remote function - bound : bool [default: False] - Whether the affect the remote namespace when called block : bool [default: None] Whether to wait for results or not. The default behavior is - to use the current `block` attribute of `client` - targets : valid target list [default: all] - The targets on which to execute. - balanced : bool - Whether to load-balance with the Task scheduler or not + to use the current `block` attribute of `view` + + **flags : remaining kwargs are passed to View.temp_flags """ - client = None # the remote connection + view = None # the remote connection func = None # the wrapped function block = None # whether to block - bound = None # whether to affect the namespace - targets = None # where to execute - balanced = None # whether to load-balance + flags = None # dict of extra kwargs for temp_flags - def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None): - self.client = client + def __init__(self, view, f, block=None, **flags): + self.view = view self.func = f self.block=block - self.bound=bound - self.targets=targets - if balanced is None: - if targets is None: - balanced = True - else: - balanced = False - self.balanced = balanced + self.flags=flags def __call__(self, *args, **kwargs): - return self.client.apply(self.func, args=args, kwargs=kwargs, - block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced) + block = self.view.block if self.block is None else self.block + with self.view.temp_flags(block=block, **self.flags): + return self.view.apply(self.func, *args, **kwargs) class ParallelFunction(RemoteFunction): @@ -111,51 +99,57 @@ class ParallelFunction(RemoteFunction): Parameters ---------- - client : Client instance - The client to be used to connect to engines + view : View instance + The view to be used for execution f : callable The function to be wrapped into a remote function - bound : bool [default: False] - Whether the affect the remote namespace when called + dist : str [default: 'b'] + The key for which mapObject to use to distribute sequences + options are: + * 'b' : use contiguous chunks in order + * 'r' : use round-robin striping block : bool [default: None] Whether to wait for results or not. The default behavior is - to use the current `block` attribute of `client` - targets : valid target list [default: all] - The targets on which to execute. 
- balanced : bool - Whether to load-balance with the Task scheduler or not - chunk_size : int or None + to use the current `block` attribute of `view` + chunksize : int or None The size of chunk to use when breaking up sequences in a load-balanced manner + **flags : remaining kwargs are passed to View.temp_flags """ - def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None): - super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) - self.chunk_size = chunk_size + + chunksize=None + mapObject=None + + def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags): + super(ParallelFunction, self).__init__(view, f, block=block, **flags) + self.chunksize = chunksize mapClass = Map.dists[dist] self.mapObject = mapClass() def __call__(self, *sequences): + # check that the length of sequences match len_0 = len(sequences[0]) for s in sequences: if len(s)!=len_0: msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) raise ValueError(msg) - - if self.balanced: - if self.chunk_size: - nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0) + balanced = 'Balanced' in self.view.__class__.__name__ + if balanced: + if self.chunksize: + nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0) else: nparts = len_0 - targets = [self.targets]*nparts + targets = [None]*nparts else: - if self.chunk_size: - warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning) + if self.chunksize: + warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) # multiplexed: - targets = self.client._build_targets(self.targets)[-1] + targets = self.view.targets nparts = len(targets) msg_ids = [] # my_f = lambda *a: map(self.func, *a) + client = self.view.client for index, t in enumerate(targets): args = [] for seq in sequences: @@ -173,12 +167,15 @@ class ParallelFunction(RemoteFunction): args = [self.func]+args else: f=self.func - ar = self.client.apply(f, args=args, block=False, bound=self.bound, - targets=t, balanced=self.balanced) + + view = self.view if balanced else client[t] + with view.temp_flags(block=False, **self.flags): + ar = view.apply(f, *args) msg_ids.append(ar.msg_ids[0]) - r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) + r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__) + if self.block: try: return r.get() diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 675e442..6f61794 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -238,7 +238,7 @@ class TaskScheduler(SessionFactory): msg = self.session.unpack_message(msg, copy=False, content=False) parent = msg['header'] idents = [idents[0],engine]+idents[1:] - print (idents) + # print (idents) try: raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) except: @@ -277,8 +277,9 @@ class TaskScheduler(SessionFactory): # time dependencies after = Dependency(header.get('after', [])) if after.all: - after.difference_update(self.all_completed) - if not after.success_only: + if after.success: + after.difference_update(self.all_completed) + if after.failure: after.difference_update(self.all_failed) if after.check(self.all_completed, self.all_failed): # recast as empty set, if `after` already met, @@ -302,7 +303,7 @@ class TaskScheduler(SessionFactory): self.depending[msg_id] = args return self.fail_unreachable(msg_id, error.InvalidDependency) # check 
if unreachable: - if dep.unreachable(self.all_failed): + if dep.unreachable(self.all_completed, self.all_failed): self.depending[msg_id] = args return self.fail_unreachable(msg_id) @@ -379,7 +380,11 @@ class TaskScheduler(SessionFactory): if follow.all: # check follow for impossibility dests = set() - relevant = self.all_completed if follow.success_only else self.all_done + relevant = set() + if follow.success: + relevant = self.all_completed + if follow.failure: + relevant = relevant.union(self.all_failed) for m in follow.intersection(relevant): dests.add(self.destinations[m]) if len(dests) > 1: @@ -514,11 +519,8 @@ class TaskScheduler(SessionFactory): for msg_id in jobs: raw_msg, targets, after, follow, timeout = self.depending[msg_id] - # if dep_id in after: - # if after.all and (success or not after.success_only): - # after.remove(dep_id) - if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed): + if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed): self.fail_unreachable(msg_id) elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py index 76bc6ba..155009a 100755 --- a/IPython/zmq/parallel/streamkernel.py +++ b/IPython/zmq/parallel/streamkernel.py @@ -170,7 +170,7 @@ class Kernel(SessionFactory): content = dict(status='ok') reply_msg = self.session.send(stream, 'abort_reply', content=content, - parent=parent, ident=ident)[0] + parent=parent, ident=ident) self.log.debug(str(reply_msg)) def shutdown_request(self, stream, ident, parent): @@ -184,10 +184,7 @@ class Kernel(SessionFactory): content['status'] = 'ok' msg = self.session.send(stream, 'shutdown_reply', content=content, parent=parent, ident=ident) - # msg = self.session.send(self.pub_socket, 'shutdown_reply', - # content, parent, ident) - # print >> sys.__stdout__, msg - # time.sleep(0.2) + self.log.debug(str(msg)) dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop) dc.start() @@ -295,7 +292,7 @@ class Kernel(SessionFactory): content = parent[u'content'] bufs = parent[u'buffers'] msg_id = parent['header']['msg_id'] - bound = content.get('bound', False) + # bound = parent['header'].get('bound', False) except: self.log.error("Got bad msg: %s"%parent, exc_info=True) return @@ -314,16 +311,12 @@ class Kernel(SessionFactory): working = self.user_ns # suffix = prefix = "_"+str(msg_id).replace("-","")+"_" - # if bound: - # - # else: - # working = dict() - # suffix = prefix = "_" # prevent keyword collisions with lambda + f,args,kwargs = unpack_apply_message(bufs, working, copy=False) - if bound: - bound_ns = Namespace(working) - args = [bound_ns]+list(args) - # if f.fun + # if bound: + # bound_ns = Namespace(working) + # args = [bound_ns]+list(args) + fname = getattr(f, '__name__', 'f') fname = prefix+"f" @@ -341,8 +334,8 @@ class Kernel(SessionFactory): finally: for key in ns.iterkeys(): working.pop(key) - if bound: - working.update(bound_ns) + # if bound: + # working.update(bound_ns) packed_result,buf = serialize_object(result) result_buf = [packed_result]+buf @@ -364,9 +357,12 @@ class Kernel(SessionFactory): reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf, subheader=sub) - - # if reply_msg['content']['status'] == u'error': - # self.abort_queues() + + # flush i/o + # should this be before reply_msg is sent, like in the single-kernel code, + # or should 
nothing get in the way of real results? + sys.stdout.flush() + sys.stderr.flush() def dispatch_queue(self, stream, msg): self.control_stream.flush() diff --git a/IPython/zmq/parallel/tests/__init__.py b/IPython/zmq/parallel/tests/__init__.py index 9126ca0..680736e 100644 --- a/IPython/zmq/parallel/tests/__init__.py +++ b/IPython/zmq/parallel/tests/__init__.py @@ -1,5 +1,16 @@ """toplevel setup/teardown for parallel tests.""" +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + import tempfile import time from subprocess import Popen, PIPE, STDOUT @@ -15,17 +26,27 @@ def setup(): cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT) processes.append(cp) time.sleep(.5) - add_engine() + add_engines(1) c = client.Client(profile='iptest') while not c.ids: time.sleep(.1) c.spin() + c.close() -def add_engine(profile='iptest'): - ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT) - # ep.start() - processes.append(ep) - return ep +def add_engines(n=1, profile='iptest'): + rc = client.Client(profile=profile) + base = len(rc) + eps = [] + for i in range(n): + ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT) + # ep.start() + processes.append(ep) + eps.append(ep) + while len(rc) < base+n: + time.sleep(.1) + rc.spin() + rc.close() + return eps def teardown(): time.sleep(1) diff --git a/IPython/zmq/parallel/tests/clienttest.py b/IPython/zmq/parallel/tests/clienttest.py index eadf532..bd8a87f 100644 --- a/IPython/zmq/parallel/tests/clienttest.py +++ b/IPython/zmq/parallel/tests/clienttest.py @@ -1,3 +1,12 @@ +"""base class for parallel client tests""" + +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#------------------------------------------------------------------------------- + import sys import tempfile import time @@ -6,6 +15,7 @@ from multiprocessing import Process from nose import SkipTest +import zmq from zmq.tests import BaseZMQTestCase from IPython.external.decorator import decorator @@ -14,7 +24,7 @@ from IPython.zmq.parallel import error from IPython.zmq.parallel.client import Client from IPython.zmq.parallel.ipcluster import launch_process from IPython.zmq.parallel.entry_point import select_random_ports -from IPython.zmq.parallel.tests import processes,add_engine +from IPython.zmq.parallel.tests import processes,add_engines # simple tasks for use in apply tests @@ -47,13 +57,11 @@ def skip_without(*names): return f(*args, **kwargs) return skip_without_names - class ClusterTestCase(BaseZMQTestCase): def add_engines(self, n=1, block=True): """add multiple engines to our cluster""" - for i in range(n): - self.engines.append(add_engine()) + self.engines.extend(add_engines(n)) if block: self.wait_on_engines() @@ -68,10 +76,11 @@ class ClusterTestCase(BaseZMQTestCase): def connect_client(self): """connect a client with my Context, and track its sockets for cleanup""" - c = Client(profile='iptest',context=self.context) - - # for name in filter(lambda n:n.endswith('socket'), dir(c)): - # self.sockets.append(getattr(c, name)) + c = Client(profile='iptest', context=self.context) + for name in filter(lambda n:n.endswith('socket'), dir(c)): + s = getattr(c, name) + s.setsockopt(zmq.LINGER, 0) + self.sockets.append(s) return c def assertRaisesRemote(self, etype, f, *args, **kwargs): @@ -92,15 +101,19 @@ class ClusterTestCase(BaseZMQTestCase): self.engines=[] def tearDown(self): - + # self.client.clear(block=True) # close fds: for e in filter(lambda e: e.poll() is not None, processes): processes.remove(e) + # allow flushing of incoming messages to prevent crash on socket close + self.client.wait(timeout=2) + # time.sleep(2) + self.client.spin() self.client.close() BaseZMQTestCase.tearDown(self) - # this will be superfluous when pyzmq merges PR #88 - self.context.term() + # this will be redundant when pyzmq merges PR #88 + # self.context.term() # print tempfile.TemporaryFile().fileno(), # sys.stdout.flush() \ No newline at end of file diff --git a/IPython/zmq/parallel/tests/test_asyncresult.py b/IPython/zmq/parallel/tests/test_asyncresult.py new file mode 100644 index 0000000..dfe9230 --- /dev/null +++ b/IPython/zmq/parallel/tests/test_asyncresult.py @@ -0,0 +1,69 @@ +"""Tests for asyncresult.py""" + +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + + +from IPython.zmq.parallel.error import TimeoutError + +from IPython.zmq.parallel.tests import add_engines +from .clienttest import ClusterTestCase + +def setup(): + add_engines(2) + +def wait(n): + import time + time.sleep(n) + return n + +class AsyncResultTest(ClusterTestCase): + + def test_single_result(self): + eid = self.client.ids[-1] + ar = self.client[eid].apply_async(lambda : 42) + self.assertEquals(ar.get(), 42) + ar = self.client[[eid]].apply_async(lambda : 42) + self.assertEquals(ar.get(), [42]) + ar = self.client[-1:].apply_async(lambda : 42) + self.assertEquals(ar.get(), [42]) + + def test_get_after_done(self): + ar = self.client[-1].apply_async(lambda : 42) + self.assertFalse(ar.ready()) + ar.wait() + self.assertTrue(ar.ready()) + self.assertEquals(ar.get(), 42) + self.assertEquals(ar.get(), 42) + + def test_get_before_done(self): + ar = self.client[-1].apply_async(wait, 0.1) + self.assertRaises(TimeoutError, ar.get, 0) + ar.wait(0) + self.assertFalse(ar.ready()) + self.assertEquals(ar.get(), 0.1) + + def test_get_after_error(self): + ar = self.client[-1].apply_async(lambda : 1/0) + ar.wait() + self.assertRaisesRemote(ZeroDivisionError, ar.get) + self.assertRaisesRemote(ZeroDivisionError, ar.get) + self.assertRaisesRemote(ZeroDivisionError, ar.get_dict) + + def test_get_dict(self): + n = len(self.client) + ar = self.client[:].apply_async(lambda : 5) + self.assertEquals(ar.get(), [5]*n) + d = ar.get_dict() + self.assertEquals(sorted(d.keys()), sorted(self.client.ids)) + for eid,r in d.iteritems(): + self.assertEquals(r, 5) + diff --git a/IPython/zmq/parallel/tests/test_client.py b/IPython/zmq/parallel/tests/test_client.py index 97d2118..47bebca 100644 --- a/IPython/zmq/parallel/tests/test_client.py +++ b/IPython/zmq/parallel/tests/test_client.py @@ -1,3 +1,16 @@ +"""Tests for parallel client.py""" + +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + import time from tempfile import mktemp @@ -8,7 +21,10 @@ from IPython.zmq.parallel import error from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult from IPython.zmq.parallel.view import LoadBalancedView, DirectView -from clienttest import ClusterTestCase, segfault, wait +from clienttest import ClusterTestCase, segfault, wait, add_engines + +def setup(): + add_engines(4) class TestClient(ClusterTestCase): @@ -17,27 +33,6 @@ class TestClient(ClusterTestCase): self.add_engines(3) self.assertEquals(len(self.client.ids), n+3) - def test_segfault_task(self): - """test graceful handling of engine death (balanced)""" - self.add_engines(1) - ar = self.client.apply(segfault, block=False) - self.assertRaisesRemote(error.EngineError, ar.get) - eid = ar.engine_id - while eid in self.client.ids: - time.sleep(.01) - self.client.spin() - - def test_segfault_mux(self): - """test graceful handling of engine death (direct)""" - self.add_engines(1) - eid = self.client.ids[-1] - ar = self.client[eid].apply_async(segfault) - self.assertRaisesRemote(error.EngineError, ar.get) - eid = ar.engine_id - while eid in self.client.ids: - time.sleep(.01) - self.client.spin() - def test_view_indexing(self): """test index access for views""" self.add_engines(2) @@ -71,8 +66,8 @@ class TestClient(ClusterTestCase): v = self.client[:2] v2 =self.client[:2] self.assertTrue(v is v2) - v = self.client.view() - v2 = self.client.view(balanced=True) + v = self.client.load_balanced_view() + v2 = self.client.load_balanced_view(targets=None) self.assertTrue(v is v2) def test_targets(self): @@ -84,102 +79,26 @@ class TestClient(ClusterTestCase): def test_clear(self): """test clear behavior""" - self.add_engines(2) - self.client.block=True - self.client.push(dict(a=5)) - self.client.pull('a') + # self.add_engines(2) + v = self.client[:] + v.block=True + v.push(dict(a=5)) + v.pull('a') id0 = self.client.ids[-1] self.client.clear(targets=id0) - self.client.pull('a', targets=self.client.ids[:-1]) - self.assertRaisesRemote(NameError, self.client.pull, 'a') - self.client.clear() + self.client[:-1].pull('a') + self.assertRaisesRemote(NameError, self.client[id0].get, 'a') + self.client.clear(block=True) for i in self.client.ids: - self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i) - - - def test_push_pull(self): - """test pushing and pulling""" - data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) - t = self.client.ids[-1] - self.add_engines(2) - push = self.client.push - pull = self.client.pull - self.client.block=True - nengines = len(self.client) - push({'data':data}, targets=t) - d = pull('data', targets=t) - self.assertEquals(d, data) - push({'data':data}) - d = pull('data') - self.assertEquals(d, nengines*[data]) - ar = push({'data':data}, block=False) - self.assertTrue(isinstance(ar, AsyncResult)) - r = ar.get() - ar = pull('data', block=False) - self.assertTrue(isinstance(ar, AsyncResult)) - r = ar.get() - self.assertEquals(r, nengines*[data]) - push(dict(a=10,b=20)) - r = pull(('a','b')) - self.assertEquals(r, nengines*[[10,20]]) - - def test_push_pull_function(self): - "test pushing and pulling functions" - def testf(x): - return 2.0*x - - self.add_engines(4) - t = self.client.ids[-1] - self.client.block=True - push = 
self.client.push - pull = self.client.pull - execute = self.client.execute - push({'testf':testf}, targets=t) - r = pull('testf', targets=t) - self.assertEqual(r(1.0), testf(1.0)) - execute('r = testf(10)', targets=t) - r = pull('r', targets=t) - self.assertEquals(r, testf(10)) - ar = push({'testf':testf}, block=False) - ar.get() - ar = pull('testf', block=False) - rlist = ar.get() - for r in rlist: - self.assertEqual(r(1.0), testf(1.0)) - execute("def g(x): return x*x", targets=t) - r = pull(('testf','g'),targets=t) - self.assertEquals((r[0](10),r[1](10)), (testf(10), 100)) - - def test_push_function_globals(self): - """test that pushed functions have access to globals""" - def geta(): - return a - self.add_engines(1) - v = self.client[-1] - v.block=True - v['f'] = geta - self.assertRaisesRemote(NameError, v.execute, 'b=f()') - v.execute('a=5') - v.execute('b=f()') - self.assertEquals(v['b'], 5) + # print i + self.assertRaisesRemote(NameError, self.client[i].get, 'a') - def test_push_function_defaults(self): - """test that pushed functions preserve default args""" - def echo(a=10): - return a - self.add_engines(1) - v = self.client[-1] - v.block=True - v['f'] = echo - v.execute('b=f()') - self.assertEquals(v['b'], 10) - def test_get_result(self): """test getting results from the Hub.""" c = clientmod.Client(profile='iptest') - self.add_engines(1) + # self.add_engines(1) t = c.ids[-1] - ar = c.apply(wait, (1,), block=False, targets=t) + ar = c[t].apply_async(wait, 1) # give the monitor time to notice the message time.sleep(.25) ahr = self.client.get_result(ar.msg_ids) @@ -187,76 +106,42 @@ class TestClient(ClusterTestCase): self.assertEquals(ahr.get(), ar.get()) ar2 = self.client.get_result(ar.msg_ids) self.assertFalse(isinstance(ar2, AsyncHubResult)) + c.close() def test_ids_list(self): """test client.ids""" - self.add_engines(2) + # self.add_engines(2) ids = self.client.ids self.assertEquals(ids, self.client._ids) self.assertFalse(ids is self.client._ids) ids.remove(ids[-1]) self.assertNotEquals(ids, self.client._ids) - def test_run_newline(self): - """test that run appends newline to files""" - tmpfile = mktemp() - with open(tmpfile, 'w') as f: - f.write("""def g(): - return 5 - """) - v = self.client[-1] - v.run(tmpfile, block=True) - self.assertEquals(v.apply_sync(lambda : g()), 5) + def test_queue_status(self): + # self.addEngine(4) + ids = self.client.ids + id0 = ids[0] + qs = self.client.queue_status(targets=id0) + self.assertTrue(isinstance(qs, dict)) + self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) + allqs = self.client.queue_status() + self.assertTrue(isinstance(allqs, dict)) + self.assertEquals(sorted(allqs.keys()), self.client.ids) + for eid,qs in allqs.items(): + self.assertTrue(isinstance(qs, dict)) + self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) - def test_apply_tracked(self): - """test tracking for apply""" - # self.add_engines(1) - t = self.client.ids[-1] - self.client.block=False - def echo(n=1024*1024, **kwargs): - return self.client.apply(lambda x: x, args=('x'*n,), targets=t, **kwargs) - ar = echo(1) - self.assertTrue(ar._tracker is None) - self.assertTrue(ar.sent) - ar = echo(track=True) - self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) - self.assertEquals(ar.sent, ar._tracker.done) - ar._tracker.wait() - self.assertTrue(ar.sent) - - def test_push_tracked(self): - t = self.client.ids[-1] - ns = dict(x='x'*1024*1024) - ar = self.client.push(ns, targets=t, block=False) - self.assertTrue(ar._tracker is None) - 
self.assertTrue(ar.sent) - - ar = self.client.push(ns, targets=t, block=False, track=True) - self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) - self.assertEquals(ar.sent, ar._tracker.done) - ar._tracker.wait() - self.assertTrue(ar.sent) - ar.get() + def test_shutdown(self): + # self.addEngine(4) + ids = self.client.ids + id0 = ids[0] + self.client.shutdown(id0, block=True) + while id0 in self.client.ids: + time.sleep(0.1) + self.client.spin() - def test_scatter_tracked(self): - t = self.client.ids - x='x'*1024*1024 - ar = self.client.scatter('x', x, targets=t, block=False) - self.assertTrue(ar._tracker is None) - self.assertTrue(ar.sent) + self.assertRaises(IndexError, lambda : self.client[id0]) - ar = self.client.scatter('x', x, targets=t, block=False, track=True) - self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) - self.assertEquals(ar.sent, ar._tracker.done) - ar._tracker.wait() - self.assertTrue(ar.sent) - ar.get() - - def test_remote_reference(self): - v = self.client[-1] - v['a'] = 123 - ra = clientmod.Reference('a') - b = v.apply_sync(lambda x: x, ra) - self.assertEquals(b, 123) - - + def test_result_status(self): + pass + # to be written diff --git a/IPython/zmq/parallel/tests/test_controller.py b/IPython/zmq/parallel/tests/test_controller.py deleted file mode 100644 index e69de29..0000000 --- a/IPython/zmq/parallel/tests/test_controller.py +++ /dev/null diff --git a/IPython/zmq/parallel/tests/test_dependency.py b/IPython/zmq/parallel/tests/test_dependency.py new file mode 100644 index 0000000..e67d37e --- /dev/null +++ b/IPython/zmq/parallel/tests/test_dependency.py @@ -0,0 +1,101 @@ +"""Tests for dependency.py""" + +__docformat__ = "restructuredtext en" + +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + +# import +import os + +from IPython.utils.pickleutil import can, uncan + +from IPython.zmq.parallel import dependency as dmod +from IPython.zmq.parallel.util import interactive + +from IPython.zmq.parallel.tests import add_engines +from .clienttest import ClusterTestCase + +def setup(): + add_engines(1) + +@dmod.require('time') +def wait(n): + time.sleep(n) + return n + +mixed = map(str, range(10)) +completed = map(str, range(0,10,2)) +failed = map(str, range(1,10,2)) + +class DependencyTest(ClusterTestCase): + + def setUp(self): + ClusterTestCase.setUp(self) + self.user_ns = {'__builtins__' : __builtins__} + self.view = self.client.load_balanced_view() + self.dview = self.client[-1] + self.succeeded = set(map(str, range(0,25,2))) + self.failed = set(map(str, range(1,25,2))) + + def assertMet(self, dep): + self.assertTrue(dep.check(self.succeeded, self.failed), "Dependency should be met") + + def assertUnmet(self, dep): + self.assertFalse(dep.check(self.succeeded, self.failed), "Dependency should not be met") + + def assertUnreachable(self, dep): + self.assertTrue(dep.unreachable(self.succeeded, self.failed), "Dependency should be unreachable") + + def assertReachable(self, dep): + self.assertFalse(dep.unreachable(self.succeeded, self.failed), "Dependency should be reachable") + + def cancan(self, f): + """decorator to pass through canning into self.user_ns""" + return uncan(can(f), self.user_ns) + + def test_require_imports(self): + """test that @require imports names""" + @self.cancan + @dmod.require('urllib') + @interactive + def encode(dikt): + return urllib.urlencode(dikt) + # must pass through canning to properly connect namespaces + self.assertEquals(encode(dict(a=5)), 'a=5') + + def test_success_only(self): + dep = dmod.Dependency(mixed, success=True, failure=False) + self.assertUnmet(dep) + self.assertUnreachable(dep) + dep.all=False + self.assertMet(dep) + self.assertReachable(dep) + dep = dmod.Dependency(completed, success=True, failure=False) + self.assertMet(dep) + self.assertReachable(dep) + dep.all=False + self.assertMet(dep) + self.assertReachable(dep) + + def test_failure_only(self): + dep = dmod.Dependency(mixed, success=False, failure=True) + self.assertUnmet(dep) + self.assertUnreachable(dep) + dep.all=False + self.assertMet(dep) + self.assertReachable(dep) + dep = dmod.Dependency(completed, success=False, failure=True) + self.assertUnmet(dep) + self.assertUnreachable(dep) + dep.all=False + self.assertUnmet(dep) + self.assertUnreachable(dep) diff --git a/IPython/zmq/parallel/tests/test_newserialized.py b/IPython/zmq/parallel/tests/test_newserialized.py index 3b89168..8fd0bb7 100644 --- a/IPython/zmq/parallel/tests/test_newserialized.py +++ b/IPython/zmq/parallel/tests/test_newserialized.py @@ -1,5 +1,16 @@ """test serialization with newserialized""" +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + from unittest import TestCase from IPython.testing.parametric import parametric @@ -82,6 +93,16 @@ class CanningTestCase(TestCase): # test non-copying: a[2] = 1e9 self.assertTrue((a==final).all()) - + + def test_uncan_function_globals(self): + """test that uncanning a module function restores it into its module""" + from re import search + cf = can(search) + csearch = uncan(cf) + self.assertEqual(csearch.__module__, search.__module__) + self.assertNotEqual(csearch('asd', 'asdf'), None) + csearch = uncan(cf, dict(a=5)) + self.assertEqual(csearch.__module__, search.__module__) + self.assertNotEqual(csearch('asd', 'asdf'), None) \ No newline at end of file diff --git a/IPython/zmq/parallel/tests/test_streamsession.py b/IPython/zmq/parallel/tests/test_streamsession.py index 7a2b896..562d8bc 100644 --- a/IPython/zmq/parallel/tests/test_streamsession.py +++ b/IPython/zmq/parallel/tests/test_streamsession.py @@ -1,3 +1,15 @@ +"""test building messages with streamsession""" + +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- import os import uuid diff --git a/IPython/zmq/parallel/tests/test_view.py b/IPython/zmq/parallel/tests/test_view.py new file mode 100644 index 0000000..4c1679f --- /dev/null +++ b/IPython/zmq/parallel/tests/test_view.py @@ -0,0 +1,287 @@ +"""test View objects""" +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + +import time +from tempfile import mktemp + +import zmq + +from IPython.zmq.parallel import client as clientmod +from IPython.zmq.parallel import error +from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult, AsyncMapResult +from IPython.zmq.parallel.view import LoadBalancedView, DirectView +from IPython.zmq.parallel.util import interactive + +from IPython.zmq.parallel.tests import add_engines + +from .clienttest import ClusterTestCase, segfault, wait, skip_without + +def setup(): + add_engines(3) + +class TestView(ClusterTestCase): + + def test_segfault_task(self): + """test graceful handling of engine death (balanced)""" + # self.add_engines(1) + ar = self.client[-1].apply_async(segfault) + self.assertRaisesRemote(error.EngineError, ar.get) + eid = ar.engine_id + while eid in self.client.ids: + time.sleep(.01) + self.client.spin() + + def test_segfault_mux(self): + """test graceful handling of engine death (direct)""" + # self.add_engines(1) + eid = self.client.ids[-1] + ar = self.client[eid].apply_async(segfault) + self.assertRaisesRemote(error.EngineError, ar.get) + eid = ar.engine_id + while eid in self.client.ids: + time.sleep(.01) + self.client.spin() + + def test_push_pull(self): + """test pushing and pulling""" + data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) + t = self.client.ids[-1] + v = self.client[t] + push = v.push + pull = v.pull + v.block=True + nengines = len(self.client) + push({'data':data}) + d = pull('data') + self.assertEquals(d, data) + self.client[:].push({'data':data}) + d = self.client[:].pull('data', block=True) + self.assertEquals(d, nengines*[data]) + ar = push({'data':data}, block=False) + self.assertTrue(isinstance(ar, AsyncResult)) + r = ar.get() + ar = self.client[:].pull('data', block=False) + self.assertTrue(isinstance(ar, AsyncResult)) + r = ar.get() + self.assertEquals(r, nengines*[data]) + self.client[:].push(dict(a=10,b=20)) + r = self.client[:].pull(('a','b')) + self.assertEquals(r, nengines*[[10,20]]) + + def test_push_pull_function(self): + "test pushing and pulling functions" + def testf(x): + return 2.0*x + + t = self.client.ids[-1] + self.client[t].block=True + push = self.client[t].push + pull = self.client[t].pull + execute = self.client[t].execute + push({'testf':testf}) + r = pull('testf') + self.assertEqual(r(1.0), testf(1.0)) + execute('r = testf(10)') + r = pull('r') + self.assertEquals(r, testf(10)) + ar = self.client[:].push({'testf':testf}, block=False) + ar.get() + ar = self.client[:].pull('testf', block=False) + rlist = ar.get() + for r in rlist: + self.assertEqual(r(1.0), testf(1.0)) + execute("def g(x): return x*x") + r = pull(('testf','g')) + self.assertEquals((r[0](10),r[1](10)), (testf(10), 100)) + + def test_push_function_globals(self): + """test that pushed functions have access to globals""" + @interactive + def geta(): + return a + # self.add_engines(1) + v = self.client[-1] + v.block=True + v['f'] = geta + self.assertRaisesRemote(NameError, v.execute, 'b=f()') + v.execute('a=5') + v.execute('b=f()') + self.assertEquals(v['b'], 5) + + def test_push_function_defaults(self): + """test that pushed functions preserve default args""" + def echo(a=10): + return a + v = self.client[-1] + v.block=True + v['f'] = echo + v.execute('b=f()') + 
self.assertEquals(v['b'], 10) + + def test_get_result(self): + """test getting results from the Hub.""" + c = clientmod.Client(profile='iptest') + # self.add_engines(1) + t = c.ids[-1] + v = c[t] + v2 = self.client[t] + ar = v.apply_async(wait, 1) + # give the monitor time to notice the message + time.sleep(.25) + ahr = v2.get_result(ar.msg_ids) + self.assertTrue(isinstance(ahr, AsyncHubResult)) + self.assertEquals(ahr.get(), ar.get()) + ar2 = v2.get_result(ar.msg_ids) + self.assertFalse(isinstance(ar2, AsyncHubResult)) + c.spin() + c.close() + + def test_run_newline(self): + """test that run appends newline to files""" + tmpfile = mktemp() + with open(tmpfile, 'w') as f: + f.write("""def g(): + return 5 + """) + v = self.client[-1] + v.run(tmpfile, block=True) + self.assertEquals(v.apply_sync(lambda f: f(), clientmod.Reference('g')), 5) + + def test_apply_tracked(self): + """test tracking for apply""" + # self.add_engines(1) + t = self.client.ids[-1] + v = self.client[t] + v.block=False + def echo(n=1024*1024, **kwargs): + with v.temp_flags(**kwargs): + return v.apply(lambda x: x, 'x'*n) + ar = echo(1, track=False) + self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) + self.assertTrue(ar.sent) + ar = echo(track=True) + self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) + self.assertEquals(ar.sent, ar._tracker.done) + ar._tracker.wait() + self.assertTrue(ar.sent) + + def test_push_tracked(self): + t = self.client.ids[-1] + ns = dict(x='x'*1024*1024) + v = self.client[t] + ar = v.push(ns, block=False, track=False) + self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) + self.assertTrue(ar.sent) + + ar = v.push(ns, block=False, track=True) + self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) + self.assertEquals(ar.sent, ar._tracker.done) + ar._tracker.wait() + self.assertTrue(ar.sent) + ar.get() + + def test_scatter_tracked(self): + t = self.client.ids + x='x'*1024*1024 + ar = self.client[t].scatter('x', x, block=False, track=False) + self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) + self.assertTrue(ar.sent) + + ar = self.client[t].scatter('x', x, block=False, track=True) + self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) + self.assertEquals(ar.sent, ar._tracker.done) + ar._tracker.wait() + self.assertTrue(ar.sent) + ar.get() + + def test_remote_reference(self): + v = self.client[-1] + v['a'] = 123 + ra = clientmod.Reference('a') + b = v.apply_sync(lambda x: x, ra) + self.assertEquals(b, 123) + + + def test_scatter_gather(self): + view = self.client[:] + seq1 = range(16) + view.scatter('a', seq1) + seq2 = view.gather('a', block=True) + self.assertEquals(seq2, seq1) + self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True) + + @skip_without('numpy') + def test_scatter_gather_numpy(self): + import numpy + from numpy.testing.utils import assert_array_equal, assert_array_almost_equal + view = self.client[:] + a = numpy.arange(64) + view.scatter('a', a) + b = view.gather('a', block=True) + assert_array_equal(b, a) + + def test_map(self): + view = self.client[:] + def f(x): + return x**2 + data = range(16) + r = view.map_sync(f, data) + self.assertEquals(r, map(f, data)) + + def test_scatterGatherNonblocking(self): + data = range(16) + view = self.client[:] + view.scatter('a', data, block=False) + ar = view.gather('a', block=False) + self.assertEquals(ar.get(), data) + + @skip_without('numpy') + def test_scatter_gather_numpy_nonblocking(self): + import numpy + from numpy.testing.utils import assert_array_equal, 
assert_array_almost_equal + a = numpy.arange(64) + view = self.client[:] + ar = view.scatter('a', a, block=False) + self.assertTrue(isinstance(ar, AsyncResult)) + amr = view.gather('a', block=False) + self.assertTrue(isinstance(amr, AsyncMapResult)) + assert_array_equal(amr.get(), a) + + def test_execute(self): + view = self.client[:] + # self.client.debug=True + execute = view.execute + ar = execute('c=30', block=False) + self.assertTrue(isinstance(ar, AsyncResult)) + ar = execute('d=[0,1,2]', block=False) + self.client.wait(ar, 1) + self.assertEquals(len(ar.get()), len(self.client)) + for c in view['c']: + self.assertEquals(c, 30) + + def test_abort(self): + view = self.client[-1] + ar = view.execute('import time; time.sleep(0.25)', block=False) + ar2 = view.apply_async(lambda : 2) + ar3 = view.apply_async(lambda : 3) + view.abort(ar2) + view.abort(ar3.msg_ids) + self.assertRaises(error.TaskAborted, ar2.get) + self.assertRaises(error.TaskAborted, ar3.get) + + def test_temp_flags(self): + view = self.client[-1] + view.block=True + with view.temp_flags(block=False): + self.assertFalse(view.block) + self.assertTrue(view.block) + diff --git a/IPython/zmq/parallel/util.py b/IPython/zmq/parallel/util.py index f15aed9..03f1251 100644 --- a/IPython/zmq/parallel/util.py +++ b/IPython/zmq/parallel/util.py @@ -316,3 +316,39 @@ def unpack_apply_message(bufs, g=None, copy=True): return f,args,kwargs +#-------------------------------------------------------------------------- +# helpers for implementing old MEC API via view.apply +#-------------------------------------------------------------------------- + +def interactive(f): + """decorator for making functions appear as interactively defined. + This results in the function being linked to the user_ns as globals() + instead of the module globals(). + """ + f.__module__ = '__main__' + return f + +@interactive +def _push(ns): + """helper method for implementing `client.push` via `client.apply`""" + globals().update(ns) + +@interactive +def _pull(keys): + """helper method for implementing `client.pull` via `client.apply`""" + user_ns = globals() + if isinstance(keys, (list,tuple, set)): + for key in keys: + if not user_ns.has_key(key): + raise NameError("name '%s' is not defined"%key) + return map(user_ns.get, keys) + else: + if not user_ns.has_key(keys): + raise NameError("name '%s' is not defined"%keys) + return user_ns.get(keys) + +@interactive +def _execute(code): + """helper method for implementing `client.execute` via `client.apply`""" + exec code in globals() + diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index 72ac6de..74baa22 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -10,13 +10,20 @@ # Imports #----------------------------------------------------------------------------- +import warnings +from contextlib import contextmanager + +import zmq + from IPython.testing import decorators as testdec from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance from IPython.external.decorator import decorator -from .asyncresult import AsyncResult -from .dependency import Dependency +from . import map as Map +from . 
import util +from .asyncresult import AsyncResult, AsyncMapResult +from .dependency import Dependency, dependent from .remotefunction import ParallelFunction, parallel, remote #----------------------------------------------------------------------------- @@ -24,25 +31,16 @@ from .remotefunction import ParallelFunction, parallel, remote #----------------------------------------------------------------------------- @decorator -def myblock(f, self, *args, **kwargs): - """override client.block with self.block during a call""" - block = self.client.block - self.client.block = self.block - try: - ret = f(self, *args, **kwargs) - finally: - self.client.block = block - return ret - -@decorator def save_ids(f, self, *args, **kwargs): """Keep our history and outstanding attributes up to date after a method call.""" n_previous = len(self.client.history) - ret = f(self, *args, **kwargs) - nmsgs = len(self.client.history) - n_previous - msg_ids = self.client.history[-nmsgs:] - self.history.extend(msg_ids) - map(self.outstanding.add, msg_ids) + try: + ret = f(self, *args, **kwargs) + finally: + nmsgs = len(self.client.history) - n_previous + msg_ids = self.client.history[-nmsgs:] + self.history.extend(msg_ids) + map(self.outstanding.add, msg_ids) return ret @decorator @@ -71,27 +69,54 @@ class View(HasTraits): """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. Don't use this class, use subclasses. + + Methods + ------- + + spin + flushes incoming results and registration state changes + control methods spin, and requesting `ids` also ensures up to date + + wait + wait on one or more msg_ids + + execution methods + apply + legacy: execute, run + + data movement + push, pull, scatter, gather + + query methods + get_result, queue_status, purge_results, result_status + + control methods + abort, shutdown + """ block=Bool(False) - bound=Bool(False) - track=Bool(False) + track=Bool(True) history=List() outstanding = Set() results = Dict() client = Instance('IPython.zmq.parallel.client.Client') + _socket = Instance('zmq.Socket') _ntargets = Int(1) - _balanced = Bool(False) - _default_names = List(['block', 'bound', 'track']) + _flag_names = List(['block', 'track']) _targets = Any() + _idents = Any() - def __init__(self, client=None, targets=None): - super(View, self).__init__(client=client) - self._targets = targets + def __init__(self, client=None, socket=None, targets=None): + super(View, self).__init__(client=client, _socket=socket) self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) self.block = client.block - for name in self._default_names: + self._idents, self._targets = self.client._build_targets(targets) + if targets is None or isinstance(targets, int): + self._targets = targets + for name in self._flag_names: + # set flags, if they haven't been set yet setattr(self, name, getattr(self, name, None)) assert not self.__class__ is View, "Don't use base View objects, use subclasses" @@ -111,134 +136,127 @@ class View(HasTraits): def targets(self, value): raise AttributeError("Cannot set View `targets` after construction!") - @property - def balanced(self): - return self._balanced - - @balanced.setter - def balanced(self, value): - raise AttributeError("Cannot set View `balanced` after construction!") - - def _defaults(self, *excludes): - """return dict of our default attributes, excluding names given.""" - d = dict(balanced=self._balanced, targets=self._targets) - for name in self._default_names: - if name not in excludes: - d[name] = 
getattr(self, name) - return d - def set_flags(self, **kwargs): """set my attribute flags by keyword. - A View is a wrapper for the Client's apply method, but - with attributes that specify keyword arguments, those attributes - can be set by keyword argument with this method. + Views determine behavior with a few attributes (`block`, `track`, etc.). + These attributes can be set all at once by name with this method. Parameters ---------- block : bool whether to wait for results - bound : bool - whether to pass the client's Namespace as the first argument - to functions called via `apply`. track : bool whether to create a MessageTracker to allow the user to safely edit after arrays and buffers during non-copying sends. """ - for key in kwargs: - if key not in self._default_names: - raise KeyError("Invalid name: %r"%key) - for name in ('block', 'bound'): - if name in kwargs: - setattr(self, name, kwargs[name]) + for name, value in kwargs.iteritems(): + if name not in self._flag_names: + raise KeyError("Invalid name: %r"%name) + else: + setattr(self, name, value) + @contextmanager + def temp_flags(self, **kwargs): + """temporarily set flags, for use in `with` statements. + + See set_flags for permanent setting of flags + + Examples + -------- + + >>> view.track=False + ... + >>> with view.temp_flags(track=True): + ... ar = view.apply(dostuff, my_big_array) + ... ar.tracker.wait() # wait for send to finish + >>> view.track + False + + """ + # preflight: save flags, and set temporaries + saved_flags = {} + for f in self._flag_names: + saved_flags[f] = getattr(self, f) + self.set_flags(**kwargs) + # yield to the with-statement block + yield + # postflight: restore saved flags + self.set_flags(**saved_flags) + + #---------------------------------------------------------------- - # wrappers for client methods: + # apply #---------------------------------------------------------------- - @sync_results - def spin(self): - """spin the client, and sync""" - self.client.spin() @sync_results @save_ids + def _really_apply(self, f, args, kwargs, block=None, **options): + """wrapper for client.send_apply_message""" + raise NotImplementedError("Implement in subclasses") + def apply(self, f, *args, **kwargs): """calls f(*args, **kwargs) on remote engines, returning the result. - This method sets all of `client.apply`'s keyword arguments via this - View's attributes. + This method sets all apply flags via this View's attributes. if self.block is False: returns AsyncResult else: returns actual result of f(*args, **kwargs) """ - return self.client.apply(f, args, kwargs, **self._defaults()) + return self._really_apply(f, args, kwargs) - @save_ids def apply_async(self, f, *args, **kwargs): """calls f(*args, **kwargs) on remote engines in a nonblocking manner. returns AsyncResult """ - d = self._defaults('block', 'bound') - return self.client.apply(f,args,kwargs, block=False, bound=False, **d) + return self._really_apply(f, args, kwargs, block=False) @spin_after - @save_ids def apply_sync(self, f, *args, **kwargs): """calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result. returns: actual result of f(*args, **kwargs) """ - d = self._defaults('block', 'bound', 'track') - return self.client.apply(f,args,kwargs, block=True, bound=False, **d) + return self._really_apply(f, args, kwargs, block=True) - # @sync_results - # @save_ids - # def apply_bound(self, f, *args, **kwargs): - # """calls f(*args, **kwargs) bound to engine namespace(s). 
- # - # if self.block is False: - # returns msg_id - # else: - # returns actual result of f(*args, **kwargs) - # - # This method has access to the targets' namespace via globals() - # - # """ - # d = self._defaults('bound') - # return self.client.apply(f, args, kwargs, bound=True, **d) - # + #---------------------------------------------------------------- + # wrappers for client and control methods + #---------------------------------------------------------------- @sync_results - @save_ids - def apply_async_bound(self, f, *args, **kwargs): - """calls f(*args, **kwargs) bound to engine namespace(s) - in a nonblocking manner. - - The first argument to `f` will be the Engine's Namespace - - returns: AsyncResult + def spin(self): + """spin the client, and sync""" + self.client.spin() + + @sync_results + def wait(self, jobs=None, timeout=-1): + """waits on one or more `jobs`, for up to `timeout` seconds. - """ - d = self._defaults('block', 'bound') - return self.client.apply(f, args, kwargs, block=False, bound=True, **d) - - @spin_after - @save_ids - def apply_sync_bound(self, f, *args, **kwargs): - """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. + Parameters + ---------- - The first argument to `f` will be the Engine's Namespace + jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects + ints are indices to self.history + strs are msg_ids + default: wait on all outstanding messages + timeout : float + a time in seconds, after which to give up. + default is -1, which means no timeout - returns: actual result of f(*args, **kwargs) + Returns + ------- + True : when all msg_ids are done + False : timeout reached, some msg_ids still outstanding """ - d = self._defaults('block', 'bound') - return self.client.apply(f, args, kwargs, block=True, bound=True, **d) + if jobs is None: + jobs = self.history + return self.client.wait(jobs, timeout) def abort(self, jobs=None, block=None): """Abort jobs on my engines. @@ -318,6 +336,7 @@ class View(HasTraits): """Parallel version of `itertools.imap`. See `self.map` for details. + """ return iter(self.map_async(f,*sequences, **kwargs)) @@ -326,14 +345,15 @@ class View(HasTraits): # Decorators #------------------------------------------------------------------- - def remote(self, bound=False, block=True): + def remote(self, block=True, **flags): """Decorator for making a RemoteFunction""" - return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) + block = self.block if block is None else block + return remote(self, block=block, **flags) - def parallel(self, dist='b', bound=False, block=None): + def parallel(self, dist='b', block=None, **flags): """Decorator for making a ParallelFunction""" block = self.block if block is None else block - return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) + return parallel(self, dist=dist, block=block, **flags) @testdec.skip_doctest class DirectView(View): @@ -355,14 +375,65 @@ class DirectView(View): """ - def __init__(self, client=None, targets=None): - super(DirectView, self).__init__(client=client, targets=targets) - self._balanced = False + def __init__(self, client=None, socket=None, targets=None): + super(DirectView, self).__init__(client=client, socket=socket, targets=targets) + - @spin_after + @sync_results @save_ids + def _really_apply(self, f, args=None, kwargs=None, block=None, track=None): + """calls f(*args, **kwargs) on remote engines, returning the result. 
+ + This method sets all of `apply`'s flags via this View's attributes. + + Parameters + ---------- + + f : callable + + args : list [default: empty] + + kwargs : dict [default: empty] + + block : bool [default: self.block] + whether to block + track : bool [default: self.track] + whether to ask zmq to track the message, for safe non-copying sends + + Returns + ------- + + if self.block is False: + returns AsyncResult + else: + returns actual result of f(*args, **kwargs) on the engine(s) + This will be a list of self.targets is also a list (even length 1), or + the single result if self.targets is an integer engine id + """ + args = [] if args is None else args + kwargs = {} if kwargs is None else kwargs + block = self.block if block is None else block + track = self.track if track is None else track + msg_ids = [] + trackers = [] + for ident in self._idents: + msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, + ident=ident) + if track: + trackers.append(msg['tracker']) + msg_ids.append(msg['msg_id']) + tracker = None if track is False else zmq.MessageTracker(*trackers) + ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=self._targets, tracker=tracker) + if block: + try: + return ar.get() + except KeyboardInterrupt: + pass + return ar + + @spin_after def map(self, f, *sequences, **kwargs): - """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult + """view.map(f, *sequences, block=self.block) => list|AsyncMapResult Parallel version of builtin `map`, using this View's `targets`. @@ -380,8 +451,6 @@ class DirectView(View): the sequences to be distributed and passed to `f` block : bool whether to wait for the result or not [default self.block] - bound : bool - whether to pass the client's Namespace as the first argument to `f` Returns ------- @@ -396,70 +465,140 @@ class DirectView(View): the result of map(f,*sequences) """ - block = kwargs.get('block', self.block) - bound = kwargs.get('bound', self.bound) + block = kwargs.pop('block', self.block) for k in kwargs.keys(): - if k not in ['block', 'bound']: + if k not in ['block', 'track']: raise TypeError("invalid keyword arg, %r"%k) assert len(sequences) > 0, "must have some sequences to map onto!" - pf = ParallelFunction(self.client, f, block=block, bound=bound, - targets=self._targets, balanced=False) + pf = ParallelFunction(self, f, block=block, **kwargs) return pf.map(*sequences) - @sync_results - @save_ids def execute(self, code, block=None): - """execute some code on my targets.""" + """Executes `code` on `targets` in blocking or nonblocking manner. - block = block if block is not None else self.block + ``execute`` is always `bound` (affects engine namespace) - return self.client.execute(code, block=block, targets=self._targets) + Parameters + ---------- + + code : str + the code string to be executed + block : bool + whether or not to wait until done to return + default: self.block + """ + return self._really_apply(util._execute, args=(code,), block=block) - @sync_results - @save_ids - def run(self, fname, block=None): - """execute the code in a file on my targets.""" + def run(self, filename, block=None): + """Execute contents of `filename` on my engine(s). - block = block if block is not None else self.block + This simply reads the contents of the file and calls `execute`. 
- return self.client.run(fname, block=block, targets=self._targets) + Parameters + ---------- + + filename : str + The path to the file + targets : int/str/list of ints/strs + the engines on which to execute + default : all + block : bool + whether or not to wait until done + default: self.block + + """ + with open(filename, 'r') as f: + # add newline in case of trailing indented whitespace + # which will cause SyntaxError + code = f.read()+'\n' + return self.execute(code, block=block) def update(self, ns): - """update remote namespace with dict `ns`""" - return self.client.push(ns, targets=self._targets, block=self.block) + """update remote namespace with dict `ns` + + See `push` for details. + """ + return self.push(ns, block=self.block, track=self.track) - def push(self, ns, block=None): - """update remote namespace with dict `ns`""" + def push(self, ns, block=None, track=None): + """update remote namespace with dict `ns` - block = block if block is not None else self.block + Parameters + ---------- - return self.client.push(ns, targets=self._targets, block=block) + ns : dict + dict of keys with which to update engine namespace(s) + block : bool [default : self.block] + whether to wait to be notified of engine receipt + + """ + + block = block if block is not None else self.block + track = track if track is not None else self.track + # applier = self.apply_sync if block else self.apply_async + if not isinstance(ns, dict): + raise TypeError("Must be a dict, not %s"%type(ns)) + return self._really_apply(util._push, (ns,),block=block, track=track) def get(self, key_s): """get object(s) by `key_s` from remote namespace - will return one object if it is a key. - It also takes a list of keys, and will return a list of objects.""" + + see `pull` for details. + """ # block = block if block is not None else self.block - return self.client.pull(key_s, block=True, targets=self._targets) + return self.pull(key_s, block=True) - @sync_results - @save_ids - def pull(self, key_s, block=True): - """get object(s) by `key_s` from remote namespace + def pull(self, names, block=True): + """get object(s) by `name` from remote namespace + will return one object if it is a key. - It also takes a list of keys, and will return a list of objects.""" + can also take a list of keys, in which case it will return a list of objects. + """ block = block if block is not None else self.block - return self.client.pull(key_s, block=block, targets=self._targets) + applier = self.apply_sync if block else self.apply_async + if isinstance(names, basestring): + pass + elif isinstance(names, (list,tuple,set)): + for key in names: + if not isinstance(key, basestring): + raise TypeError("keys must be str, not type %r"%type(key)) + else: + raise TypeError("names must be strs, not %r"%names) + return applier(util._pull, names) - def scatter(self, key, seq, dist='b', flatten=False, block=None): + def scatter(self, key, seq, dist='b', flatten=False, block=None, track=None): """ Partition a Python sequence and send the partitions to a set of engines. 
""" block = block if block is not None else self.block + track = track if track is not None else self.track + targets = self._targets + mapObject = Map.dists[dist]() + nparts = len(targets) + msg_ids = [] + trackers = [] + for index, engineid in enumerate(targets): + push = self.client[engineid].push + partition = mapObject.getPartition(seq, index, nparts) + if flatten and len(partition) == 1: + r = push({key: partition[0]}, block=False, track=track) + else: + r = push({key: partition},block=False, track=track) + msg_ids.extend(r.msg_ids) + if track: + trackers.append(r._tracker) - return self.client.scatter(key, seq, dist=dist, flatten=flatten, - targets=self._targets, block=block) + if track: + tracker = zmq.MessageTracker(*trackers) + else: + tracker = None + + r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker) + if block: + r.wait() + else: + return r @sync_results @save_ids @@ -468,8 +607,20 @@ class DirectView(View): Gather a partitioned sequence on a set of engines as a single local seq. """ block = block if block is not None else self.block + mapObject = Map.dists[dist]() + msg_ids = [] + for index, engineid in enumerate(self._targets): + + msg_ids.extend(self.client[engineid].pull(key, block=False).msg_ids) - return self.client.gather(key, dist=dist, targets=self._targets, block=block) + r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather') + + if block: + try: + return r.get() + except KeyboardInterrupt: + pass + return r def __getitem__(self, key): return self.get(key) @@ -523,22 +674,25 @@ class LoadBalancedView(View): Load-balanced views can be created with the client's `view` method: - >>> v = client.view(balanced=True) + >>> v = client.load_balanced_view() or targets can be specified, to restrict the potential destinations: - >>> v = client.view([1,3],balanced=True) + >>> v = client.client.load_balanced_view(([1,3]) which would restrict loadbalancing to between engines 1 and 3. """ - _default_names = ['block', 'bound', 'follow', 'after', 'timeout'] + _flag_names = ['block', 'track', 'follow', 'after', 'timeout'] - def __init__(self, client=None, targets=None): - super(LoadBalancedView, self).__init__(client=client, targets=targets) + def __init__(self, client=None, socket=None, targets=None): + super(LoadBalancedView, self).__init__(client=client, socket=socket, targets=targets) self._ntargets = 1 - self._balanced = True + self._task_scheme=client._task_scheme + if targets is None: + self._targets = None + self._idents=[] def _validate_dependency(self, dep): """validate a dependency. @@ -549,7 +703,7 @@ class LoadBalancedView(View): return True elif isinstance(dep, (list,set, tuple)): for d in dep: - if not isinstance(d, str, AsyncResult): + if not isinstance(d, (str, AsyncResult)): return False elif isinstance(dep, dict): if set(dep.keys()) != set(Dependency().as_dict().keys()): @@ -561,7 +715,21 @@ class LoadBalancedView(View): return False else: return False + + return True + def _render_dependency(self, dep): + """helper for building jsonable dependencies from various input forms.""" + if isinstance(dep, Dependency): + return dep.as_dict() + elif isinstance(dep, AsyncResult): + return dep.msg_ids + elif dep is None: + return [] + else: + # pass to Dependency constructor + return list(Dependency(dep)) + def set_flags(self, **kwargs): """set my attribute flags by keyword. 
@@ -574,19 +742,28 @@ class LoadBalancedView(View): block : bool whether to wait for results - bound : bool - whether to pass the client's Namespace as the first argument - to functions called via `apply`. track : bool whether to create a MessageTracker to allow the user to safely edit after arrays and buffers during non-copying sends. - follow : Dependency, list, msg_id, AsyncResult - the location dependencies of tasks - after : Dependency, list, msg_id, AsyncResult - the time dependencies of tasks - timeout : int,None - the timeout to be used for tasks + # + after : Dependency or collection of msg_ids + Only for load-balanced execution (targets=None) + Specify a list of msg_ids as a time-based dependency. + This job will only be run *after* the dependencies + have been met. + + follow : Dependency or collection of msg_ids + Only for load-balanced execution (targets=None) + Specify a list of msg_ids as a location-based dependency. + This job will only be run on an engine where this dependency + is met. + + timeout : float/int or None + Only for load-balanced execution (targets=None) + Specify an amount of time (in seconds) for the scheduler to + wait for dependencies to be met before failing with a + DependencyTimeout. """ super(LoadBalancedView, self).set_flags(**kwargs) @@ -599,23 +776,101 @@ class LoadBalancedView(View): raise ValueError("Invalid dependency: %r"%value) if 'timeout' in kwargs: t = kwargs['timeout'] - if not isinstance(t, (int, long, float, None)): + if not isinstance(t, (int, long, float, type(None))): raise TypeError("Invalid type for timeout: %r"%type(t)) if t is not None: if t < 0: raise ValueError("Invalid timeout: %s"%t) self.timeout = t - + + @sync_results + @save_ids + def _really_apply(self, f, args=None, kwargs=None, block=None, track=None, + after=None, follow=None, timeout=None): + """calls f(*args, **kwargs) on a remote engine, returning the result. + + This method temporarily sets all of `apply`'s flags for a single call. + + Parameters + ---------- + + f : callable + + args : list [default: empty] + + kwargs : dict [default: empty] + + block : bool [default: self.block] + whether to block + track : bool [default: self.track] + whether to ask zmq to track the message, for safe non-copying sends + + !!!!!! TODO: THE REST HERE !!!! + + Returns + ------- + + if self.block is False: + returns AsyncResult + else: + returns actual result of f(*args, **kwargs) on the engine(s) + This will be a list of self.targets is also a list (even length 1), or + the single result if self.targets is an integer engine id + """ + + # validate whether we can run + if self._socket.closed: + msg = "Task farming is disabled" + if self._task_scheme == 'pure': + msg += " because the pure ZMQ scheduler cannot handle" + msg += " disappearing engines." 
+ raise RuntimeError(msg) + + if self._task_scheme == 'pure': + # pure zmq scheme doesn't support dependencies + msg = "Pure ZMQ scheduler doesn't support dependencies" + if (follow or after): + # hard fail on DAG dependencies + raise RuntimeError(msg) + if isinstance(f, dependent): + # soft warn on functional dependencies + warnings.warn(msg, RuntimeWarning) + + # build args + args = [] if args is None else args + kwargs = {} if kwargs is None else kwargs + block = self.block if block is None else block + track = self.track if track is None else track + after = self.after if after is None else after + follow = self.follow if follow is None else follow + timeout = self.timeout if timeout is None else timeout + after = self._render_dependency(after) + follow = self._render_dependency(follow) + subheader = dict(after=after, follow=follow, timeout=timeout, targets=self._idents) + + msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, + subheader=subheader) + tracker = None if track is False else msg['tracker'] + + ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker) + + if block: + try: + return ar.get() + except KeyboardInterrupt: + pass + return ar + @spin_after @save_ids def map(self, f, *sequences, **kwargs): - """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult + """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult Parallel version of builtin `map`, load-balanced by this View. - `block`, `bound`, and `chunk_size` can be specified by keyword only. + `block`, and `chunksize` can be specified by keyword only. - Each `chunk_size` elements will be a separate task, and will be + Each `chunksize` elements will be a separate task, and will be load-balanced. This lets individual elements be available for iteration as soon as they arrive. @@ -628,13 +883,11 @@ class LoadBalancedView(View): the sequences to be distributed and passed to `f` block : bool whether to wait for the result or not [default self.block] - bound : bool - whether to pass the client's Namespace as the first argument to `f` track : bool whether to create a MessageTracker to allow the user to safely edit after arrays and buffers during non-copying sends. - chunk_size : int + chunksize : int how many elements should be in each task [default 1] Returns @@ -652,19 +905,16 @@ class LoadBalancedView(View): # default block = kwargs.get('block', self.block) - bound = kwargs.get('bound', self.bound) - chunk_size = kwargs.get('chunk_size', 1) + chunksize = kwargs.get('chunksize', 1) keyset = set(kwargs.keys()) - extra_keys = keyset.difference_update(set(['block', 'bound', 'chunk_size'])) + extra_keys = keyset.difference_update(set(['block', 'chunksize'])) if extra_keys: raise TypeError("Invalid kwargs: %s"%list(extra_keys)) assert len(sequences) > 0, "must have some sequences to map onto!" 
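The ``chunksize`` flag documented just above controls how many elements go into each load-balanced task; smaller chunks mean results become available for iteration sooner. A rough usage sketch (illustrative only, assuming a running cluster)::

    from IPython.zmq.parallel import client

    rc = client.Client()
    view = rc.load_balanced_view()

    def square(x):
        return x**2

    # four elements per task, submitted without blocking
    amr = view.map(square, range(32), chunksize=4, block=False)
    print list(amr)                                 # AsyncMapResult yields results as tasks finish
    print view.map(square, range(8), block=True)    # blocking form returns a plain list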
- pf = ParallelFunction(self.client, f, block=block, bound=bound, - targets=self._targets, balanced=True, - chunk_size=chunk_size) + pf = ParallelFunction(self, f, block=block, chunksize=chunksize) return pf.map(*sequences) __all__ = ['LoadBalancedView', 'DirectView'] \ No newline at end of file diff --git a/docs/examples/newparallel/dagdeps.py b/docs/examples/newparallel/dagdeps.py index 8fe96e3..d65ec66 100644 --- a/docs/examples/newparallel/dagdeps.py +++ b/docs/examples/newparallel/dagdeps.py @@ -53,12 +53,12 @@ def make_bintree(levels): add_children(G, root, levels, 2) return G -def submit_jobs(client, G, jobs): +def submit_jobs(view, G, jobs): """Submit jobs via client where G describes the time dependencies.""" results = {} for node in nx.topological_sort(G): - deps = [ results[n] for n in G.predecessors(node) ] - results[node] = client.apply(jobs[node], after=deps) + with view.temp_flags(after=[ results[n] for n in G.predecessors(node) ]): + results[node] = view.apply(jobs[node]) return results def validate_tree(G, results): @@ -76,7 +76,7 @@ def main(nodes, edges): in-degree on the y (just for spread). All arrows must point at least slightly to the right if the graph is valid. """ - import pylab + from matplotlib import pyplot as plt from matplotlib.dates import date2num from matplotlib.cm import gist_rainbow print "building DAG" @@ -88,10 +88,11 @@ def main(nodes, edges): jobs[node] = randomwait client = cmod.Client() + view = client.load_balanced_view() print "submitting %i tasks with %i dependencies"%(nodes,edges) - results = submit_jobs(client, G, jobs) + results = submit_jobs(view, G, jobs) print "waiting for results" - client.barrier() + view.wait() print "done" for node in G: md = results[node].metadata @@ -107,13 +108,13 @@ def main(nodes, edges): xmax,ymax = map(max, (x,y)) xscale = xmax-xmin yscale = ymax-ymin - pylab.xlim(xmin-xscale*.1,xmax+xscale*.1) - pylab.ylim(ymin-yscale*.1,ymax+yscale*.1) + plt.xlim(xmin-xscale*.1,xmax+xscale*.1) + plt.ylim(ymin-yscale*.1,ymax+yscale*.1) return G,results if __name__ == '__main__': - import pylab + from matplotlib import pyplot as plt # main(5,10) main(32,96) - pylab.show() + plt.show() \ No newline at end of file diff --git a/docs/examples/newparallel/demo/dependencies.py b/docs/examples/newparallel/demo/dependencies.py index 492690d..fd43003 100644 --- a/docs/examples/newparallel/demo/dependencies.py +++ b/docs/examples/newparallel/demo/dependencies.py @@ -31,7 +31,7 @@ def getpid2(): import os return os.getpid() -view = client[None] +view = client.load_balanced_view() view.block=True # will run on anything: @@ -58,29 +58,41 @@ successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ] mixed = [failures[0],successes[0]] -d1a = Dependency(mixed, mode='any', success_only=False) # yes -d1b = Dependency(mixed, mode='any', success_only=True) # yes -d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow -d2b = Dependency(mixed, mode='all', success_only=True) # no -d3 = Dependency(failures, mode='any', success_only=True) # no -d4 = Dependency(failures, mode='any', success_only=False) # yes -d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow -d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow - -client.block = False - -r1a = client.apply(getpid, after=d1a) -r1b = client.apply(getpid, follow=d1b) -r2a = client.apply(getpid, after=d2b, 
follow=d2a) -r2b = client.apply(getpid, after=d2a, follow=d2b) -r3 = client.apply(getpid, after=d3) -r4a = client.apply(getpid, after=d4) -r4b = client.apply(getpid, follow=d4) -r4c = client.apply(getpid, after=d3, follow=d4) -r5 = client.apply(getpid, after=d5) -r5b = client.apply(getpid, follow=d5, after=d3) -r6 = client.apply(getpid, follow=d6) -r6b = client.apply(getpid, after=d6, follow=d2b) +d1a = Dependency(mixed, all=False, failure=True) # yes +d1b = Dependency(mixed, all=False) # yes +d2a = Dependency(mixed, all=True, failure=True) # yes after / no follow +d2b = Dependency(mixed, all=True) # no +d3 = Dependency(failures, all=False) # no +d4 = Dependency(failures, all=False, failure=True) # yes +d5 = Dependency(failures, all=True, failure=True) # yes after / no follow +d6 = Dependency(successes, all=True, failure=True) # yes after / no follow + +view.block = False +flags = view.temp_flags +with flags(after=d1a): + r1a = view.apply(getpid) +with flags(follow=d1b): + r1b = view.apply(getpid) +with flags(after=d2b, follow=d2a): + r2a = view.apply(getpid) +with flags(after=d2a, follow=d2b): + r2b = view.apply(getpid) +with flags(after=d3): + r3 = view.apply(getpid) +with flags(after=d4): + r4a = view.apply(getpid) +with flags(follow=d4): + r4b = view.apply(getpid) +with flags(after=d3, follow=d4): + r4c = view.apply(getpid) +with flags(after=d5): + r5 = view.apply(getpid) +with flags(follow=d5, after=d3): + r5b = view.apply(getpid) +with flags(follow=d6): + r6 = view.apply(getpid) +with flags(after=d6, follow=d2b): + r6b = view.apply(getpid) def should_fail(f): try: diff --git a/docs/examples/newparallel/demo/map.py b/docs/examples/newparallel/demo/map.py index ccaa070..50d6af1 100644 --- a/docs/examples/newparallel/demo/map.py +++ b/docs/examples/newparallel/demo/map.py @@ -1,8 +1,9 @@ from IPython.zmq.parallel.client import * client = Client() +view = client[:] -@remote(client, block=True) +@view.remote(block=True) def square(a): """return square of a number""" return a*a @@ -21,7 +22,7 @@ squares2 = [ r.get() for r in arlist ] # now the more convenient @parallel decorator, which has a map method: -@parallel(client, block=False) +@view.parallel(block=False) def psquare(a): """return square of a number""" return a*a diff --git a/docs/examples/newparallel/demo/remotefunction.py b/docs/examples/newparallel/demo/remotefunction.py deleted file mode 100644 index 84a360f..0000000 --- a/docs/examples/newparallel/demo/remotefunction.py +++ /dev/null @@ -1,22 +0,0 @@ -from IPython.zmq.parallel.client import * - -client = Client() - -@remote(client, bound=True) -def getkey(name): - """fetch something from globals""" - return globals().get(name) - -@remote(client, bound=True, targets='all') -def setpids(): - import os - globals()['pid'] = os.getpid() - -# set pid in the globals -setpids() -getkey('pid') -getkey.targets=[1,2] -getkey('pid') -getkey.bound=False -getkey('pid') is None - diff --git a/docs/examples/newparallel/demo/views.py b/docs/examples/newparallel/demo/views.py index f5bb4c1..891a5a6 100644 --- a/docs/examples/newparallel/demo/views.py +++ b/docs/examples/newparallel/demo/views.py @@ -3,12 +3,12 @@ from IPython.zmq.parallel.client import * client = Client() for id in client.ids: - client.push(dict(ids=id*id), targets=id) + client[id].push(dict(ids=id*id)) -rns = client[0] -rns['a'] = 5 +v = client[0] +v['a'] = 5 -print rns['a'] +print v['a'] remotes = client[:] diff --git a/docs/examples/newparallel/fetchparse.py b/docs/examples/newparallel/fetchparse.py new file mode 100644 
index 0000000..0691cc1 --- /dev/null +++ b/docs/examples/newparallel/fetchparse.py @@ -0,0 +1,97 @@ +""" +An exceptionally lousy site spider +Ken Kinder + +Updated for newparallel by Min Ragan-Kelley + +This module gives an example of how the task interface to the +IPython controller works. Before running this script start the IPython controller +and some engines using something like:: + + ipclusterz start -n 4 +""" +import sys +from IPython.zmq.parallel import client, error +import time +import BeautifulSoup # this isn't necessary, but it helps throw the dependency error earlier + +def fetchAndParse(url, data=None): + import urllib2 + import urlparse + import BeautifulSoup + links = [] + try: + page = urllib2.urlopen(url, data=data) + except Exception: + return links + else: + if page.headers.type == 'text/html': + doc = BeautifulSoup.BeautifulSoup(page.read()) + for node in doc.findAll('a'): + href = node.get('href', None) + if href: + links.append(urlparse.urljoin(url, href)) + return links + +class DistributedSpider(object): + + # Time to wait between polling for task results. + pollingDelay = 0.5 + + def __init__(self, site): + self.client = client.Client() + self.view = self.client.load_balanced_view() + self.mux = self.client[:] + + self.allLinks = [] + self.linksWorking = {} + self.linksDone = {} + + self.site = site + + def visitLink(self, url): + if url not in self.allLinks: + self.allLinks.append(url) + if url.startswith(self.site): + print ' ', url + self.linksWorking[url] = self.view.apply(fetchAndParse, url) + + def onVisitDone(self, links, url): + print url, ':' + self.linksDone[url] = None + del self.linksWorking[url] + for link in links: + self.visitLink(link) + + def run(self): + self.visitLink(self.site) + while self.linksWorking: + print len(self.linksWorking), 'pending...' + self.synchronize() + time.sleep(self.pollingDelay) + + def synchronize(self): + for url, ar in self.linksWorking.items(): + # Calling get_task_result with block=False will return None if the + # task is not done yet. This provides a simple way of polling. 
+ try: + links = ar.get(0) + except error.TimeoutError: + continue + except Exception as e: + self.linksDone[url] = None + del self.linksWorking[url] + print url, ':', e.traceback + else: + self.onVisitDone(links, url) + +def main(): + if len(sys.argv) > 1: + site = sys.argv[1] + else: + site = raw_input('Enter site to crawl: ') + distributedSpider = DistributedSpider(site) + distributedSpider.run() + +if __name__ == '__main__': + main() diff --git a/docs/examples/newparallel/helloworld.py b/docs/examples/newparallel/helloworld.py new file mode 100644 index 0000000..6e578e7 --- /dev/null +++ b/docs/examples/newparallel/helloworld.py @@ -0,0 +1,19 @@ +""" +A Distributed Hello world +Ken Kinder +""" +from IPython.zmq.parallel import client + +rc = client.Client() + +def sleep_and_echo(t, msg): + import time + time.sleep(t) + return msg + +view = rc.load_balanced_view() + +world = view.apply_async(sleep_and_echo, 3, 'World!') +hello = view.apply_async(sleep_and_echo, 2, 'Hello') +print "Submitted tasks:", hello.msg_ids, world.msg_ids +print hello.get(), world.get() diff --git a/docs/examples/newparallel/mcdriver.py b/docs/examples/newparallel/mcdriver.py index f068c10..4a53150 100644 --- a/docs/examples/newparallel/mcdriver.py +++ b/docs/examples/newparallel/mcdriver.py @@ -49,7 +49,7 @@ c = client.Client(profile=cluster_profile) # A LoadBalancedView is an interface to the engines that provides dynamic load # balancing at the expense of not knowing which engine will execute the code. -view = c.view() +view = c.load_balanced_view() # Initialize the common code on the engines. This Python module has the # price_options function that prices the options. @@ -75,7 +75,7 @@ print "Submitted tasks: ", len(async_results) sys.stdout.flush() # Block until all tasks are completed. 
-c.barrier(async_results) +c.wait(async_results) t2 = time.time() t = t2-t1 diff --git a/docs/examples/newparallel/parallelpi.py b/docs/examples/newparallel/parallelpi.py index 3b6d32f..e59dc8a 100644 --- a/docs/examples/newparallel/parallelpi.py +++ b/docs/examples/newparallel/parallelpi.py @@ -27,14 +27,14 @@ filestring = 'pi200m.ascii.%(i)02dof20' files = [filestring % {'i':i} for i in range(1,16)] # Connect to the IPython cluster -c = client.Client(profile='edison') -c.run('pidigits.py') +c = client.Client() +c[:].run('pidigits.py') # the number of engines n = len(c) id0 = c.ids[0] v = c[:] -v.set_flags(bound=True,block=True) +v.block=True # fetch the pi-files print "downloading %i files of pi"%n v.map(fetch_pi_file, files[:n]) diff --git a/docs/examples/newparallel/wave2D/parallelwave-mpi.py b/docs/examples/newparallel/wave2D/parallelwave-mpi.py index 78455bf..15dcbdc 100755 --- a/docs/examples/newparallel/wave2D/parallelwave-mpi.py +++ b/docs/examples/newparallel/wave2D/parallelwave-mpi.py @@ -30,17 +30,19 @@ from numpy import exp, zeros, newaxis, sqrt from IPython.external import argparse from IPython.zmq.parallel.client import Client, Reference -def setup_partitioner(ns, index, num_procs, gnum_cells, parts): +def setup_partitioner(index, num_procs, gnum_cells, parts): """create a partitioner in the engine namespace""" + global partitioner p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs) p.redim(global_num_cells=gnum_cells, num_parts=parts) p.prepare_communication() # put the partitioner into the global namespace: - ns.partitioner=p + partitioner=p -def setup_solver(ns, *args, **kwargs): +def setup_solver(*args, **kwargs): """create a WaveSolver in the engine namespace""" - ns.solver = WaveSolver(*args, **kwargs) + global solver + solver = WaveSolver(*args, **kwargs) def wave_saver(u, x, y, t): """save the wave log""" @@ -146,11 +148,11 @@ if __name__ == '__main__': # setup remote partitioner # note that Reference means that the argument passed to setup_partitioner will be the # object named 'my_id' in the engine's namespace - view.apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition) + view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition) # wait for initial communication to complete view.execute('mpi.barrier()') # setup remote solvers - view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) + view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) # lambda for calling solver.solve: _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) @@ -172,7 +174,7 @@ if __name__ == '__main__': impl['inner'] = 'vectorized' # setup new solvers - view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) + view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) view.execute('mpi.barrier()') # run again with numpy vectorized inner-implementation diff --git a/docs/examples/newparallel/wave2D/parallelwave.py b/docs/examples/newparallel/wave2D/parallelwave.py index fc2a7e8..3309267 100755 --- a/docs/examples/newparallel/wave2D/parallelwave.py +++ b/docs/examples/newparallel/wave2D/parallelwave.py @@ -30,17 +30,19 @@ from numpy import exp, zeros, newaxis, sqrt from IPython.external import argparse from IPython.zmq.parallel.client import Client, Reference -def setup_partitioner(ns, comm, addrs, index, 
num_procs, gnum_cells, parts): +def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts): """create a partitioner in the engine namespace""" + global partitioner p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs) p.redim(global_num_cells=gnum_cells, num_parts=parts) p.prepare_communication() # put the partitioner into the global namespace: - ns.partitioner=p + partitioner=p -def setup_solver(ns, *args, **kwargs): +def setup_solver(*args, **kwargs): """create a WaveSolver in the engine namespace.""" - ns.solver = WaveSolver(*args, **kwargs) + global solver + solver = WaveSolver(*args, **kwargs) def wave_saver(u, x, y, t): """save the wave state for each timestep.""" @@ -156,7 +158,7 @@ if __name__ == '__main__': # setup remote partitioner # note that Reference means that the argument passed to setup_partitioner will be the # object named 'com' in the engine's namespace - view.apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition) + view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition) time.sleep(1) # convenience lambda to call solver.solve: _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) @@ -164,7 +166,7 @@ if __name__ == '__main__': if ns.scalar: impl['inner'] = 'scalar' # setup remote solvers - view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl) + view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl) # run first with element-wise Python operations for each cell t0 = time.time() @@ -182,7 +184,7 @@ if __name__ == '__main__': # run again with faster numpy-vectorized inner implementation: impl['inner'] = 'vectorized' # setup remote solvers - view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) + view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) t0 = time.time() diff --git a/docs/source/index.txt b/docs/source/index.txt index 8718ae9..7009b84 100644 --- a/docs/source/index.txt +++ b/docs/source/index.txt @@ -19,7 +19,7 @@ Contents whatsnew/index.txt install/index.txt interactive/index.txt - parallel/index.txt + .. parallel/index.txt parallelz/index.txt config/index.txt development/index.txt diff --git a/docs/source/parallelz/dag_dependencies.txt b/docs/source/parallelz/dag_dependencies.txt index 61823d8..488a953 100644 --- a/docs/source/parallelz/dag_dependencies.txt +++ b/docs/source/parallelz/dag_dependencies.txt @@ -45,7 +45,7 @@ A possible sequence of events for this workflow: Further, taking failures into account, assuming all dependencies are run with the default -`success_only=True`, the following cases would occur for each node's failure: +`success=True,failure=False`, the following cases would occur for each node's failure: 0. fails: all other tasks fail as Impossible 1. 2 can still succeed, but 3,4 are unreachable @@ -111,7 +111,8 @@ on which it depends: .. 
sourcecode:: ipython - In [5]: c = client.Client() + In [5]: rc = client.Client() + In [5]: view = rc.load_balanced_view() In [6]: results = {} @@ -120,13 +121,13 @@ on which it depends: ...: # leading into this one as dependencies ...: deps = [ results[n] for n in G.predecessors(node) ] ...: # submit and store AsyncResult object - ...: results[node] = client.apply(jobs[node], after=deps, block=False) + ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False) Now that we have submitted all the jobs, we can wait for the results: .. sourcecode:: ipython - In [8]: [ r.get() for r in results.values() ] + In [8]: view.wait(results.values()) Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have raised an error if a task failed). But we don't know that the ordering was properly diff --git a/docs/source/parallelz/index.txt b/docs/source/parallelz/index.txt index 07ed2d4..798c3c5 100644 --- a/docs/source/parallelz/index.txt +++ b/docs/source/parallelz/index.txt @@ -17,5 +17,6 @@ Using IPython for parallel computing (ZMQ) parallel_demos.txt dag_dependencies.txt parallel_details.txt + parallel_transition.txt diff --git a/docs/source/parallelz/parallel_demos.txt b/docs/source/parallelz/parallel_demos.txt index e6c0fbf..940e640 100644 --- a/docs/source/parallelz/parallel_demos.txt +++ b/docs/source/parallelz/parallel_demos.txt @@ -135,7 +135,7 @@ calculation can also be run by simply typing the commands from # We simply pass Client the name of the cluster profile we # are using. In [2]: c = client.Client(profile='mycluster') - In [3]: view = c.view(balanced=True) + In [3]: view = c.load_balanced_view() In [3]: c.ids Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] @@ -166,11 +166,11 @@ calculation can also be run by simply typing the commands from 'pi200m.ascii.15of20'] # download the data files if they don't already exist: - In [8]: c.map(fetch_pi_file, files) + In [8]: v.map(fetch_pi_file, files) # This is the parallel calculation using the Client.map method # which applies compute_two_digit_freqs to each file in files in parallel. - In [9]: freqs_all = c.map(compute_two_digit_freqs, files) + In [9]: freqs_all = v.map(compute_two_digit_freqs, files) # Add up the frequencies from each engine. In [10]: freqs = reduce_freqs(freqs_all) diff --git a/docs/source/parallelz/parallel_details.txt b/docs/source/parallelz/parallel_details.txt index 7ef84cf..50ae7b8 100644 --- a/docs/source/parallelz/parallel_details.txt +++ b/docs/source/parallelz/parallel_details.txt @@ -18,13 +18,14 @@ Non-copying sends and numpy arrays ---------------------------------- When numpy arrays are passed as arguments to apply or via data-movement methods, they are not -copied. This means that you must be careful if you are sending an array that you intend to work on. -PyZMQ does allow you to track when a message has been sent so you can know when it is safe to edit the buffer, but -IPython only allows for this. +copied. This means that you must be careful if you are sending an array that you intend to work +on. PyZMQ does allow you to track when a message has been sent so you can know when it is safe +to edit the buffer, but IPython only allows for this. -It is also important to note that the non-copying receive of a message is *read-only*. That -means that if you intend to work in-place on an array that you have sent or received, you must copy -it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as results. 
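A brief sketch of the copy-before-editing pattern described here (illustrative only; a running cluster is assumed, and the names ``rc`` and ``dview`` are not taken from this patch):

.. sourcecode:: python

    import numpy
    from IPython.zmq.parallel import client

    rc = client.Client()
    dview = rc[:]

    A = numpy.random.random((64, 64))
    ar = dview.apply_async(lambda x: x * 2, A)
    results = ar.get()        # one array per engine; received buffers may be read-only
    B = results[0].copy()     # copy before any in-place edit
    B += 1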
+It is also important to note that the non-copying receive of a message is *read-only*. That +means that if you intend to work in-place on an array that you have sent or received, you must +copy it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as +results. The following will fail: @@ -69,6 +70,24 @@ The :attr:`ndarray.flags.writeable` flag will tell you if you can write to an ar In [6]: _.flags.writeable Out[6]: False
+If you want to safely edit an array in-place after *sending* it, you must use the `track=True` flag. IPython always performs non-copying sends of arrays, which return immediately. You +must instruct IPython to track those messages *at send time* in order to know for sure that the send has completed. AsyncResults have a :attr:`sent` property and a :meth:`wait_for_send` method +for checking and waiting for 0MQ to finish with a buffer. + +.. sourcecode:: ipython + + In [5]: A = numpy.random.random((1024,1024)) + + In [6]: view.track=True + + In [7]: ar = view.apply_async(lambda x: 2*x, A) + + In [8]: ar.sent + Out[8]: False + + In [9]: ar.wait_for_send() # blocks until sent is True + +
What is sendable? ----------------- @@ -83,6 +102,61 @@ buffer without copying - and reconstruct the object on the other side in your ow possible that the object reconstruction will become extensible, so you can add your own non-copying types, but this does not yet exist.
+Closures +******** + +Just about anything in Python is pickleable. The one notable exception is objects (generally +functions) with *closures*. Closures can be a complicated topic, but the basic principle is that +functions that refer to variables in their parent scope have closures. + +An example of a function that uses a closure: + +.. sourcecode:: python + + def f(a): + def inner(): + # inner will have a closure + return a + return inner + + f1 = f(1) + f2 = f(2) + f1() # returns 1 + f2() # returns 2 + +f1 and f2 will have closures referring to the scope in which `inner` was defined, because they +use the variable 'a'. As a result, you would not be able to send ``f1`` or ``f2`` with IPython. +Note that you *would* be able to send `f`. This is only true for interactively defined +functions (as are often used in decorators), and only when there are variables used inside the +inner function that are defined in the outer function. If the names are *not* in the outer +function, then there will not be a closure, and the generated function will look in +``globals()`` for the name: + +.. sourcecode:: python + + def g(b): + # note that `b` is not referenced in inner's scope + def inner(): + # this inner will *not* have a closure + return a + return inner + g1 = g(1) + g2 = g(2) + g1() # raises NameError on 'a' + a=5 + g2() # returns 5 + +`g1` and `g2` *will* be sendable with IPython, and will treat the engine's namespace as +globals(). The :meth:`pull` method is implemented based on this principle. If we did not +provide pull, you could implement it yourself with `apply`, by simply returning objects out +of the global namespace: + +.. sourcecode:: ipython + + In [10]: view.apply(lambda : a) + + # is equivalent to + In [11]: view.pull('a')
Running Code ============ @@ -94,7 +168,9 @@ Client method, called `apply`. Apply ----- -The principal method of remote execution is :meth:`apply`, of Client and View objects. The Client provides the full execution and communication API for engines via its apply method. +The principal method of remote execution is :meth:`apply`, of View objects.
The Client provides +the full execution and communication API for engines via its low-level +:meth:`send_apply_message` method.
f : function The function to be called remotely @@ -102,8 +178,6 @@ args : tuple/list The positional arguments passed to `f` kwargs : dict The keyword arguments passed to `f` -bound : bool (default: False) - Whether to pass the Engine(s) Namespace as the first argument to `f`. block : bool (default: self.block) Whether to wait for the result, or return immediately. False: @@ -135,8 +209,6 @@ balanced : bool, default None If `balanced` and `targets` are both specified, the task will be assigned to *one* of the targets by the scheduler. -The following arguments are only used when balanced is True: - after : Dependency or collection of msg_ids Only for load-balanced execution (targets=None) Specify a list of msg_ids as a time-based dependency. @@ -158,11 +230,11 @@ timeout : float/int or None
execute and run --------------- -For executing strings of Python code, Clients also provide an :meth:`execute` and a :meth:`run` -method, which rather than take functions and arguments, take simple strings. `execute` simply -takes a string of Python code to execute, and sends it to the Engine(s). `run` is the same as -`execute`, but for a *file*, rather than a string. It is simply a wrapper that does something -very similar to ``execute(open(f).read())``. +For executing strings of Python code, :class:`DirectView`s also provide an :meth:`execute` and a +:meth:`run` method, which rather than take functions and arguments, take simple strings. +`execute` simply takes a string of Python code to execute, and sends it to the Engine(s). `run` +is the same as `execute`, but for a *file*, rather than a string. It is simply a wrapper that +does something very similar to ``execute(open(f).read())``. .. note::
@@ -172,44 +244,25 @@ Views ===== The principal extension of the :class:`~parallel.client.Client` is the -:class:`~parallel.view.View` class. The client is a fairly stateless object with respect to -execution patterns, where you must specify everything about the execution as keywords to each -call to :meth:`apply`. For users who want to more conveniently specify various options for -several similar calls, we have the :class:`~parallel.view.View` objects. The basic principle of -the views is to encapsulate the keyword arguments to :meth:`client.apply` as attributes, -allowing users to specify them once and apply to any subsequent calls until the attribute is -changed. +:class:`~parallel.view.View` class.
Two of apply's keyword arguments are set at the construction of the View, and are immutable for a given View: `balanced` and `targets`. `balanced` determines whether the View will be a :class:`.LoadBalancedView` or a :class:`.DirectView`, and `targets` will be the View's `targets` attribute. Attempts to change this will raise errors.
-Views are cached by targets+balanced combinations, so requesting a view multiple times will always return the *same object*, not create a new one: +Views are cached by targets/class, so requesting a view multiple times will always return the +*same object*, not create a new one: ..
sourcecode:: ipython - In [3]: v1 = rc.view([1,2,3], balanced=True) - In [4]: v2 = rc.view([1,2,3], balanced=True) + In [3]: v1 = rc.load_balanced_view([1,2,3]) + In [4]: v2 = rc.load_balanced_view([1,2,3]) In [5]: v2 is v1 Out[5]: True -A :class:`View` always uses its `targets` attribute, and it will use its `bound` -and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x` -methods allow overriding `bound` and `block` for a single call. - -================== ========== ========== -method block bound -================== ========== ========== -apply self.block self.bound -apply_sync True False -apply_async False False -apply_sync_bound True True -apply_async_bound False True -================== ========== ========== - DirectView ---------- @@ -379,24 +432,26 @@ interactive session - you must poll the 0MQ sockets for incoming messages. Note this polling *does not* actually make any network requests. It simply performs a `select` operation, to check if messages are already in local memory, waiting to be handled. -The method that handles incoming messages is :meth:`spin`. This method flushes any waiting messages on the various incoming sockets, and updates the state of the Client. +The method that handles incoming messages is :meth:`spin`. This method flushes any waiting +messages on the various incoming sockets, and updates the state of the Client. -If you need to wait for particular results to finish, you can use the :meth:`barrier` method, +If you need to wait for particular results to finish, you can use the :meth:`wait` method, which will call :meth:`spin` until the messages are no longer outstanding. Anything that represents a collection of messages, such as a list of msg_ids or one or more AsyncResult -objects, can be passed as argument to barrier. A timeout can be specified, which will prevent -the barrier from blocking for more than a specified time, but the default behavior is to wait +objects, can be passed as argument to wait. A timeout can be specified, which will prevent +the call from blocking for more than a specified time, but the default behavior is to wait forever. The client also has an `outstanding` attribute - a ``set`` of msg_ids that are awaiting replies. -This is the default if barrier is called with no arguments - i.e. barrier on *all* outstanding messages. +This is the default if wait is called with no arguments - i.e. wait on *all* outstanding +messages. .. note:: - TODO barrier example + TODO wait example Map === diff --git a/docs/source/parallelz/parallel_intro.txt b/docs/source/parallelz/parallel_intro.txt index 4b24855..de8d159 100644 --- a/docs/source/parallelz/parallel_intro.txt +++ b/docs/source/parallelz/parallel_intro.txt @@ -89,7 +89,7 @@ same machine as the Hub, but can be run anywhere from local threads or on remote The controller also provides a single point of contact for users who wish to utilize the engines connected to the controller. There are different ways of working with a controller. In IPython, all of these models are implemented via -the client's :meth:`.Client.apply` method, with various arguments, or +the client's :meth:`.View.apply` method, with various arguments, or constructing :class:`.View` objects to represent subsets of engines. The two primary models for interacting with engines are: @@ -124,12 +124,13 @@ themselves block when user code is run, the schedulers hide that from the user t a fully asynchronous interface to a set of engines. 
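A rough sketch of waiting on outstanding results with :meth:`wait`, as discussed above (a running cluster is assumed; ``rc``, ``v``, and ``slow`` are illustrative names only):

.. sourcecode:: python

    from IPython.zmq.parallel import client

    rc = client.Client()
    v = rc.load_balanced_view()

    def slow(t):
        import time
        time.sleep(t)
        return t

    ars = [v.apply_async(slow, i % 3) for i in range(10)]

    rc.wait(ars, timeout=5)   # spin until these particular results arrive (or 5s pass)
    rc.wait()                 # no arguments: wait on *all* outstanding messages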
-IPython client --------------- +IPython client and views +------------------------ -There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a -controller. For each model, there is a corresponding view. These views allow users to -interact with a set of engines through the interface. Here are the two default views: +There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a cluster. +For each execution model, there is a corresponding :class:`~.parallel.view.View`. These views +allow users to interact with a set of engines through the interface. Here are the two default +views: * The :class:`DirectView` class for explicit addressing. * The :class:`LoadBalancedView` class for destination-agnostic scheduling. @@ -212,7 +213,7 @@ everything is working correctly, try the following commands: In [4]: c.ids Out[4]: set([0, 1, 2, 3]) - In [5]: c.apply(lambda : "Hello, World", targets='all', block=True) + In [5]: c[:].apply_sync(lambda : "Hello, World") Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ] @@ -234,10 +235,10 @@ then you would connect to it with: In [2]: c = client.Client(sshserver='myhub.example.com') Where 'myhub.example.com' is the url or IP address of the machine on -which the Hub process is running. +which the Hub process is running (or another machine that has direct access to the Hub's ports). You are now ready to learn more about the :ref:`Direct -` and :ref:`LoadBalanced ` interfaces to the +` and :ref:`LoadBalanced ` interfaces to the controller. .. [ZeroMQ] ZeroMQ. http://www.zeromq.org diff --git a/docs/source/parallelz/parallel_multiengine.txt b/docs/source/parallelz/parallel_multiengine.txt index ee1b649..3f445b8 100644 --- a/docs/source/parallelz/parallel_multiengine.txt +++ b/docs/source/parallelz/parallel_multiengine.txt @@ -1,4 +1,4 @@ -.. _parallelmultiengine: +.. _parallel_multiengine: ========================== IPython's Direct interface @@ -9,7 +9,7 @@ IPython engines. The basic idea behind the multiengine interface is that the capabilities of each engine are directly and explicitly exposed to the user. Thus, in the multiengine interface, each engine is given an id that is used to identify the engine and give it work to do. This interface is very intuitive -and is designed with interactive usage in mind, and is thus the best place for +and is designed with interactive usage in mind, and is the best place for new users of IPython to begin. Starting the IPython controller and engines @@ -91,9 +91,7 @@ DirectView's :meth:`map` method: In [62]: serial_result = map(lambda x:x**10, range(32)) - In [63]: dview.block = True - - In [66]: parallel_result = dview.map(lambda x: x**10, range(32)) + In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32)) In [67]: serial_result==parallel_result Out[67]: True @@ -103,8 +101,7 @@ DirectView's :meth:`map` method: The :class:`DirectView`'s version of :meth:`map` does not do dynamic load balancing. For a load balanced version, use a - :class:`LoadBalancedView`, or a :class:`ParallelFunction` with - `balanced=True`. + :class:`LoadBalancedView`. .. seealso:: @@ -119,7 +116,7 @@ two decorators: .. 
sourcecode:: ipython - In [10]: @rc.remote(block=True, targets='all') + In [10]: @dview.remote(block=True) ...: def getpid(): ...: import os ...: return os.getpid() @@ -128,7 +125,7 @@ two decorators: In [11]: getpid() Out[11]: [12345, 12346, 12347, 12348] -A ``@parallel`` decorator creates parallel functions, that break up an element-wise +The ``@parallel`` decorator creates parallel functions, that break up an element-wise operations and distribute them, reconstructing the result. .. sourcecode:: ipython @@ -137,13 +134,13 @@ operations and distribute them, reconstructing the result. In [13]: A = np.random.random((64,48)) - In [14]: @rc.parallel(block=True, targets='all') + In [14]: @dview.parallel(block=True) ...: def pmul(A,B): ...: return A*B In [15]: C_local = A*A - In [16]: C_remote_partial = pmul(A,A) + In [16]: C_remote = pmul(A,A) In [17]: (C_local == C_remote).all() Out[17]: True @@ -159,38 +156,36 @@ Calling Python functions The most basic type of operation that can be performed on the engines is to execute Python code or call Python functions. Executing Python code can be done in blocking or non-blocking mode (non-blocking is default) using the -:meth:`execute` method, and calling functions can be done via the +:meth:`.View.execute` method, and calling functions can be done via the :meth:`.View.apply` method. apply ----- The main method for doing remote execution (in fact, all methods that -communicate with the engines are built on top of it), is :meth:`Client.apply`. -Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``, -which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients` -require some more options, they cannot easily provide this interface. -Instead, they provide the signature: - -.. sourcecode:: python +communicate with the engines are built on top of it), is :meth:`View.apply`. - c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None, - after=None, follow=None, timeout=None) +We strive to provide the cleanest interface we can, so `apply` has the following +signature: -Where various behavior is controlled via keyword arguments. This means that in the client, -you must pass `args` as a tuple, and `kwargs` as a dict. +.. sourcecode:: python -In order to provide the nicer interface, we have :class:`View` classes, which wrap -:meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine -the extra keyword arguments. This means that the views can have the desired pattern: + view.apply(f, *args, **kwargs) -.. sourcecode:: python +There are various ways to call functions with IPython, and these flags are set as +attributes of the View. The ``DirectView`` has just two of these flags: - v.apply(f, *args, **kwargs) +dv.block : bool + whether to wait for the result, or return an :class:`AsyncResult` object + immediately +dv.track : bool + whether to instruct pyzmq to track when + This is primarily useful for non-copying sends of numpy arrays that you plan to + edit in-place. You need to know when it becomes safe to edit the buffer + without corrupting the message. -For instance, performing index-access on a client creates a -:class:`.DirectView`. +Creating a view is simple: index-access on a client creates a :class:`.DirectView`. .. 
sourcecode:: ipython @@ -198,23 +193,9 @@ For instance, performing index-access on a client creates a Out[4]: In [5]: view.apply - view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound - -A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound` -and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x` -methods allow specifying `bound` and `block` via the different methods. + view.apply view.apply_async view.apply_sync view.apply_with_flags -================== ========== ========== -method block bound -================== ========== ========== -apply self.block self.bound -apply_sync True False -apply_async False False -apply_sync_bound True True -apply_async_bound False True -================== ========== ========== - -For explanation of these values, read on. +For convenience, you can set block temporarily for a single call with the extra sync/async methods. Blocking execution ------------------ @@ -232,63 +213,29 @@ blocks until the engines are done executing the command: In [5]: dview['b'] = 10 - In [6]: dview.apply_sync(lambda x: a+b+x, 27) + In [6]: dview.apply(lambda x: a+b+x, 27) Out[6]: [42, 42, 42, 42] -Python commands can be executed on specific engines by calling execute using the ``targets`` -keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by -index-access to the client: - -.. sourcecode:: ipython - - In [6]: rc.execute('c=a+b', targets=[0,2]) - - In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3]) - - In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all') - Out[8]: [15, -5, 15, -5] - -.. note:: +You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync` +method: - Note that every call to ``rc.(...,targets=x)`` can be made via - ``rc[].(...)``, which constructs a View object. The only place - where this differs in in :meth:`apply`. The :class:`Client` takes many - arguments to apply, so it requires `args` and `kwargs` to be passed as - individual arguments. Extended options such as `bound`,`targets`, and - `block` are controlled by the attributes of the :class:`View` objects, so - they can provide the much more convenient - :meth:`View.apply(f,*args,**kwargs)`, which simply calls - ``f(*args,**kwargs)`` remotely. + In [7]: dview.block=False -Bound and unbound execution ---------------------------- + In [8]: dview.apply_sync(lambda x: a+b+x, 27) + Out[8]: [42, 42, 42, 42] -The previous example also shows one of the most important things about the IPython -engines: they have a persistent user namespaces. The :meth:`apply` method can -be run in either a bound or unbound manner. +Python commands can be executed as strings on specific engines by using a View's ``execute`` +method: -When applying a function in a `bound` manner, the first argument to that function -will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary -also providing attribute-access to keys. +.. sourcecode:: ipython -In all (unbound and bound) execution + In [6]: rc[::2].execute('c=a+b') -.. 
sourcecode:: ipython + In [7]: rc[1::2].execute('c=a-b') - In [9]: dview['b'] = 5 # assign b to 5 everywhere - - In [10]: v0 = rc[0] - - # multiply b*2 inplace - In [12]: v0.apply_sync_bound(lambda ns: ns.b*=2) - - # b is still available in globals during unbound execution - In [13]: v0.apply_sync(lambda a: a*b, 3) - Out[13]: 30 + In [8]: rc[:]['c'] # shorthand for rc[:].pull('c', block=True) + Out[8]: [15, -5, 15, -5] -`bound=True` specifies that the engine's namespace is to be passed as the first argument when -the function is called, and the default `bound=False` specifies that the normal behavior, but -the engine's namespace will be available as the globals() when the function is called. Non-blocking execution ---------------------- @@ -351,22 +298,24 @@ local Python/IPython session: .. Note:: Note the import inside the function. This is a common model, to ensure - that the appropriate modules are imported where the task is run. + that the appropriate modules are imported where the task is run. You can + also manually import modules into the engine(s) namespace(s) via + :meth:`view.execute('import numpy')`. Often, it is desirable to wait until a set of :class:`AsyncResult` objects -are done. For this, there is a the method :meth:`barrier`. This method takes a +are done. For this, there is a the method :meth:`wait`. This method takes a tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History), and blocks until all of the associated results are ready: .. sourcecode:: ipython - In [72]: rc.block=False + In [72]: dview.block=False # A trivial list of AsyncResults objects In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)] # Wait until all of them are done - In [74]: rc.barrier(pr_list) + In [74]: dview.wait(pr_list) # Then, their results are ready using get() or the `.r` attribute In [75]: pr_list[0].get() @@ -374,12 +323,12 @@ and blocks until all of the associated results are ready: -The ``block`` keyword argument and attributes ---------------------------------------------- +The ``block`` attribute +----------------------- -Most client methods(like :meth:`apply`) accept +Many View methods(excluding :meth:`apply`) accept ``block`` as a keyword argument. As we have seen above, these -keyword arguments control the blocking mode. The :class:`Client` class also has +keyword arguments control the blocking mode. The :class:`View` class also has a :attr:`block` attribute that controls the default behavior when the keyword argument is not provided. Thus the following logic is used for :attr:`block`: @@ -387,37 +336,33 @@ argument is not provided. Thus the following logic is used for :attr:`block`: * Keyword argument, if provided override the instance attributes for the duration of a single call. -DirectView objects also have a ``bound`` attribute, which is used in the same way. - The following examples demonstrate how to use the instance attributes: .. 
sourcecode:: ipython - In [17]: rc.block = False + In [17]: dview.block = False - In [18]: ar = rc.apply(lambda : 10, targets=[0,2]) + In [18]: ar = dview.apply(lambda : 10) In [19]: ar.get() - Out[19]: [10,10] + Out[19]: [10, 10, 10, 10] - In [21]: rc.block = True + In [21]: dview.block = True # Note targets='all' means all engines - In [22]: rc.apply(lambda : 42, targets='all') + In [22]: dview.apply(lambda : 42) Out[22]: [42, 42, 42, 42] -The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the +The :attr:`block` and :attr:`targets` instance attributes of the :class:`.DirectView` also determine the behavior of the parallel magic commands. - Parallel magic commands ----------------------- .. warning:: - The magics have not been changed to work with the zeromq system. ``%px`` - and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do - not* print stdin/out. + The magics have not been changed to work with the zeromq system. The + magics do work, but *do not* print stdin/out like they used to in IPython.kernel. We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines @@ -428,6 +373,9 @@ Python command on the engines specified by the :attr:`targets` attribute of the .. sourcecode:: ipython + # load the parallel magic extension: + In [21]: %load_ext parallelmagic + # Create a DirectView for all targets In [22]: dv = rc[:] @@ -512,7 +460,7 @@ Moving Python objects around In addition to calling functions and executing code on engines, you can transfer Python objects to and from your IPython session and the engines. In IPython, these operations are called :meth:`push` (sending an object to the -engines) and :meth:`pull` (getting an object from the engines). +engines) and :meth:`pull` (getting an object from the engines). Basic push and pull ------------------- @@ -521,23 +469,19 @@ Here are some examples of how you use :meth:`push` and :meth:`pull`: .. sourcecode:: ipython - In [38]: rc.push(dict(a=1.03234,b=3453)) + In [38]: dview.push(dict(a=1.03234,b=3453)) Out[38]: [None,None,None,None] - In [39]: rc.pull('a') + In [39]: dview.pull('a') Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234] - In [40]: rc.pull('b',targets=0) + In [40]: rc[0].pull('b') Out[40]: 3453 - In [41]: rc.pull(('a','b')) + In [41]: dview.pull(('a','b')) Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ] - # zmq client does not have zip_pull - In [42]: rc.zip_pull(('a','b')) - Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)] - - In [43]: rc.push(dict(c='speed')) + In [43]: dview.push(dict(c='speed')) Out[43]: [None,None,None,None] In non-blocking mode :meth:`push` and :meth:`pull` also return @@ -545,9 +489,7 @@ In non-blocking mode :meth:`push` and :meth:`pull` also return .. sourcecode:: ipython - In [47]: rc.block=False - - In [48]: ar = rc.pull('a') + In [48]: ar = dview.pull('a', block=False) In [49]: ar.get() Out[49]: [1.03234, 1.03234, 1.03234, 1.03234] @@ -563,8 +505,6 @@ appear as a local dictionary. Underneath, these methods call :meth:`apply`: .. 
sourcecode:: ipython - In [50]: dview.block=True - In [51]: dview['a']=['foo','bar'] In [52]: dview['a'] @@ -606,7 +546,7 @@ basic effect using :meth:`scatter` and :meth:`gather`: In [66]: dview.scatter('x',range(64)) - In [67]: px y = [i**10 for i in x] + In [67]: %px y = [i**10 for i in x] Parallel execution on engines: [0, 1, 2, 3] Out[67]: @@ -633,31 +573,47 @@ more other types of exceptions. Here is how it works: In [77]: dview.execute('1/0') --------------------------------------------------------------------------- CompositeError Traceback (most recent call last) - /Users/minrk/ in () - ----> 1 dview.execute('1/0') - - ... - - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout) - 1012 raise ValueError(msg) - 1013 else: - -> 1014 return self._apply_direct(f, args, kwargs, **options) - 1015 - 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None, - - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets) - 1100 if block: - 1101 try: - -> 1102 return ar.get() - 1103 except KeyboardInterrupt: - 1104 return ar + /home/you/ in () + ----> 1 dview.execute('1/0', block=True) + + /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block) + 460 default: self.block + 461 """ + --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block) + 463 + 464 def run(self, filename, block=None): + + /home/you/ in apply_with_flags(self, f, args, kwargs, block, track) + + /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs) + 46 def sync_results(f, self, *args, **kwargs): + 47 """sync relevant results from self.client to our results attribute.""" + ---> 48 ret = f(self, *args, **kwargs) + 49 delta = self.outstanding.difference(self.client.outstanding) + 50 completed = self.outstanding.intersection(delta) + + /home/you/ in apply_with_flags(self, f, args, kwargs, block, track) + + /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs) + 35 n_previous = len(self.client.history) + 36 try: + ---> 37 ret = f(self, *args, **kwargs) + 38 finally: + 39 nmsgs = len(self.client.history) - n_previous + + /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track) + 398 if block: + 399 try: + --> 400 return ar.get() + 401 except KeyboardInterrupt: + 402 pass - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout) - 78 return self._result - 79 else: - ---> 80 raise self._exception - 81 else: - 82 raise error.TimeoutError("Result not ready.") + /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout) + 87 return self._result + 88 else: + ---> 89 raise self._exception + 90 else: + 91 raise error.TimeoutError("Result not ready.") CompositeError: one or more exceptions from call to method: _execute [0:apply]: ZeroDivisionError: integer division or modulo by zero @@ -665,6 +621,7 @@ more other types of exceptions. Here is how it works: [2:apply]: ZeroDivisionError: integer division or modulo by zero [3:apply]: ZeroDivisionError: integer division or modulo by zero + Notice how the error message printed when :exc:`CompositeError` is raised has information about the individual exceptions that were raised on each engine. 
If you want, you can even raise one of these original exceptions: @@ -672,7 +629,7 @@ If you want, you can even raise one of these original exceptions: .. sourcecode:: ipython In [80]: try: - ....: rc.execute('1/0') + ....: dview.execute('1/0') ....: except client.CompositeError, e: ....: e.raise_exception() ....: @@ -697,57 +654,50 @@ instance: .. sourcecode:: ipython - In [81]: rc.execute('1/0') + In [81]: dview.execute('1/0') --------------------------------------------------------------------------- CompositeError Traceback (most recent call last) - /Users/minrk/ in () - ----> 1 rc.execute('1/0') - - /Users/minrk/ in execute(self, code, targets, block) - - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs) - 88 self.block = block - 89 try: - ---> 90 ret = f(self, *args, **kwargs) - 91 finally: - 92 self.block = saveblock - - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in execute(self, code, targets, block) - 855 default: self.block - 856 """ - --> 857 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False) - 858 if not block: - 859 return result - - /Users/minrk/ in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout) - - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs) - 88 self.block = block - 89 try: - ---> 90 ret = f(self, *args, **kwargs) - 91 finally: - 92 self.block = saveblock - - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout) - 1012 raise ValueError(msg) - 1013 else: - -> 1014 return self._apply_direct(f, args, kwargs, **options) - 1015 - 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None, - - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets) - 1100 if block: - 1101 try: - -> 1102 return ar.get() - 1103 except KeyboardInterrupt: - 1104 return ar + /home/you/ in () + ----> 1 dview.execute('1/0', block=True) + + /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block) + 460 default: self.block + 461 """ + --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block) + 463 + 464 def run(self, filename, block=None): + + /home/you/ in apply_with_flags(self, f, args, kwargs, block, track) + + /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs) + 46 def sync_results(f, self, *args, **kwargs): + 47 """sync relevant results from self.client to our results attribute.""" + ---> 48 ret = f(self, *args, **kwargs) + 49 delta = self.outstanding.difference(self.client.outstanding) + 50 completed = self.outstanding.intersection(delta) + + /home/you/ in apply_with_flags(self, f, args, kwargs, block, track) + + /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs) + 35 n_previous = len(self.client.history) + 36 try: + ---> 37 ret = f(self, *args, **kwargs) + 38 finally: + 39 nmsgs = len(self.client.history) - n_previous + + /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track) + 398 if block: + 399 try: + --> 400 return ar.get() + 401 except KeyboardInterrupt: + 402 pass - /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout) - 78 return self._result - 79 else: - ---> 80 raise self._exception - 81 else: - 82 raise 
error.TimeoutError("Result not ready.") + /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout) + 87 return self._result + 88 else: + ---> 89 raise self._exception + 90 else: + 91 raise error.TimeoutError("Result not ready.") CompositeError: one or more exceptions from call to method: _execute [0:apply]: ZeroDivisionError: integer division or modulo by zero @@ -815,14 +765,18 @@ instance: ZeroDivisionError: integer division or modulo by zero +.. note:: + + TODO: The above tracebacks are not up to date + All of this same error handling magic even works in non-blocking mode: .. sourcecode:: ipython - In [83]: rc.block=False + In [83]: dview.block=False - In [84]: ar = rc.execute('1/0') + In [84]: ar = dview.execute('1/0') In [85]: ar.get() --------------------------------------------------------------------------- diff --git a/docs/source/parallelz/parallel_task.txt b/docs/source/parallelz/parallel_task.txt index f2e7aa1..b73056a 100644 --- a/docs/source/parallelz/parallel_task.txt +++ b/docs/source/parallelz/parallel_task.txt @@ -1,4 +1,4 @@ -.. _paralleltask: +.. _parallel_task: ========================== The IPython task interface @@ -54,11 +54,12 @@ argument to the constructor: # or to connect with a specific profile you have set up: In [3]: rc = client.Client(profile='mpi') -For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can be constructed via the client's :meth:`view` method: +For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can +be constructed via the client's :meth:`load_balanced_view` method: .. sourcecode:: ipython - In [4]: lview = rc.view() # default load-balanced view + In [4]: lview = rc.load_balanced_view() # default load-balanced view .. seealso:: @@ -110,6 +111,8 @@ that turns any Python function into a parallel function: In [11]: f.map(range(32)) # this is done in parallel Out[11]: [0.0,10.0,160.0,...] +.. _parallel_dependencies: + Dependencies ============ @@ -230,12 +233,18 @@ any|all only after *all* of them have finished. This is set by a Dependency's :attr:`all` boolean attribute, which defaults to ``True``. -success_only - Whether to consider only tasks that did not raise an error as being fulfilled. - Sometimes you want to run a task after another, but only if that task succeeded. In - this case, ``success_only`` should be ``True``. However sometimes you may not care - whether the task succeeds, and always want the second task to run, in which case - you should use `success_only=False`. The default behavior is to only use successes. +success [default: True] + Whether to consider tasks that succeeded as fulfilling dependencies. + +failure [default : False] + Whether to consider tasks that failed as fulfilling dependencies. + using `failure=True,success=False` is useful for setting up cleanup tasks, to be run + only when tasks have failed. + +Sometimes you want to run a task after another, but only if that task succeeded. In this case, +``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may +not care whether the task succeeds, and always want the second task to run, in which case you +should use `success=failure=True`. The default behavior is to only use successes. There are other switches for interpretation that are made at the *task* level. These are specified via keyword arguments to the client's :meth:`apply` method. @@ -258,7 +267,7 @@ timeout Dependencies only work within the task scheduler. 
You cannot instruct a load-balanced task to run after a job submitted via the MUX interface. -The simplest form of Dependencies is with `all=True,success_only=True`. In these cases, +The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases, you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the `follow` and `after` keywords to :meth:`client.apply`: @@ -266,13 +275,13 @@ you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje In [14]: client.block=False - In [15]: ar = client.apply(f, args, kwargs, balanced=True) + In [15]: ar = lview.apply(f, args, kwargs) - In [16]: ar2 = client.apply(f2, balanced=True) + In [16]: ar2 = lview.apply(f2) - In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True) + In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2]) - In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True) + In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5) .. seealso::
@@ -297,8 +306,8 @@ The basic cases that are checked: * depending on nonexistent messages * `follow` dependencies were run on more than one machine and `all=True` -* any dependencies failed and `all=True,success_only=True` -* all dependencies failed and `all=False,success_only=True` +* any dependencies failed and `all=True,success=True,failure=False` +* all dependencies failed and `all=False,success=True,failure=False` .. warning::
@@ -386,27 +395,25 @@ Disabled features when using the ZMQ Scheduler: More details ============ -The :class:`Client` has many more powerful features that allow quite a bit +The :class:`LoadBalancedView` has many more powerful features that allow quite a bit of flexibility in how tasks are defined and run. The next places to look are in the following classes: -* :class:`IPython.zmq.parallel.client.Client` +* :class:`IPython.zmq.parallel.view.LoadBalancedView` * :class:`IPython.zmq.parallel.client.AsyncResult` -* :meth:`IPython.zmq.parallel.client.Client.apply` +* :meth:`IPython.zmq.parallel.view.LoadBalancedView.apply` * :mod:`IPython.zmq.parallel.dependency` The following is an overview of how to use these classes together: -1. Create a :class:`Client`. +1. Create a :class:`Client` and :class:`LoadBalancedView`. 2. Define some functions to be run as tasks 3. Submit your tasks using the :meth:`apply` method of your - :class:`Client` instance, specifying `balanced=True`. This signals - the :class:`Client` to entrust the Scheduler with assigning tasks to engines. -4. Use :meth:`Client.get_results` to get the results of the + :class:`LoadBalancedView` instance. +4. Use :meth:`Client.get_result` to get the results of the tasks, or use the :meth:`AsyncResult.get` method of the results to wait for and then receive the results. - .. seealso:: A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
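As a compact, illustrative sketch of that workflow (a cluster started with something like ``ipclusterz start -n 4`` is assumed, and the function and variable names below are hypothetical):

.. sourcecode:: python

    from IPython.zmq.parallel import client

    # 1. create a Client and a LoadBalancedView
    rc = client.Client()
    lview = rc.load_balanced_view()
    lview.block = False

    # 2. define some functions to be run as tasks
    def fetch(n):
        import time
        time.sleep(0.1 * n)
        return n

    # 3. submit the tasks with the view's apply method
    ars = [lview.apply(fetch, i) for i in range(8)]

    # an extra task that runs only after all of the above have finished,
    # using the temporary-flags context manager for the `after` dependency
    with lview.temp_flags(after=ars):
        done = lview.apply(lambda : 'all fetches finished')

    # 4. wait for and then receive the results
    lview.wait(ars)
    print [ar.get() for ar in ars]
    print done.get()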