From b78cec8e3c84dd2812615c2e6e52a9305b8d3ad6 2011-04-08 00:38:10 From: MinRK Date: 2011-04-08 00:38:10 Subject: [PATCH] added py4science demos as examples + NetworkX DAG dependencies --- diff --git a/examples/demo/dagdeps.py b/examples/demo/dagdeps.py new file mode 100644 index 0000000..4988b6b --- /dev/null +++ b/examples/demo/dagdeps.py @@ -0,0 +1,103 @@ +"""Example for generating an arbitrary DAG as a dependency map. + +This demo uses networkx to generate the graph. + +Authors +------- +* MinRK +""" +import networkx as nx +from random import randint, random +from IPython.zmq.parallel import client as cmod + +def randomwait(): + import time + from random import random + time.sleep(random()) + return time.time() + + +def random_dag(nodes, edges): + """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.""" + G = nx.DiGraph() + for i in range(nodes): + G.add_node(i) + while edges > 0: + a = randint(0,nodes-1) + b=a + while b==a: + b = randint(0,nodes-1) + G.add_edge(a,b) + if nx.is_directed_acyclic_graph(G): + edges -= 1 + else: + # we closed a loop! + G.remove_edge(a,b) + return G + +def add_children(G, parent, level, n=2): + """Add children recursively to a binary tree.""" + if level == 0: + return + for i in range(n): + child = parent+str(i) + G.add_node(child) + G.add_edge(parent,child) + add_children(G, child, level-1, n) + +def make_bintree(levels): + """Make a symmetrical binary tree with @levels""" + G = nx.DiGraph() + root = '0' + G.add_node(root) + add_children(G, root, levels, 2) + return G + +def submit_jobs(client, G, jobs): + """Submit jobs via client where G describes the time dependencies.""" + msg_ids = {} + for node in nx.topological_sort(G): + deps = [ msg_ids[n] for n in G.predecessors(node) ] + msg_ids[node] = client.apply(jobs[node], after=deps) + return msg_ids + +def validate_tree(G, times): + """Validate that jobs executed after their dependencies.""" + for node in G: + t = times[node] + for parent in G.predecessors(node): + pt = times[parent] + assert t > pt, "%s should have happened after %s"%(node, parent) + +def main(nodes, edges): + """Generate a random graph, submit jobs, then validate that the + dependency order was enforced. + Finally, plot the graph, with time on the x-axis, and + in-degree on the y (just for spread). All arrows must + point at least slightly to the right if the graph is valid. + """ + G = random_dag(nodes, edges) + jobs = {} + msg_ids = {} + times = {} + pos = {} + for node in G: + jobs[node] = randomwait + + client = cmod.Client('tcp://127.0.0.1:10101') + + msg_ids = submit_jobs(client, G, jobs) + client.barrier() + for node in G: + times[node] = client.results[msg_ids[node]] + pos[node] = (times[node], G.in_degree(node)+random()) + + validate_tree(G, times) + nx.draw(G, pos) + return G,times,msg_ids + +if __name__ == '__main__': + import pylab + main(32,128) + pylab.show() + \ No newline at end of file diff --git a/examples/demo/dependencies.py b/examples/demo/dependencies.py new file mode 100644 index 0000000..a8fcaaf --- /dev/null +++ b/examples/demo/dependencies.py @@ -0,0 +1,35 @@ +from IPython.zmq.parallel.client import * + +client = Client('tcp://127.0.0.1:10101') + +@require('numpy') +def norm(A): + from numpy.linalg import norm + return norm(A,2) + +def checkpid(pid): + import os + return os.getpid() == pid + +def checkhostname(host): + import socket + return socket.gethostname() == host + +def getpid(): + import os + return os.getpid() + +pid0 = client.apply(getpid, targets=0, block=True) + +@depend(checkpid, pid0) +def getpid2(): + import os + return os.getpid() + +rns = client[None] +rns.block=True + +pids1 = [ rns.apply(getpid) for i in range(len(client.ids)) ] +pids2 = [ rns.apply(getpid2) for i in range(len(client.ids)) ] +print pids1 +print pids2 diff --git a/examples/demo/loadbalance.py b/examples/demo/loadbalance.py new file mode 100644 index 0000000..f87015e --- /dev/null +++ b/examples/demo/loadbalance.py @@ -0,0 +1,20 @@ +import time +from IPython.zmq.parallel.client import * + +def wait(t): + import time + time.sleep(t) + return t + +client = Client('tcp://127.0.0.1:10101') +view = client[None] + +tic = time.time() +for i in range(128): + view.apply(wait, 1e-2*i) + # limit to 1k msgs/s + time.sleep(1e-2) + +client.barrier() +toc = time.time() +print toc-tic diff --git a/examples/demo/map.py b/examples/demo/map.py new file mode 100644 index 0000000..2a9c09a --- /dev/null +++ b/examples/demo/map.py @@ -0,0 +1,20 @@ +from IPython.zmq.parallel.client import * + +client = Client('tcp://127.0.0.1:10101') + +@remote(client, block=True) +def square(a): + """return square of a number""" + return a*a + +squares = map(square, range(42)) + +# but that blocked between each result, not exactly useful +square.block=False +msg_ids = map(square, range(42)) +# submitted very fast +# wait for them to be done: +client.barrier(msg_id) +squares2 = map(client.results.get, msg_ids) +print squares == squares2 +# True \ No newline at end of file diff --git a/examples/demo/noncopying.py b/examples/demo/noncopying.py new file mode 100644 index 0000000..6fe7217 --- /dev/null +++ b/examples/demo/noncopying.py @@ -0,0 +1,44 @@ +"""non-copying sends""" +import zmq +import numpy + +n = 10 +iface = 'inproc://pub' + +ctx = zmq.Context() + +p = ctx.socket(zmq.PUB) +p.bind(iface) + +# connect 2 subs +s1 = ctx.socket(zmq.SUB) +s1.connect(iface) +s1.setsockopt(zmq.SUBSCRIBE, '') + +s2 = ctx.socket(zmq.SUB) +s2.connect(iface) +s2.setsockopt(zmq.SUBSCRIBE, '') + +A = numpy.random.random((1024,1024)) + +# send +p.send(A, copy=False) +# recv on 1 non-copy +msg1 = s1.recv(copy=False) +B1 = numpy.frombuffer(msg1.buffer, dtype=A.dtype).reshape(A.shape) +# recv on 2 copy +msg2 = s2.recv(copy=False) +B2 = numpy.frombuffer(buffer(msg2.bytes), dtype=A.dtype).reshape(A.shape) + +print (B1==B2).all() +print (B1==A).all() +A[0][0] += 10 +print "~" +# after changing A in-place, B1 changes too, proving non-copying sends +print (B1==A).all() +# but B2 is fixed, since it called the msg.bytes attr, which copies +print (B1==B2).all() + + + + diff --git a/examples/demo/remotefunction.py b/examples/demo/remotefunction.py new file mode 100644 index 0000000..059ca55 --- /dev/null +++ b/examples/demo/remotefunction.py @@ -0,0 +1,22 @@ +from IPython.zmq.parallel.client import * + +client = Client('tcp://127.0.0.1:10101') + +@remote(client, bound=True) +def getkey(name): + """fetch something from globals""" + return globals().get(name) + +@remote(client, bound=True, targets='all') +def setpids(): + import os + globals()['pid'] = os.getpid() + +# set pid in the globals +setpids() +getkey('pid') +getkey.targets=[1,2] +getkey('pid') +getkey.bound=False +getkey('pid') is None + diff --git a/examples/demo/throughput.py b/examples/demo/throughput.py new file mode 100644 index 0000000..787284d --- /dev/null +++ b/examples/demo/throughput.py @@ -0,0 +1,89 @@ +import time +import numpy as np +from IPython.zmq.parallel import client as clientmod + +nlist = map(int, np.logspace(2,9,16,base=2)) +nlist2 = map(int, np.logspace(2,8,15,base=2)) +tlist = map(int, np.logspace(7,22,16,base=2)) +nt = 16 +def wait(t=0): + import time + time.sleep(t) + +def echo(s=''): + return s + +def time_throughput(nmessages, t=0, f=wait): + client = clientmod.Client('tcp://127.0.0.1:10101') + view = client[None] + # do one ping before starting timing + if f is echo: + t = np.random.random(t/8) + view.apply_sync(echo, '') + client.spin() + tic = time.time() + for i in xrange(nmessages): + view.apply(f, t) + lap = time.time() + client.barrier() + toc = time.time() + return lap-tic, toc-tic + +def time_twisted(nmessages, t=0, f=wait): + from IPython.kernel import client as kc + client = kc.TaskClient() + if f is wait: + s = "import time; time.sleep(%f)"%t + task = kc.StringTask(s) + elif f is echo: + t = np.random.random(t/8) + s = "s=t" + task = kc.StringTask(s, push=dict(t=t), pull=['s']) + else: + raise + # do one ping before starting timing + client.barrier(client.run(task)) + tic = time.time() + tids = [] + for i in xrange(nmessages): + tids.append(client.run(task)) + lap = time.time() + client.barrier(tids) + toc = time.time() + return lap-tic, toc-tic + +def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput): + A = np.zeros((len(nlist),2)) + for i,n in enumerate(nlist): + t1 = t2 = 0 + for _ in range(trials): + time.sleep(.25) + ts = runner(n,t,f) + t1 += ts[0] + t2 += ts[1] + t1 /= trials + t2 /= trials + A[i] = (t1,t2) + A[i] = n/A[i] + print n,A[i] + return A + +def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput): + A = np.zeros((len(tlist),2)) + for i,t in enumerate(tlist): + t1 = t2 = 0 + for _ in range(trials): + time.sleep(.25) + ts = runner(n,t,f) + t1 += ts[0] + t2 += ts[1] + t1 /= trials + t2 /= trials + A[i] = (t1,t2) + A[i] = n/A[i] + print t,A[i] + return A + +def start_cluster(n, scheduler): + pass + \ No newline at end of file diff --git a/examples/demo/views.py b/examples/demo/views.py new file mode 100644 index 0000000..ada4504 --- /dev/null +++ b/examples/demo/views.py @@ -0,0 +1,15 @@ +from IPython.zmq.parallel.client import * + +client = Client('tcp://127.0.0.1:10101') + +for id in client.ids: + client.push(dict(ids=id*id), targets=id) + +rns = client[0] +rns['a'] = 5 + +print rns['a'] + +remotes = client[:] + +print remotes['ids'] \ No newline at end of file