updated newparallel examples, moved into docs
MinRK
@@ -0,0 +1,118 b''
from IPython.zmq.parallel import error
from IPython.zmq.parallel.dependency import Dependency
from IPython.zmq.parallel.client import *

client = Client()

# this will only run on machines that can import numpy:
@require('numpy')
def norm(A):
    from numpy.linalg import norm
    return norm(A,2)

def checkpid(pid):
    """return True if the engine's pid matches"""
    import os
    return os.getpid() == pid

def checkhostname(host):
    """return True if the engine is on the given host"""
    import socket
    return socket.gethostname() == host

def getpid():
    import os
    return os.getpid()

pid0 = client[0].apply_sync(getpid)

# this will depend on the pid being that of target 0:
@depend(checkpid, pid0)
def getpid2():
    import os
    return os.getpid()

view = client[None]
view.block = True

# will run on anything:
pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
print pids1
# will only run on engine 0, so every entry should equal pid0:
pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
print pids2

print "now test some dependency behaviors"

def wait(t):
    import time
    time.sleep(t)
    return t

# fail after some time:
def wait_and_fail(t):
    import time
    time.sleep(t)
    return 1/0

successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ]
failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]

mixed = [failures[0], successes[0]]
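# A note on the Dependency objects below (my reading of the yes/no annotations,
# not taken from the scheduler source): mode='any' is satisfied as soon as any
# of the listed msg_ids finishes, while mode='all' needs every one of them;
# success_only=True counts only successful results, so a dependency built purely
# from failures can never be met. The 'after' keyword is a time dependency (run
# once those msg_ids have finished, on any engine), while 'follow' is a location
# dependency (run on an engine where those msg_ids ran), which is why mode='all'
# over tasks that ran on different engines is marked "yes after / no follow".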
d1a = Dependency(mixed, mode='any', success_only=False)  # yes
d1b = Dependency(mixed, mode='any', success_only=True)   # yes
d2a = Dependency(mixed, mode='all', success_only=False)  # yes after / no follow
d2b = Dependency(mixed, mode='all', success_only=True)   # no
d3 = Dependency(failures, mode='any', success_only=True)    # no
d4 = Dependency(failures, mode='any', success_only=False)   # yes
d5 = Dependency(failures, mode='all', success_only=False)   # yes after / no follow
d6 = Dependency(successes, mode='all', success_only=False)  # yes after / no follow

client.block = False

r1a = client.apply(getpid, after=d1a)
r1b = client.apply(getpid, follow=d1b)
r2a = client.apply(getpid, after=d2b, follow=d2a)
r2b = client.apply(getpid, after=d2a, follow=d2b)
r3 = client.apply(getpid, after=d3)
r4a = client.apply(getpid, after=d4)
r4b = client.apply(getpid, follow=d4)
r4c = client.apply(getpid, after=d3, follow=d4)
r5 = client.apply(getpid, after=d5)
r5b = client.apply(getpid, follow=d5, after=d3)
r6 = client.apply(getpid, follow=d6)
r6b = client.apply(getpid, after=d6, follow=d2b)

def should_fail(f):
    try:
        f()
    except error.KernelError:
        pass
    else:
        print 'should have raised'
        # raise Exception("should have raised")

# print r1a.msg_ids
r1a.get()
# print r1b.msg_ids
r1b.get()
# print r2a.msg_ids
should_fail(r2a.get)
# print r2b.msg_ids
should_fail(r2b.get)
# print r3.msg_ids
should_fail(r3.get)
# print r4a.msg_ids
r4a.get()
# print r4b.msg_ids
r4b.get()
# print r4c.msg_ids
should_fail(r4c.get)
# print r5.msg_ids
r5.get()
# print r5b.msg_ids
should_fail(r5b.get)
# print r6.msg_ids
should_fail(r6.get)  # assuming > 1 engine
# print r6b.msg_ids
should_fail(r6b.get)
print 'done'
@@ -0,0 +1,36 b''
from IPython.zmq.parallel.client import *

client = Client()

@remote(client, block=True)
def square(a):
    """return square of a number"""
    return a*a

squares = map(square, range(42))

# but that blocked on each result; not exactly useful

square.block = False

arlist = map(square, range(42))
# submitted very fast

# wait for the results:
squares2 = [ r.get() for r in arlist ]

# now the more convenient @parallel decorator, which has a map method:

@parallel(client, block=False)
def psquare(a):
    """return square of a number"""
    return a*a

# this chunks the data into n_engines jobs, not 42 jobs:
ar = psquare.map(range(42))

# wait for the results to be done:
squares3 = ar.get()

print squares == squares2, squares3 == squares
# True
\ No newline at end of file
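For a rough sense of what the chunking comment above means, here is a plain-Python sketch (the real scheduler may partition differently, e.g. contiguous blocks rather than strides; chunk is a hypothetical helper, not part of the API):

def chunk(seq, n):
    """split seq into n roughly equal pieces, one per engine"""
    return [ seq[i::n] for i in range(n) ]

# with 4 engines, range(42) becomes 4 tasks of 10-11 elements each,
# rather than the 42 one-element tasks submitted by map(square, range(42))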
@@ -61,13 +61,13 b' def submit_jobs(client, G, jobs):'
         results[node] = client.apply(jobs[node], after=deps)
     return results
 
-def validate_tree(G, times):
+def validate_tree(G, results):
     """Validate that jobs executed after their dependencies."""
     for node in G:
-        t = times[node]
+        started = results[node].metadata.started
         for parent in G.predecessors(node):
-            pt = times[parent]
-            assert t > pt, "%s should have happened after %s"%(node, parent)
+            finished = results[parent].metadata.completed
+            assert started > finished, "%s should have happened after %s"%(node, parent)
 
 def main(nodes, edges):
     """Generate a random graph, submit jobs, then validate that the
@@ -76,28 +76,28 b' def main(nodes, edges):'
     in-degree on the y (just for spread). All arrows must
     point at least slightly to the right if the graph is valid.
     """
+    from matplotlib.dates import date2num
     print "building DAG"
     G = random_dag(nodes, edges)
     jobs = {}
-    msg_ids = {}
-    times = {}
     pos = {}
     for node in G:
         jobs[node] = randomwait
 
-    client = cmod.Client('tcp://127.0.0.1:10101')
+    client = cmod.Client()
     print "submitting tasks"
     results = submit_jobs(client, G, jobs)
     print "waiting for results"
     client.barrier()
     print "done"
     for node in G:
-        times[node] = results[node].get()
-        pos[node] = (times[node], G.in_degree(node)+random())
+        # times[node] = results[node].get()
+        t = date2num(results[node].metadata.started)
+        pos[node] = (t, G.in_degree(node)+random())
 
-    validate_tree(G, times)
+    validate_tree(G, results)
     nx.draw(G, pos)
-    return G,times,msg_ids
+    return G,results
 
 if __name__ == '__main__':
     import pylab
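A short aside on the two hunks above, hedged since it relies on memory of the AsyncResult API rather than this diff alone: each task's metadata carries datetime stamps such as started and completed, and matplotlib's date2num converts a datetime into a float, which is what makes it usable as an x coordinate for the plot. The ordering check for a single edge then amounts to:

def happened_after(child_result, parent_result):
    """True if the child task started only after its parent completed
    (both arguments are AsyncResults, as in validate_tree above)"""
    return child_result.metadata.started > parent_result.metadata.completed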
NO CONTENT: file renamed from examples/demo/noncopying.py to docs/examples/newparallel/demo/noncopying.py
@@ -1,6 +1,6 b''
 from IPython.zmq.parallel.client import *
 
-client = Client('tcp://127.0.0.1:10101')
+client = Client()
 
 @remote(client, bound=True)
 def getkey(name):
@@ -14,7 +14,7 b" def echo(s=''):"
     return s
 
 def time_throughput(nmessages, t=0, f=wait):
-    client = clientmod.Client('tcp://127.0.0.1:10101')
+    client = clientmod.Client()
     view = client[None]
     # do one ping before starting timing
     if f is echo:
@@ -83,7 +83,4 b' def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):'
         A[i] = n/A[i]
         print t,A[i]
     return A
-
-def start_cluster(n, scheduler):
-    pass
\ No newline at end of file
@@ -1,6 +1,6 b''
 from IPython.zmq.parallel.client import *
 
-client = Client('tcp://127.0.0.1:10101')
+client = Client()
 
 for id in client.ids:
     client.push(dict(ids=id*id), targets=id)
NO CONTENT: file renamed from examples/workflow/client.py to docs/examples/newparallel/workflow/client.py
@@ -10,12 +10,16 b' import sys'
 
 argv = sys.argv
 
-from IPython.zmq.parallel.engine import main
+from IPython.zmq.parallel.engine import EngineFactory
+from IPython.zmq.parallel.ipengineapp import launch_new_instance
 
 ns = {}
 
 # job
 exec sys.argv[1] in ns
 
+# this should really use Config:
+EngineFactory.user_ns = ns
+
 # start engine with job namespace
-main([], user_ns=ns)
+launch_new_instance()
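The "# this should really use Config:" note refers to passing the namespace through a config object instead of mutating the EngineFactory class attribute. A minimal sketch of that intent, assuming IPython.config.loader.Config is available here; how the config object would actually be handed to the engine application is left open, since that depends on its constructor:

from IPython.config.loader import Config

ns = {}  # the job namespace built above
c = Config()
c.EngineFactory.user_ns = ns  # same intent as the EngineFactory.user_ns assignment above
# c would then need to be passed to the engine application when it is created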
@@ -32,7 +32,7 b' def cleanup(controller, engines):'
 if __name__ == '__main__':
 
     # Start controller in separate process
-    cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller'])
+    cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp'])
     print('Started controller')
 
     # "Submit jobs"
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed