Show More
@@ -0,0 +1,103 b'' | |||||
|
1 | """Example for generating an arbitrary DAG as a dependency map. | |||
|
2 | ||||
|
3 | This demo uses networkx to generate the graph. | |||
|
4 | ||||
|
5 | Authors | |||
|
6 | ------- | |||
|
7 | * MinRK | |||
|
8 | """ | |||
|
9 | import networkx as nx | |||
|
10 | from random import randint, random | |||
|
11 | from IPython.zmq.parallel import client as cmod | |||
|
12 | ||||
|
13 | def randomwait(): | |||
|
14 | import time | |||
|
15 | from random import random | |||
|
16 | time.sleep(random()) | |||
|
17 | return time.time() | |||
|
18 | ||||
|
19 | ||||
|
20 | def random_dag(nodes, edges): | |||
|
21 | """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.""" | |||
|
22 | G = nx.DiGraph() | |||
|
23 | for i in range(nodes): | |||
|
24 | G.add_node(i) | |||
|
25 | while edges > 0: | |||
|
26 | a = randint(0,nodes-1) | |||
|
27 | b=a | |||
|
28 | while b==a: | |||
|
29 | b = randint(0,nodes-1) | |||
|
30 | G.add_edge(a,b) | |||
|
31 | if nx.is_directed_acyclic_graph(G): | |||
|
32 | edges -= 1 | |||
|
33 | else: | |||
|
34 | # we closed a loop! | |||
|
35 | G.remove_edge(a,b) | |||
|
36 | return G | |||
|
37 | ||||
|
38 | def add_children(G, parent, level, n=2): | |||
|
39 | """Add children recursively to a binary tree.""" | |||
|
40 | if level == 0: | |||
|
41 | return | |||
|
42 | for i in range(n): | |||
|
43 | child = parent+str(i) | |||
|
44 | G.add_node(child) | |||
|
45 | G.add_edge(parent,child) | |||
|
46 | add_children(G, child, level-1, n) | |||
|
47 | ||||
|
48 | def make_bintree(levels): | |||
|
49 | """Make a symmetrical binary tree with @levels""" | |||
|
50 | G = nx.DiGraph() | |||
|
51 | root = '0' | |||
|
52 | G.add_node(root) | |||
|
53 | add_children(G, root, levels, 2) | |||
|
54 | return G | |||
|
55 | ||||
|
56 | def submit_jobs(client, G, jobs): | |||
|
57 | """Submit jobs via client where G describes the time dependencies.""" | |||
|
58 | msg_ids = {} | |||
|
59 | for node in nx.topological_sort(G): | |||
|
60 | deps = [ msg_ids[n] for n in G.predecessors(node) ] | |||
|
61 | msg_ids[node] = client.apply(jobs[node], after=deps) | |||
|
62 | return msg_ids | |||
|
63 | ||||
|
64 | def validate_tree(G, times): | |||
|
65 | """Validate that jobs executed after their dependencies.""" | |||
|
66 | for node in G: | |||
|
67 | t = times[node] | |||
|
68 | for parent in G.predecessors(node): | |||
|
69 | pt = times[parent] | |||
|
70 | assert t > pt, "%s should have happened after %s"%(node, parent) | |||
|
71 | ||||
|
72 | def main(nodes, edges): | |||
|
73 | """Generate a random graph, submit jobs, then validate that the | |||
|
74 | dependency order was enforced. | |||
|
75 | Finally, plot the graph, with time on the x-axis, and | |||
|
76 | in-degree on the y (just for spread). All arrows must | |||
|
77 | point at least slightly to the right if the graph is valid. | |||
|
78 | """ | |||
|
79 | G = random_dag(nodes, edges) | |||
|
80 | jobs = {} | |||
|
81 | msg_ids = {} | |||
|
82 | times = {} | |||
|
83 | pos = {} | |||
|
84 | for node in G: | |||
|
85 | jobs[node] = randomwait | |||
|
86 | ||||
|
87 | client = cmod.Client('tcp://127.0.0.1:10101') | |||
|
88 | ||||
|
89 | msg_ids = submit_jobs(client, G, jobs) | |||
|
90 | client.barrier() | |||
|
91 | for node in G: | |||
|
92 | times[node] = client.results[msg_ids[node]] | |||
|
93 | pos[node] = (times[node], G.in_degree(node)+random()) | |||
|
94 | ||||
|
95 | validate_tree(G, times) | |||
|
96 | nx.draw(G, pos) | |||
|
97 | return G,times,msg_ids | |||
|
98 | ||||
|
99 | if __name__ == '__main__': | |||
|
100 | import pylab | |||
|
101 | main(32,128) | |||
|
102 | pylab.show() | |||
|
103 | No newline at end of file |
@@ -0,0 +1,35 b'' | |||||
|
1 | from IPython.zmq.parallel.client import * | |||
|
2 | ||||
|
3 | client = Client('tcp://127.0.0.1:10101') | |||
|
4 | ||||
|
5 | @require('numpy') | |||
|
6 | def norm(A): | |||
|
7 | from numpy.linalg import norm | |||
|
8 | return norm(A,2) | |||
|
9 | ||||
|
10 | def checkpid(pid): | |||
|
11 | import os | |||
|
12 | return os.getpid() == pid | |||
|
13 | ||||
|
14 | def checkhostname(host): | |||
|
15 | import socket | |||
|
16 | return socket.gethostname() == host | |||
|
17 | ||||
|
18 | def getpid(): | |||
|
19 | import os | |||
|
20 | return os.getpid() | |||
|
21 | ||||
|
22 | pid0 = client.apply(getpid, targets=0, block=True) | |||
|
23 | ||||
|
24 | @depend(checkpid, pid0) | |||
|
25 | def getpid2(): | |||
|
26 | import os | |||
|
27 | return os.getpid() | |||
|
28 | ||||
|
29 | rns = client[None] | |||
|
30 | rns.block=True | |||
|
31 | ||||
|
32 | pids1 = [ rns.apply(getpid) for i in range(len(client.ids)) ] | |||
|
33 | pids2 = [ rns.apply(getpid2) for i in range(len(client.ids)) ] | |||
|
34 | print pids1 | |||
|
35 | print pids2 |
@@ -0,0 +1,20 b'' | |||||
|
1 | import time | |||
|
2 | from IPython.zmq.parallel.client import * | |||
|
3 | ||||
|
4 | def wait(t): | |||
|
5 | import time | |||
|
6 | time.sleep(t) | |||
|
7 | return t | |||
|
8 | ||||
|
9 | client = Client('tcp://127.0.0.1:10101') | |||
|
10 | view = client[None] | |||
|
11 | ||||
|
12 | tic = time.time() | |||
|
13 | for i in range(128): | |||
|
14 | view.apply(wait, 1e-2*i) | |||
|
15 | # limit to 1k msgs/s | |||
|
16 | time.sleep(1e-2) | |||
|
17 | ||||
|
18 | client.barrier() | |||
|
19 | toc = time.time() | |||
|
20 | print toc-tic |
@@ -0,0 +1,20 b'' | |||||
|
1 | from IPython.zmq.parallel.client import * | |||
|
2 | ||||
|
3 | client = Client('tcp://127.0.0.1:10101') | |||
|
4 | ||||
|
5 | @remote(client, block=True) | |||
|
6 | def square(a): | |||
|
7 | """return square of a number""" | |||
|
8 | return a*a | |||
|
9 | ||||
|
10 | squares = map(square, range(42)) | |||
|
11 | ||||
|
12 | # but that blocked between each result, not exactly useful | |||
|
13 | square.block=False | |||
|
14 | msg_ids = map(square, range(42)) | |||
|
15 | # submitted very fast | |||
|
16 | # wait for them to be done: | |||
|
17 | client.barrier(msg_id) | |||
|
18 | squares2 = map(client.results.get, msg_ids) | |||
|
19 | print squares == squares2 | |||
|
20 | # True No newline at end of file |
@@ -0,0 +1,44 b'' | |||||
|
1 | """non-copying sends""" | |||
|
2 | import zmq | |||
|
3 | import numpy | |||
|
4 | ||||
|
5 | n = 10 | |||
|
6 | iface = 'inproc://pub' | |||
|
7 | ||||
|
8 | ctx = zmq.Context() | |||
|
9 | ||||
|
10 | p = ctx.socket(zmq.PUB) | |||
|
11 | p.bind(iface) | |||
|
12 | ||||
|
13 | # connect 2 subs | |||
|
14 | s1 = ctx.socket(zmq.SUB) | |||
|
15 | s1.connect(iface) | |||
|
16 | s1.setsockopt(zmq.SUBSCRIBE, '') | |||
|
17 | ||||
|
18 | s2 = ctx.socket(zmq.SUB) | |||
|
19 | s2.connect(iface) | |||
|
20 | s2.setsockopt(zmq.SUBSCRIBE, '') | |||
|
21 | ||||
|
22 | A = numpy.random.random((1024,1024)) | |||
|
23 | ||||
|
24 | # send | |||
|
25 | p.send(A, copy=False) | |||
|
26 | # recv on 1 non-copy | |||
|
27 | msg1 = s1.recv(copy=False) | |||
|
28 | B1 = numpy.frombuffer(msg1.buffer, dtype=A.dtype).reshape(A.shape) | |||
|
29 | # recv on 2 copy | |||
|
30 | msg2 = s2.recv(copy=False) | |||
|
31 | B2 = numpy.frombuffer(buffer(msg2.bytes), dtype=A.dtype).reshape(A.shape) | |||
|
32 | ||||
|
33 | print (B1==B2).all() | |||
|
34 | print (B1==A).all() | |||
|
35 | A[0][0] += 10 | |||
|
36 | print "~" | |||
|
37 | # after changing A in-place, B1 changes too, proving non-copying sends | |||
|
38 | print (B1==A).all() | |||
|
39 | # but B2 is fixed, since it called the msg.bytes attr, which copies | |||
|
40 | print (B1==B2).all() | |||
|
41 | ||||
|
42 | ||||
|
43 | ||||
|
44 |
@@ -0,0 +1,22 b'' | |||||
|
1 | from IPython.zmq.parallel.client import * | |||
|
2 | ||||
|
3 | client = Client('tcp://127.0.0.1:10101') | |||
|
4 | ||||
|
5 | @remote(client, bound=True) | |||
|
6 | def getkey(name): | |||
|
7 | """fetch something from globals""" | |||
|
8 | return globals().get(name) | |||
|
9 | ||||
|
10 | @remote(client, bound=True, targets='all') | |||
|
11 | def setpids(): | |||
|
12 | import os | |||
|
13 | globals()['pid'] = os.getpid() | |||
|
14 | ||||
|
15 | # set pid in the globals | |||
|
16 | setpids() | |||
|
17 | getkey('pid') | |||
|
18 | getkey.targets=[1,2] | |||
|
19 | getkey('pid') | |||
|
20 | getkey.bound=False | |||
|
21 | getkey('pid') is None | |||
|
22 |
@@ -0,0 +1,89 b'' | |||||
|
1 | import time | |||
|
2 | import numpy as np | |||
|
3 | from IPython.zmq.parallel import client as clientmod | |||
|
4 | ||||
|
5 | nlist = map(int, np.logspace(2,9,16,base=2)) | |||
|
6 | nlist2 = map(int, np.logspace(2,8,15,base=2)) | |||
|
7 | tlist = map(int, np.logspace(7,22,16,base=2)) | |||
|
8 | nt = 16 | |||
|
9 | def wait(t=0): | |||
|
10 | import time | |||
|
11 | time.sleep(t) | |||
|
12 | ||||
|
13 | def echo(s=''): | |||
|
14 | return s | |||
|
15 | ||||
|
16 | def time_throughput(nmessages, t=0, f=wait): | |||
|
17 | client = clientmod.Client('tcp://127.0.0.1:10101') | |||
|
18 | view = client[None] | |||
|
19 | # do one ping before starting timing | |||
|
20 | if f is echo: | |||
|
21 | t = np.random.random(t/8) | |||
|
22 | view.apply_sync(echo, '') | |||
|
23 | client.spin() | |||
|
24 | tic = time.time() | |||
|
25 | for i in xrange(nmessages): | |||
|
26 | view.apply(f, t) | |||
|
27 | lap = time.time() | |||
|
28 | client.barrier() | |||
|
29 | toc = time.time() | |||
|
30 | return lap-tic, toc-tic | |||
|
31 | ||||
|
32 | def time_twisted(nmessages, t=0, f=wait): | |||
|
33 | from IPython.kernel import client as kc | |||
|
34 | client = kc.TaskClient() | |||
|
35 | if f is wait: | |||
|
36 | s = "import time; time.sleep(%f)"%t | |||
|
37 | task = kc.StringTask(s) | |||
|
38 | elif f is echo: | |||
|
39 | t = np.random.random(t/8) | |||
|
40 | s = "s=t" | |||
|
41 | task = kc.StringTask(s, push=dict(t=t), pull=['s']) | |||
|
42 | else: | |||
|
43 | raise | |||
|
44 | # do one ping before starting timing | |||
|
45 | client.barrier(client.run(task)) | |||
|
46 | tic = time.time() | |||
|
47 | tids = [] | |||
|
48 | for i in xrange(nmessages): | |||
|
49 | tids.append(client.run(task)) | |||
|
50 | lap = time.time() | |||
|
51 | client.barrier(tids) | |||
|
52 | toc = time.time() | |||
|
53 | return lap-tic, toc-tic | |||
|
54 | ||||
|
55 | def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput): | |||
|
56 | A = np.zeros((len(nlist),2)) | |||
|
57 | for i,n in enumerate(nlist): | |||
|
58 | t1 = t2 = 0 | |||
|
59 | for _ in range(trials): | |||
|
60 | time.sleep(.25) | |||
|
61 | ts = runner(n,t,f) | |||
|
62 | t1 += ts[0] | |||
|
63 | t2 += ts[1] | |||
|
64 | t1 /= trials | |||
|
65 | t2 /= trials | |||
|
66 | A[i] = (t1,t2) | |||
|
67 | A[i] = n/A[i] | |||
|
68 | print n,A[i] | |||
|
69 | return A | |||
|
70 | ||||
|
71 | def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput): | |||
|
72 | A = np.zeros((len(tlist),2)) | |||
|
73 | for i,t in enumerate(tlist): | |||
|
74 | t1 = t2 = 0 | |||
|
75 | for _ in range(trials): | |||
|
76 | time.sleep(.25) | |||
|
77 | ts = runner(n,t,f) | |||
|
78 | t1 += ts[0] | |||
|
79 | t2 += ts[1] | |||
|
80 | t1 /= trials | |||
|
81 | t2 /= trials | |||
|
82 | A[i] = (t1,t2) | |||
|
83 | A[i] = n/A[i] | |||
|
84 | print t,A[i] | |||
|
85 | return A | |||
|
86 | ||||
|
87 | def start_cluster(n, scheduler): | |||
|
88 | pass | |||
|
89 | No newline at end of file |
@@ -0,0 +1,15 b'' | |||||
|
1 | from IPython.zmq.parallel.client import * | |||
|
2 | ||||
|
3 | client = Client('tcp://127.0.0.1:10101') | |||
|
4 | ||||
|
5 | for id in client.ids: | |||
|
6 | client.push(dict(ids=id*id), targets=id) | |||
|
7 | ||||
|
8 | rns = client[0] | |||
|
9 | rns['a'] = 5 | |||
|
10 | ||||
|
11 | print rns['a'] | |||
|
12 | ||||
|
13 | remotes = client[:] | |||
|
14 | ||||
|
15 | print remotes['ids'] No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now