##// END OF EJS Templates
added py4science demos as examples + NetworkX DAG dependencies
MinRK -
Show More
@@ -0,0 +1,103 b''
1 """Example for generating an arbitrary DAG as a dependency map.
2
3 This demo uses networkx to generate the graph.
4
5 Authors
6 -------
7 * MinRK
8 """
9 import networkx as nx
10 from random import randint, random
11 from IPython.zmq.parallel import client as cmod
12
13 def randomwait():
14 import time
15 from random import random
16 time.sleep(random())
17 return time.time()
18
19
20 def random_dag(nodes, edges):
21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
22 G = nx.DiGraph()
23 for i in range(nodes):
24 G.add_node(i)
25 while edges > 0:
26 a = randint(0,nodes-1)
27 b=a
28 while b==a:
29 b = randint(0,nodes-1)
30 G.add_edge(a,b)
31 if nx.is_directed_acyclic_graph(G):
32 edges -= 1
33 else:
34 # we closed a loop!
35 G.remove_edge(a,b)
36 return G
37
38 def add_children(G, parent, level, n=2):
39 """Add children recursively to a binary tree."""
40 if level == 0:
41 return
42 for i in range(n):
43 child = parent+str(i)
44 G.add_node(child)
45 G.add_edge(parent,child)
46 add_children(G, child, level-1, n)
47
48 def make_bintree(levels):
49 """Make a symmetrical binary tree with @levels"""
50 G = nx.DiGraph()
51 root = '0'
52 G.add_node(root)
53 add_children(G, root, levels, 2)
54 return G
55
56 def submit_jobs(client, G, jobs):
57 """Submit jobs via client where G describes the time dependencies."""
58 msg_ids = {}
59 for node in nx.topological_sort(G):
60 deps = [ msg_ids[n] for n in G.predecessors(node) ]
61 msg_ids[node] = client.apply(jobs[node], after=deps)
62 return msg_ids
63
64 def validate_tree(G, times):
65 """Validate that jobs executed after their dependencies."""
66 for node in G:
67 t = times[node]
68 for parent in G.predecessors(node):
69 pt = times[parent]
70 assert t > pt, "%s should have happened after %s"%(node, parent)
71
72 def main(nodes, edges):
73 """Generate a random graph, submit jobs, then validate that the
74 dependency order was enforced.
75 Finally, plot the graph, with time on the x-axis, and
76 in-degree on the y (just for spread). All arrows must
77 point at least slightly to the right if the graph is valid.
78 """
79 G = random_dag(nodes, edges)
80 jobs = {}
81 msg_ids = {}
82 times = {}
83 pos = {}
84 for node in G:
85 jobs[node] = randomwait
86
87 client = cmod.Client('tcp://127.0.0.1:10101')
88
89 msg_ids = submit_jobs(client, G, jobs)
90 client.barrier()
91 for node in G:
92 times[node] = client.results[msg_ids[node]]
93 pos[node] = (times[node], G.in_degree(node)+random())
94
95 validate_tree(G, times)
96 nx.draw(G, pos)
97 return G,times,msg_ids
98
99 if __name__ == '__main__':
100 import pylab
101 main(32,128)
102 pylab.show()
103 No newline at end of file
@@ -0,0 +1,35 b''
1 from IPython.zmq.parallel.client import *
2
3 client = Client('tcp://127.0.0.1:10101')
4
5 @require('numpy')
6 def norm(A):
7 from numpy.linalg import norm
8 return norm(A,2)
9
10 def checkpid(pid):
11 import os
12 return os.getpid() == pid
13
14 def checkhostname(host):
15 import socket
16 return socket.gethostname() == host
17
18 def getpid():
19 import os
20 return os.getpid()
21
22 pid0 = client.apply(getpid, targets=0, block=True)
23
24 @depend(checkpid, pid0)
25 def getpid2():
26 import os
27 return os.getpid()
28
29 rns = client[None]
30 rns.block=True
31
32 pids1 = [ rns.apply(getpid) for i in range(len(client.ids)) ]
33 pids2 = [ rns.apply(getpid2) for i in range(len(client.ids)) ]
34 print pids1
35 print pids2
@@ -0,0 +1,20 b''
1 import time
2 from IPython.zmq.parallel.client import *
3
4 def wait(t):
5 import time
6 time.sleep(t)
7 return t
8
9 client = Client('tcp://127.0.0.1:10101')
10 view = client[None]
11
12 tic = time.time()
13 for i in range(128):
14 view.apply(wait, 1e-2*i)
15 # limit to 1k msgs/s
16 time.sleep(1e-2)
17
18 client.barrier()
19 toc = time.time()
20 print toc-tic
@@ -0,0 +1,20 b''
1 from IPython.zmq.parallel.client import *
2
3 client = Client('tcp://127.0.0.1:10101')
4
5 @remote(client, block=True)
6 def square(a):
7 """return square of a number"""
8 return a*a
9
10 squares = map(square, range(42))
11
12 # but that blocked between each result, not exactly useful
13 square.block=False
14 msg_ids = map(square, range(42))
15 # submitted very fast
16 # wait for them to be done:
17 client.barrier(msg_id)
18 squares2 = map(client.results.get, msg_ids)
19 print squares == squares2
20 # True No newline at end of file
@@ -0,0 +1,44 b''
1 """non-copying sends"""
2 import zmq
3 import numpy
4
5 n = 10
6 iface = 'inproc://pub'
7
8 ctx = zmq.Context()
9
10 p = ctx.socket(zmq.PUB)
11 p.bind(iface)
12
13 # connect 2 subs
14 s1 = ctx.socket(zmq.SUB)
15 s1.connect(iface)
16 s1.setsockopt(zmq.SUBSCRIBE, '')
17
18 s2 = ctx.socket(zmq.SUB)
19 s2.connect(iface)
20 s2.setsockopt(zmq.SUBSCRIBE, '')
21
22 A = numpy.random.random((1024,1024))
23
24 # send
25 p.send(A, copy=False)
26 # recv on 1 non-copy
27 msg1 = s1.recv(copy=False)
28 B1 = numpy.frombuffer(msg1.buffer, dtype=A.dtype).reshape(A.shape)
29 # recv on 2 copy
30 msg2 = s2.recv(copy=False)
31 B2 = numpy.frombuffer(buffer(msg2.bytes), dtype=A.dtype).reshape(A.shape)
32
33 print (B1==B2).all()
34 print (B1==A).all()
35 A[0][0] += 10
36 print "~"
37 # after changing A in-place, B1 changes too, proving non-copying sends
38 print (B1==A).all()
39 # but B2 is fixed, since it called the msg.bytes attr, which copies
40 print (B1==B2).all()
41
42
43
44
@@ -0,0 +1,22 b''
1 from IPython.zmq.parallel.client import *
2
3 client = Client('tcp://127.0.0.1:10101')
4
5 @remote(client, bound=True)
6 def getkey(name):
7 """fetch something from globals"""
8 return globals().get(name)
9
10 @remote(client, bound=True, targets='all')
11 def setpids():
12 import os
13 globals()['pid'] = os.getpid()
14
15 # set pid in the globals
16 setpids()
17 getkey('pid')
18 getkey.targets=[1,2]
19 getkey('pid')
20 getkey.bound=False
21 getkey('pid') is None
22
@@ -0,0 +1,89 b''
1 import time
2 import numpy as np
3 from IPython.zmq.parallel import client as clientmod
4
5 nlist = map(int, np.logspace(2,9,16,base=2))
6 nlist2 = map(int, np.logspace(2,8,15,base=2))
7 tlist = map(int, np.logspace(7,22,16,base=2))
8 nt = 16
9 def wait(t=0):
10 import time
11 time.sleep(t)
12
13 def echo(s=''):
14 return s
15
16 def time_throughput(nmessages, t=0, f=wait):
17 client = clientmod.Client('tcp://127.0.0.1:10101')
18 view = client[None]
19 # do one ping before starting timing
20 if f is echo:
21 t = np.random.random(t/8)
22 view.apply_sync(echo, '')
23 client.spin()
24 tic = time.time()
25 for i in xrange(nmessages):
26 view.apply(f, t)
27 lap = time.time()
28 client.barrier()
29 toc = time.time()
30 return lap-tic, toc-tic
31
32 def time_twisted(nmessages, t=0, f=wait):
33 from IPython.kernel import client as kc
34 client = kc.TaskClient()
35 if f is wait:
36 s = "import time; time.sleep(%f)"%t
37 task = kc.StringTask(s)
38 elif f is echo:
39 t = np.random.random(t/8)
40 s = "s=t"
41 task = kc.StringTask(s, push=dict(t=t), pull=['s'])
42 else:
43 raise
44 # do one ping before starting timing
45 client.barrier(client.run(task))
46 tic = time.time()
47 tids = []
48 for i in xrange(nmessages):
49 tids.append(client.run(task))
50 lap = time.time()
51 client.barrier(tids)
52 toc = time.time()
53 return lap-tic, toc-tic
54
55 def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput):
56 A = np.zeros((len(nlist),2))
57 for i,n in enumerate(nlist):
58 t1 = t2 = 0
59 for _ in range(trials):
60 time.sleep(.25)
61 ts = runner(n,t,f)
62 t1 += ts[0]
63 t2 += ts[1]
64 t1 /= trials
65 t2 /= trials
66 A[i] = (t1,t2)
67 A[i] = n/A[i]
68 print n,A[i]
69 return A
70
71 def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
72 A = np.zeros((len(tlist),2))
73 for i,t in enumerate(tlist):
74 t1 = t2 = 0
75 for _ in range(trials):
76 time.sleep(.25)
77 ts = runner(n,t,f)
78 t1 += ts[0]
79 t2 += ts[1]
80 t1 /= trials
81 t2 /= trials
82 A[i] = (t1,t2)
83 A[i] = n/A[i]
84 print t,A[i]
85 return A
86
87 def start_cluster(n, scheduler):
88 pass
89 No newline at end of file
@@ -0,0 +1,15 b''
1 from IPython.zmq.parallel.client import *
2
3 client = Client('tcp://127.0.0.1:10101')
4
5 for id in client.ids:
6 client.push(dict(ids=id*id), targets=id)
7
8 rns = client[0]
9 rns['a'] = 5
10
11 print rns['a']
12
13 remotes = client[:]
14
15 print remotes['ids'] No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now