From 8d078bccd24ac1fdbc629ad56b518726cc7a1fc3 2011-04-08 00:38:16
From: MinRK <benjaminrk@gmail.com>
Date: 2011-04-08 00:38:16
Subject: [PATCH] updated newparallel examples, moved into docs

---
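Note on the pattern these updated examples share: Client() now reads its connection
info from the default profile instead of taking a hard-coded 'tcp://127.0.0.1:10101'
URL, task ordering is expressed with the after=/follow= keywords instead of
hand-tracked msg_ids, and timing comes from each AsyncResult's metadata. A minimal
sketch of that pattern, assuming a controller and at least one engine are already
running (every name used here appears in the diffs below):

    from IPython.zmq.parallel.client import Client

    client = Client()            # connection info comes from the default profile
    view = client[None]          # load-balanced view over all engines

    def getpid():
        import os
        return os.getpid()

    ar = view.apply_async(getpid)          # submit without blocking

    client.block = False
    # 'after' is a time dependency: run only once the first task has finished
    ar2 = client.apply(getpid, after=ar.msg_ids)

    client.barrier()                       # wait for everything submitted so far
    print ar.get(), ar2.get()
    print ar.metadata.started, ar.metadata.completed

For the Dependency objects in dependencies.py: after= only constrains *when* a task
may run, while follow= additionally constrains *where* it runs (on an engine that
ran the dependencies). That is why the mode='all' cases are marked
"yes after / no follow": with more than one engine, no single engine ran all of
the dependencies.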

diff --git a/examples/demo/dag/dagdeps.py b/docs/examples/newparallel/demo/dag/dagdeps.py
similarity index 88%
rename from examples/demo/dag/dagdeps.py
rename to docs/examples/newparallel/demo/dag/dagdeps.py
index 8291962..c3b6837 100644
--- a/examples/demo/dag/dagdeps.py
+++ b/docs/examples/newparallel/demo/dag/dagdeps.py
@@ -61,13 +61,13 @@ def submit_jobs(client, G, jobs):
         results[node] = client.apply(jobs[node], after=deps)
     return results
 
-def validate_tree(G, times):
+def validate_tree(G, results):
     """Validate that jobs executed after their dependencies."""
     for node in G:
-        t = times[node]
+        started = results[node].metadata.started
         for parent in G.predecessors(node):
-            pt = times[parent]
-            assert t > pt, "%s should have happened after %s"%(node, parent)
+            finished = results[parent].metadata.completed
+            assert started > finished, "%s should have happened after %s"%(node, parent)
 
 def main(nodes, edges):
     """Generate a random graph, submit jobs, then validate that the
@@ -76,28 +76,28 @@ def main(nodes, edges):
     in-degree on the y (just for spread).  All arrows must
     point at least slightly to the right if the graph is valid.
     """
+    from matplotlib.dates import date2num
     print "building DAG"
     G = random_dag(nodes, edges)
     jobs = {}
-    msg_ids = {}
-    times = {}
     pos = {}
     for node in G:
         jobs[node] = randomwait
     
-    client = cmod.Client('tcp://127.0.0.1:10101')
+    client = cmod.Client()
     print "submitting tasks"
     results = submit_jobs(client, G, jobs)
     print "waiting for results"
     client.barrier()
     print "done"
     for node in G:
-        times[node] = results[node].get()
-        pos[node] = (times[node], G.in_degree(node)+random())
+        # use each job's start time (from the result metadata) as its x coordinate
+        t = date2num(results[node].metadata.started)
+        pos[node] = (t, G.in_degree(node)+random())
     
-    validate_tree(G, times)
+    validate_tree(G, results)
     nx.draw(G, pos)
-    return G,times,msg_ids
+    return G,results
 
 if __name__ == '__main__':
     import pylab
diff --git a/docs/examples/newparallel/demo/dependencies.py b/docs/examples/newparallel/demo/dependencies.py
new file mode 100644
index 0000000..492690d
--- /dev/null
+++ b/docs/examples/newparallel/demo/dependencies.py
@@ -0,0 +1,118 @@
+from IPython.zmq.parallel import error
+from IPython.zmq.parallel.dependency import Dependency
+from IPython.zmq.parallel.client import *
+
+client = Client()
+
+# this will only run on machines that can import numpy:
+@require('numpy')
+def norm(A):
+    from numpy.linalg import norm
+    return norm(A,2)
+
+def checkpid(pid):
+    """return the pid of the engine"""
+    import os
+    return os.getpid() == pid
+
+def checkhostname(host):
+    import socket
+    return socket.gethostname() == host
+
+def getpid():
+    import os
+    return os.getpid()
+
+pid0 = client[0].apply_sync(getpid)
+
+# this will depend on the pid being that of target 0:
+@depend(checkpid, pid0)
+def getpid2():
+    import os
+    return os.getpid()
+
+view = client[None]
+view.block=True
+
+# will run on anything:
+pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
+print pids1
+# will only run on engine 0 (the engine whose pid matches pid0):
+pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
+print pids2
+
+print "now test some dependency behaviors"
+
+def wait(t):
+    import time
+    time.sleep(t)
+    return t
+
+# fail after some time:
+def wait_and_fail(t):
+    import time
+    time.sleep(t)
+    return 1/0
+
+successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ]
+failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]
+
+mixed = [failures[0],successes[0]]
+d1a = Dependency(mixed, mode='any', success_only=False) # yes
+d1b = Dependency(mixed, mode='any', success_only=True) # yes
+d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow
+d2b = Dependency(mixed, mode='all', success_only=True) # no
+d3 = Dependency(failures, mode='any', success_only=True) # no
+d4 = Dependency(failures, mode='any', success_only=False) # yes
+d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow
+d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow
+
+client.block = False
+
+r1a = client.apply(getpid, after=d1a)
+r1b = client.apply(getpid, follow=d1b)
+r2a = client.apply(getpid, after=d2b, follow=d2a)
+r2b = client.apply(getpid, after=d2a, follow=d2b)
+r3 = client.apply(getpid, after=d3)
+r4a = client.apply(getpid, after=d4)
+r4b = client.apply(getpid, follow=d4)
+r4c = client.apply(getpid, after=d3, follow=d4)
+r5 = client.apply(getpid, after=d5)
+r5b = client.apply(getpid, follow=d5, after=d3)
+r6 = client.apply(getpid, follow=d6)
+r6b = client.apply(getpid, after=d6, follow=d2b)
+
+def should_fail(f):
+    try:
+        f()
+    except error.KernelError:
+        pass
+    else:
+        print 'should have raised'
+        # raise Exception("should have raised")
+
+# print r1a.msg_ids
+r1a.get()
+# print r1b.msg_ids
+r1b.get()
+# print r2a.msg_ids
+should_fail(r2a.get)
+# print r2b.msg_ids
+should_fail(r2b.get)
+# print r3.msg_ids
+should_fail(r3.get)
+# print r4a.msg_ids
+r4a.get()
+# print r4b.msg_ids
+r4b.get()
+# print r4c.msg_ids
+should_fail(r4c.get)
+# print r5.msg_ids
+r5.get()
+# print r5b.msg_ids
+should_fail(r5b.get)
+# print r6.msg_ids
+should_fail(r6.get) # assuming > 1 engine
+# print r6b.msg_ids
+should_fail(r6b.get)
+print 'done'
diff --git a/docs/examples/newparallel/demo/map.py b/docs/examples/newparallel/demo/map.py
new file mode 100644
index 0000000..ccaa070
--- /dev/null
+++ b/docs/examples/newparallel/demo/map.py
@@ -0,0 +1,36 @@
+from IPython.zmq.parallel.client import *
+
+client = Client()
+
+@remote(client, block=True)
+def square(a):
+    """return square of a number"""
+    return a*a
+
+squares = map(square, range(42))
+
+# but that blocked waiting for each result in turn; not exactly useful
+
+square.block = False
+
+arlist = map(square, range(42))
+# submitted very fast
+
+# wait for the results:
+squares2 = [ r.get() for r in arlist ]
+
+# now the more convenient @parallel decorator, which has a map method:
+
+@parallel(client, block=False)
+def psquare(a):
+    """return square of a number"""
+    return a*a
+
+# this chunks the data into n-engines jobs, not 42 jobs:
+ar = psquare.map(range(42))
+
+# block until all the results are ready:
+squares3 = ar.get()
+
+print squares == squares2, squares3 == squares
+# True
\ No newline at end of file
diff --git a/examples/demo/noncopying.py b/docs/examples/newparallel/demo/noncopying.py
similarity index 100%
rename from examples/demo/noncopying.py
rename to docs/examples/newparallel/demo/noncopying.py
diff --git a/examples/demo/remotefunction.py b/docs/examples/newparallel/demo/remotefunction.py
similarity index 94%
rename from examples/demo/remotefunction.py
rename to docs/examples/newparallel/demo/remotefunction.py
index 059ca55..84a360f 100644
--- a/examples/demo/remotefunction.py
+++ b/docs/examples/newparallel/demo/remotefunction.py
@@ -1,6 +1,6 @@
 from IPython.zmq.parallel.client import *
 
-client = Client('tcp://127.0.0.1:10101')
+client = Client()
 
 @remote(client, bound=True)
 def getkey(name):
diff --git a/examples/demo/throughput.py b/docs/examples/newparallel/demo/throughput.py
similarity index 97%
rename from examples/demo/throughput.py
rename to docs/examples/newparallel/demo/throughput.py
index 787284d..57a85f3 100644
--- a/examples/demo/throughput.py
+++ b/docs/examples/newparallel/demo/throughput.py
@@ -14,7 +14,7 @@ def echo(s=''):
     return s
 
 def time_throughput(nmessages, t=0, f=wait):
-    client = clientmod.Client('tcp://127.0.0.1:10101')
+    client = clientmod.Client()
     view = client[None]
     # do one ping before starting timing
     if f is echo:
@@ -83,7 +83,4 @@ def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
         A[i] = n/A[i]
         print t,A[i]
     return A
-
-def start_cluster(n, scheduler):
-    pass
     
\ No newline at end of file
diff --git a/examples/demo/views.py b/docs/examples/newparallel/demo/views.py
similarity index 88%
rename from examples/demo/views.py
rename to docs/examples/newparallel/demo/views.py
index ada4504..f5bb4c1 100644
--- a/examples/demo/views.py
+++ b/docs/examples/newparallel/demo/views.py
@@ -1,6 +1,6 @@
 from IPython.zmq.parallel.client import *
 
-client = Client('tcp://127.0.0.1:10101')
+client = Client()
 
 for id in client.ids:
     client.push(dict(ids=id*id), targets=id)
diff --git a/examples/workflow/client.py b/docs/examples/newparallel/workflow/client.py
similarity index 100%
rename from examples/workflow/client.py
rename to docs/examples/newparallel/workflow/client.py
diff --git a/examples/workflow/job_wrapper.py b/docs/examples/newparallel/workflow/job_wrapper.py
similarity index 77%
rename from examples/workflow/job_wrapper.py
rename to docs/examples/newparallel/workflow/job_wrapper.py
index 55448ca..933611d 100755
--- a/examples/workflow/job_wrapper.py
+++ b/docs/examples/newparallel/workflow/job_wrapper.py
@@ -10,12 +10,16 @@ import sys
 
 argv = sys.argv
 
-from IPython.zmq.parallel.engine import main
+from IPython.zmq.parallel.engine import EngineFactory
+from IPython.zmq.parallel.ipengineapp import launch_new_instance
 
 ns = {}
 
 # job
 exec sys.argv[1] in ns
 
+# this should really use Config:
+EngineFactory.user_ns = ns
+
 # start engine with job namespace
-main([], user_ns=ns)
+launch_new_instance()
diff --git a/examples/workflow/wmanager.py b/docs/examples/newparallel/workflow/wmanager.py
similarity index 96%
rename from examples/workflow/wmanager.py
rename to docs/examples/newparallel/workflow/wmanager.py
index a6ce80c..6565920 100644
--- a/examples/workflow/wmanager.py
+++ b/docs/examples/newparallel/workflow/wmanager.py
@@ -32,7 +32,7 @@ def cleanup(controller, engines):
 if __name__ == '__main__':
 
     # Start controller in separate process
-    cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller'])
+    cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp'])
     print('Started controller')
 
     # "Submit jobs"
diff --git a/examples/demo/dependencies.py b/examples/demo/dependencies.py
deleted file mode 100644
index a8fcaaf..0000000
--- a/examples/demo/dependencies.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from IPython.zmq.parallel.client import *
-
-client = Client('tcp://127.0.0.1:10101')
-
-@require('numpy')
-def norm(A):
-    from numpy.linalg import norm
-    return norm(A,2)
-
-def checkpid(pid):
-    import os
-    return os.getpid() == pid
-
-def checkhostname(host):
-    import socket
-    return socket.gethostname() == host
-
-def getpid():
-    import os
-    return os.getpid()
-
-pid0 = client.apply(getpid, targets=0, block=True)
-
-@depend(checkpid, pid0)
-def getpid2():
-    import os
-    return os.getpid()
-
-rns = client[None]
-rns.block=True
-
-pids1 = [ rns.apply(getpid) for i in range(len(client.ids)) ]
-pids2 = [ rns.apply(getpid2) for i in range(len(client.ids)) ]
-print pids1
-print pids2
diff --git a/examples/demo/map.py b/examples/demo/map.py
deleted file mode 100644
index 2a9c09a..0000000
--- a/examples/demo/map.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from IPython.zmq.parallel.client import *
-
-client = Client('tcp://127.0.0.1:10101')
-
-@remote(client, block=True)
-def square(a):
-    """return square of a number"""
-    return a*a
-
-squares = map(square, range(42))
-
-# but that blocked between each result, not exactly useful
-square.block=False
-msg_ids = map(square, range(42))
-# submitted very fast
-# wait for them to be done:
-client.barrier(msg_id)
-squares2 = map(client.results.get, msg_ids)
-print squares == squares2
-# True
\ No newline at end of file
diff --git a/examples/zmqontroller/config.py b/examples/zmqontroller/config.py
deleted file mode 100644
index 1e1a1e2..0000000
--- a/examples/zmqontroller/config.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""setup the ports"""
-config = {
-    'interface': 'tcp://127.0.0.1',
-    'regport': 10101,
-    'heartport': 10102,
-    
-    'cqueueport': 10211,
-    'equeueport': 10111,
-
-    'ctaskport': 10221,
-    'etaskport': 10121,
-    
-    'ccontrolport': 10231,
-    'econtrolport': 10131,
-    
-    'clientport': 10201,
-    'notifierport': 10202,
-    
-    'logport': 20202
-}
-
-
-
diff --git a/examples/zmqontroller/controller.py b/examples/zmqontroller/controller.py
deleted file mode 100644
index f890d46..0000000
--- a/examples/zmqontroller/controller.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/env python
-"""A script to launch a controller with all its queues and connect it to a logger"""
-
-import time
-import logging
-
-import zmq
-from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
-from zmq.eventloop import ioloop
-from zmq.eventloop.zmqstream import ZMQStream
-from zmq.log import handlers
-
-from IPython.zmq import log
-from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
-
-  
-  
-
-def setup():
-    """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat"""
-    ctx = zmq.Context()
-    loop = ioloop.IOLoop.instance()
-    
-    # port config
-    # config={}
-    execfile('config.py', globals())
-    iface = config['interface']
-    logport = config['logport']
-    rport = config['regport']
-    cport = config['clientport']
-    cqport = config['cqueueport']
-    eqport = config['equeueport']
-    ctport = config['ctaskport']
-    etport = config['etaskport']
-    ccport = config['ccontrolport']
-    ecport = config['econtrolport']
-    hport = config['heartport']
-    nport = config['notifierport']
-    
-    # setup logging
-    lsock = ctx.socket(zmq.PUB)
-    lsock.connect('%s:%i'%(iface,logport))
-    # connected=False
-    # while not connected:
-    #     try:
-    #     except:
-    #         logport = logport + 1
-    #     else:
-    #         connected=True
-    #         
-    handler = handlers.PUBHandler(lsock)
-    handler.setLevel(logging.DEBUG)
-    handler.root_topic = "controller"
-    log.logger.addHandler(handler)
-    time.sleep(.5)
-    
-    ### Engine connections ###
-    
-    # Engine registrar socket
-    reg = ZMQStream(ctx.socket(zmq.XREP), loop)
-    reg.bind("%s:%i"%(iface, rport))
-    
-    # heartbeat
-    hpub = ctx.socket(zmq.PUB)
-    hpub.bind("%s:%i"%(iface, hport))
-    hrep = ctx.socket(zmq.XREP)
-    hrep.bind("%s:%i"%(iface, hport+1))
-    
-    hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500)
-    hb.start()
-    
-    ### Client connections ###
-    # Clientele socket
-    c = ZMQStream(ctx.socket(zmq.XREP), loop)
-    c.bind("%s:%i"%(iface, cport))
-    
-    n = ZMQStream(ctx.socket(zmq.PUB), loop)
-    n.bind("%s:%i"%(iface, nport))
-    
-    thesession = session.StreamSession(username="controller")
-    
-    
-    
-    # build and launch the queue
-    sub = ctx.socket(zmq.SUB)
-    sub.setsockopt(zmq.SUBSCRIBE, "")
-    monport = sub.bind_to_random_port(iface)
-    sub = ZMQStream(sub, loop)
-    
-    # Multiplexer Queue (in a Process)
-    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
-    q.bind_in("%s:%i"%(iface, cqport))
-    q.bind_out("%s:%i"%(iface, eqport))
-    q.connect_mon("%s:%i"%(iface, monport))
-    q.daemon=True
-    q.start()
-    
-    # Control Queue (in a Process)
-    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
-    q.bind_in("%s:%i"%(iface, ccport))
-    q.bind_out("%s:%i"%(iface, ecport))
-    q.connect_mon("%s:%i"%(iface, monport))
-    q.daemon=True
-    q.start()
-    
-    # Task Queue (in a Process)
-    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
-    q.bind_in("%s:%i"%(iface, ctport))
-    q.bind_out("%s:%i"%(iface, etport))
-    q.connect_mon("%s:%i"%(iface, monport))
-    q.daemon=True
-    q.start()
-    
-    time.sleep(.25)
-    
-    # build connection dicts
-    engine_addrs = {
-        'control' : "%s:%i"%(iface, ecport),
-        'queue': "%s:%i"%(iface, eqport),
-        'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)),
-        'task' : "%s:%i"%(iface, etport),
-        'monitor' : "%s:%i"%(iface, monport),
-        }
-    
-    client_addrs = {
-        'control' : "%s:%i"%(iface, ccport),
-        'query': "%s:%i"%(iface, cport),
-        'queue': "%s:%i"%(iface, cqport),
-        'task' : "%s:%i"%(iface, ctport),
-        'notification': "%s:%i"%(iface, nport)
-        }
-    con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs)
-    
-    return loop
-    
-
-if __name__ == '__main__':
-    loop = setup()
-    loop.start()
\ No newline at end of file
diff --git a/examples/zmqontroller/floodclient.py b/examples/zmqontroller/floodclient.py
deleted file mode 100644
index b7dd578..0000000
--- a/examples/zmqontroller/floodclient.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python
-import time
-import zmq
-from zmq.eventloop import ioloop
-from zmq.eventloop.zmqstream import ZMQStream
-from IPython.zmq import streamsession as session
-Message = session.Message
-# from IPython.zmq.messages import send_message_pickle as send_message
-import uuid
-
-thesession = session.StreamSession()
-
-max_messages=10000
-printstep=1000
-
-counter = dict(count=0, engines=1)
-
-def poit(msg):
-    print "POIT"
-    print msg
-
-def count(msg):
-    count = counter["count"] = counter["count"]+1
-    if not count % printstep:
-        print "#########################"
-        print count, time.time()-counter['tic']
-
-def unpack_and_print(msg):
-    global msg_counter
-    msg_counter += 1
-    print msg
-    try:
-        msg = thesession.unpack_message(msg[-3:])
-    except Exception, e:
-        print e
-        # pass
-    print msg
-
-
-ctx = zmq.Context()
-
-loop = ioloop.IOLoop()
-sock = ctx.socket(zmq.XREQ)
-queue = ZMQStream(ctx.socket(zmq.XREQ), loop)
-client = ZMQStream(sock, loop)
-client.on_send(poit)
-def check_engines(msg):
-    # client.on_recv(unpack_and_print)
-    queue.on_recv(count)
-    idents = msg[:-3]
-    msg = thesession.unpack_message(msg[-3:])
-    msg = Message(msg)
-    print msg
-    queue.connect(str(msg.content.queue))
-    engines = dict(msg.content.engines)
-    # global tic
-    N=max_messages
-    if engines:
-        tic = time.time()
-        counter['tic']= tic
-        for i in xrange(N/len(engines)):
-          for eid,key in engines.iteritems():
-            thesession.send(queue, "execute_request", dict(code='id=%i'%(int(eid)+i)),ident=str(key))
-        toc = time.time()
-        print "#####################################"
-        print N, toc-tic
-        print "#####################################"
-    
-    
-    
-
-client.on_recv(check_engines)
-
-sock.connect('tcp://127.0.0.1:10102')
-sock.setsockopt(zmq.IDENTITY, thesession.username)
-# stream = ZMQStream()
-# header = dict(msg_id = uuid.uuid4().bytes, msg_type='relay', id=0)
-parent = dict(targets=2)
-# content = "GARBAGE"
-thesession.send(client, "connection_request")
-
-# send_message(client, (header, content))
-# print thesession.recv(client, 0)
-
-loop.start()
diff --git a/examples/zmqontroller/logwatcher.py b/examples/zmqontroller/logwatcher.py
deleted file mode 100644
index 7887818..0000000
--- a/examples/zmqontroller/logwatcher.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/env python
-"""A simple log process that prints messages incoming from"""
-
-#
-#    Copyright (c) 2010 Min Ragan-Kelley
-#
-#    This file is part of pyzmq.
-#
-#    pyzmq is free software; you can redistribute it and/or modify it under
-#    the terms of the Lesser GNU General Public License as published by
-#    the Free Software Foundation; either version 3 of the License, or
-#    (at your option) any later version.
-#
-#    pyzmq is distributed in the hope that it will be useful,
-#    but WITHOUT ANY WARRANTY; without even the implied warranty of
-#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-#    Lesser GNU General Public License for more details.
-#
-#    You should have received a copy of the Lesser GNU General Public License
-#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-import sys
-import zmq
-logport = 20202
-def main(topics, addrs):
-    
-    context = zmq.Context()
-    socket = context.socket(zmq.SUB)
-    for topic in topics:
-        print "Subscribing to: %r"%topic
-        socket.setsockopt(zmq.SUBSCRIBE, topic)
-    if addrs:
-        for addr in addrs:
-            print "Connecting to: ", addr
-            socket.connect(addr)
-    else:
-        socket.bind('tcp://*:%i'%logport)
-
-    while True:
-        # topic = socket.recv()
-        # print topic
-        # print 'tic'
-        raw = socket.recv_multipart()
-        if len(raw) != 2:
-            print "!!! invalid log message: %s"%raw
-        else:
-            topic, msg = raw
-            # don't newline, since log messages always newline:
-            print "%s | %s" % (topic, msg),
-            sys.stdout.flush()
-
-if __name__ == '__main__':
-    import sys
-    topics = []
-    addrs = []
-    for arg in sys.argv[1:]:
-        if '://' in arg:
-            addrs.append(arg)
-        else:
-            topics.append(arg)
-    if not topics:
-        # default to everything
-        topics = ['']
-    if len(addrs) < 1:
-        print "binding instead of connecting"
-        # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)]
-    #     print "usage: display.py <address> [ <topic> <address>...]"
-        # raise SystemExit
-    
-    main(topics, addrs)