diff --git a/IPython/zmq/taskthread.py b/IPython/zmq/parallel/taskthread.py similarity index 98% rename from IPython/zmq/taskthread.py rename to IPython/zmq/parallel/taskthread.py index b1824ea..bddaa23 100644 --- a/IPython/zmq/taskthread.py +++ b/IPython/zmq/parallel/taskthread.py @@ -12,7 +12,7 @@ except: import zmq from zmq.core.poll import _poll as poll from zmq.devices import ThreadDevice -from IPython.zmq import streamsession as ss +from IPython.zmq.parallel import streamsession as ss class QueueStream(object): diff --git a/docs/source/development/parallel_messages.txt b/docs/source/development/parallel_messages.txt index 7755c4f..3ece970 100644 --- a/docs/source/development/parallel_messages.txt +++ b/docs/source/development/parallel_messages.txt @@ -63,8 +63,6 @@ Message type: ``registration_reply``:: 'control' : 'tcp://...', # addr for control queue 'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat 'task' : 'tcp://...', # addr for task queue, or None if no task queue running - # if error: - 'reason' : 'queue_id already registered' } Clients use the same socket as engines to start their connections. Connection requests @@ -172,8 +170,6 @@ Message type: ``result_reply``:: # values are the result messages 'pending' : ['msg_id','...'], # msg_ids still pending 'completed' : ['msg_id','...'], # list of completed msg_ids - # if error: - 'reason' : "explanation" } For memory management purposes, Clients can also instruct the controller to forget the @@ -199,9 +195,6 @@ Message type: ``purge_reply``:: content = { 'status' : 'ok', # or 'error' - - # if error: - 'reason' : "KeyError: no such msg_id 'whoda'" } :func:`apply` and :func:`apply_bound` @@ -210,7 +203,7 @@ Message type: ``purge_reply``:: The `Namespace `_ model suggests that execution be able to use the model:: - client.apply(f, *args, **kwargs) + ns.apply(f, *args, **kwargs) which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)`` on a remote engine, returning the result (or, for non-blocking, information facilitating @@ -220,8 +213,8 @@ as little data as we can. The `buffers` property of a Message was introduced for purpose. Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a -function signature and builds the correct buffer format for minimal data copying (exactly -zero copies of numpy array data). +function signature and builds a sendable buffer format for minimal data copying (exactly +zero copies of numpy array data or buffers or large strings). Message type: ``apply_request``:: @@ -234,6 +227,8 @@ Message type: ``apply_request``:: buffers = ['...'] # at least 3 in length # as built by build_apply_message(f,args,kwargs) +after/follow represent task dependencies + Message type: ``apply_reply``:: content = { diff --git a/examples/demo/dagdeps.py b/examples/demo/dag/dagdeps.py similarity index 100% rename from examples/demo/dagdeps.py rename to examples/demo/dag/dagdeps.py diff --git a/examples/demo/loadbalance.py b/examples/demo/loadbalance.py deleted file mode 100644 index f87015e..0000000 --- a/examples/demo/loadbalance.py +++ /dev/null @@ -1,20 +0,0 @@ -import time -from IPython.zmq.parallel.client import * - -def wait(t): - import time - time.sleep(t) - return t - -client = Client('tcp://127.0.0.1:10101') -view = client[None] - -tic = time.time() -for i in range(128): - view.apply(wait, 1e-2*i) - # limit to 1k msgs/s - time.sleep(1e-2) - -client.barrier() -toc = time.time() -print toc-tic