##// END OF EJS Templates
updated newparallel examples, moved into docs
MinRK -
Show More
@@ -0,0 +1,118 b''
1 from IPython.zmq.parallel import error
2 from IPython.zmq.parallel.dependency import Dependency
3 from IPython.zmq.parallel.client import *
4
5 client = Client()
6
7 # this will only run on machines that can import numpy:
8 @require('numpy')
9 def norm(A):
10 from numpy.linalg import norm
11 return norm(A,2)
12
13 def checkpid(pid):
14 """return the pid of the engine"""
15 import os
16 return os.getpid() == pid
17
18 def checkhostname(host):
19 import socket
20 return socket.gethostname() == host
21
22 def getpid():
23 import os
24 return os.getpid()
25
26 pid0 = client[0].apply_sync(getpid)
27
28 # this will depend on the pid being that of target 0:
29 @depend(checkpid, pid0)
30 def getpid2():
31 import os
32 return os.getpid()
33
34 view = client[None]
35 view.block=True
36
37 # will run on anything:
38 pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
39 print pids1
40 # will only run on e0:
41 pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
42 print pids2
43
44 print "now test some dependency behaviors"
45
46 def wait(t):
47 import time
48 time.sleep(t)
49 return t
50
51 # fail after some time:
52 def wait_and_fail(t):
53 import time
54 time.sleep(t)
55 return 1/0
56
57 successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ]
58 failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]
59
60 mixed = [failures[0],successes[0]]
61 d1a = Dependency(mixed, mode='any', success_only=False) # yes
62 d1b = Dependency(mixed, mode='any', success_only=True) # yes
63 d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow
64 d2b = Dependency(mixed, mode='all', success_only=True) # no
65 d3 = Dependency(failures, mode='any', success_only=True) # no
66 d4 = Dependency(failures, mode='any', success_only=False) # yes
67 d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow
68 d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow
69
70 client.block = False
71
72 r1a = client.apply(getpid, after=d1a)
73 r1b = client.apply(getpid, follow=d1b)
74 r2a = client.apply(getpid, after=d2b, follow=d2a)
75 r2b = client.apply(getpid, after=d2a, follow=d2b)
76 r3 = client.apply(getpid, after=d3)
77 r4a = client.apply(getpid, after=d4)
78 r4b = client.apply(getpid, follow=d4)
79 r4c = client.apply(getpid, after=d3, follow=d4)
80 r5 = client.apply(getpid, after=d5)
81 r5b = client.apply(getpid, follow=d5, after=d3)
82 r6 = client.apply(getpid, follow=d6)
83 r6b = client.apply(getpid, after=d6, follow=d2b)
84
85 def should_fail(f):
86 try:
87 f()
88 except error.KernelError:
89 pass
90 else:
91 print 'should have raised'
92 # raise Exception("should have raised")
93
94 # print r1a.msg_ids
95 r1a.get()
96 # print r1b.msg_ids
97 r1b.get()
98 # print r2a.msg_ids
99 should_fail(r2a.get)
100 # print r2b.msg_ids
101 should_fail(r2b.get)
102 # print r3.msg_ids
103 should_fail(r3.get)
104 # print r4a.msg_ids
105 r4a.get()
106 # print r4b.msg_ids
107 r4b.get()
108 # print r4c.msg_ids
109 should_fail(r4c.get)
110 # print r5.msg_ids
111 r5.get()
112 # print r5b.msg_ids
113 should_fail(r5b.get)
114 # print r6.msg_ids
115 should_fail(r6.get) # assuming > 1 engine
116 # print r6b.msg_ids
117 should_fail(r6b.get)
118 print 'done'
@@ -0,0 +1,36 b''
1 from IPython.zmq.parallel.client import *
2
3 client = Client()
4
5 @remote(client, block=True)
6 def square(a):
7 """return square of a number"""
8 return a*a
9
10 squares = map(square, range(42))
11
12 # but that blocked between each result; not exactly useful
13
14 square.block = False
15
16 arlist = map(square, range(42))
17 # submitted very fast
18
19 # wait for the results:
20 squares2 = [ r.get() for r in arlist ]
21
22 # now the more convenient @parallel decorator, which has a map method:
23
24 @parallel(client, block=False)
25 def psquare(a):
26 """return square of a number"""
27 return a*a
28
29 # this chunks the data into n-negines jobs, not 42 jobs:
30 ar = psquare.map(range(42))
31
32 # wait for the results to be done:
33 squares3 = ar.get()
34
35 print squares == squares2, squares3==squares
36 # True No newline at end of file
@@ -61,13 +61,13 b' def submit_jobs(client, G, jobs):'
61 results[node] = client.apply(jobs[node], after=deps)
61 results[node] = client.apply(jobs[node], after=deps)
62 return results
62 return results
63
63
64 def validate_tree(G, times):
64 def validate_tree(G, results):
65 """Validate that jobs executed after their dependencies."""
65 """Validate that jobs executed after their dependencies."""
66 for node in G:
66 for node in G:
67 t = times[node]
67 started = results[node].metadata.started
68 for parent in G.predecessors(node):
68 for parent in G.predecessors(node):
69 pt = times[parent]
69 finished = results[parent].metadata.completed
70 assert t > pt, "%s should have happened after %s"%(node, parent)
70 assert started > finished, "%s should have happened after %s"%(node, parent)
71
71
72 def main(nodes, edges):
72 def main(nodes, edges):
73 """Generate a random graph, submit jobs, then validate that the
73 """Generate a random graph, submit jobs, then validate that the
@@ -76,28 +76,28 b' def main(nodes, edges):'
76 in-degree on the y (just for spread). All arrows must
76 in-degree on the y (just for spread). All arrows must
77 point at least slightly to the right if the graph is valid.
77 point at least slightly to the right if the graph is valid.
78 """
78 """
79 from matplotlib.dates import date2num
79 print "building DAG"
80 print "building DAG"
80 G = random_dag(nodes, edges)
81 G = random_dag(nodes, edges)
81 jobs = {}
82 jobs = {}
82 msg_ids = {}
83 times = {}
84 pos = {}
83 pos = {}
85 for node in G:
84 for node in G:
86 jobs[node] = randomwait
85 jobs[node] = randomwait
87
86
88 client = cmod.Client('tcp://127.0.0.1:10101')
87 client = cmod.Client()
89 print "submitting tasks"
88 print "submitting tasks"
90 results = submit_jobs(client, G, jobs)
89 results = submit_jobs(client, G, jobs)
91 print "waiting for results"
90 print "waiting for results"
92 client.barrier()
91 client.barrier()
93 print "done"
92 print "done"
94 for node in G:
93 for node in G:
95 times[node] = results[node].get()
94 # times[node] = results[node].get()
96 pos[node] = (times[node], G.in_degree(node)+random())
95 t = date2num(results[node].metadata.started)
96 pos[node] = (t, G.in_degree(node)+random())
97
97
98 validate_tree(G, times)
98 validate_tree(G, results)
99 nx.draw(G, pos)
99 nx.draw(G, pos)
100 return G,times,msg_ids
100 return G,results
101
101
102 if __name__ == '__main__':
102 if __name__ == '__main__':
103 import pylab
103 import pylab
1 NO CONTENT: file renamed from examples/demo/noncopying.py to docs/examples/newparallel/demo/noncopying.py
NO CONTENT: file renamed from examples/demo/noncopying.py to docs/examples/newparallel/demo/noncopying.py
@@ -1,6 +1,6 b''
1 from IPython.zmq.parallel.client import *
1 from IPython.zmq.parallel.client import *
2
2
3 client = Client('tcp://127.0.0.1:10101')
3 client = Client()
4
4
5 @remote(client, bound=True)
5 @remote(client, bound=True)
6 def getkey(name):
6 def getkey(name):
@@ -14,7 +14,7 b" def echo(s=''):"
14 return s
14 return s
15
15
16 def time_throughput(nmessages, t=0, f=wait):
16 def time_throughput(nmessages, t=0, f=wait):
17 client = clientmod.Client('tcp://127.0.0.1:10101')
17 client = clientmod.Client()
18 view = client[None]
18 view = client[None]
19 # do one ping before starting timing
19 # do one ping before starting timing
20 if f is echo:
20 if f is echo:
@@ -83,7 +83,4 b' def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):'
83 A[i] = n/A[i]
83 A[i] = n/A[i]
84 print t,A[i]
84 print t,A[i]
85 return A
85 return A
86
87 def start_cluster(n, scheduler):
88 pass
89 No newline at end of file
86
@@ -1,6 +1,6 b''
1 from IPython.zmq.parallel.client import *
1 from IPython.zmq.parallel.client import *
2
2
3 client = Client('tcp://127.0.0.1:10101')
3 client = Client()
4
4
5 for id in client.ids:
5 for id in client.ids:
6 client.push(dict(ids=id*id), targets=id)
6 client.push(dict(ids=id*id), targets=id)
1 NO CONTENT: file renamed from examples/workflow/client.py to docs/examples/newparallel/workflow/client.py
NO CONTENT: file renamed from examples/workflow/client.py to docs/examples/newparallel/workflow/client.py
@@ -10,12 +10,16 b' import sys'
10
10
11 argv = sys.argv
11 argv = sys.argv
12
12
13 from IPython.zmq.parallel.engine import main
13 from IPython.zmq.parallel.engine import EngineFactory
14 from IPython.zmq.parallel.ipengineapp import launch_new_instance
14
15
15 ns = {}
16 ns = {}
16
17
17 # job
18 # job
18 exec sys.argv[1] in ns
19 exec sys.argv[1] in ns
19
20
21 # this should really use Config:
22 EngineFactory.user_ns = ns
23
20 # start engine with job namespace
24 # start engine with job namespace
21 main([], user_ns=ns)
25 launch_new_instance()
@@ -32,7 +32,7 b' def cleanup(controller, engines):'
32 if __name__ == '__main__':
32 if __name__ == '__main__':
33
33
34 # Start controller in separate process
34 # Start controller in separate process
35 cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller'])
35 cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp'])
36 print('Started controller')
36 print('Started controller')
37
37
38 # "Submit jobs"
38 # "Submit jobs"
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now