Show More
@@ -0,0 +1,118 b'' | |||
|
1 | from IPython.zmq.parallel import error | |
|
2 | from IPython.zmq.parallel.dependency import Dependency | |
|
3 | from IPython.zmq.parallel.client import * | |
|
4 | ||
|
5 | client = Client() | |
|
6 | ||
|
7 | # this will only run on machines that can import numpy: | |
|
8 | @require('numpy') | |
|
9 | def norm(A): | |
|
10 | from numpy.linalg import norm | |
|
11 | return norm(A,2) | |
|
12 | ||
|
13 | def checkpid(pid): | |
|
14 | """return the pid of the engine""" | |
|
15 | import os | |
|
16 | return os.getpid() == pid | |
|
17 | ||
|
18 | def checkhostname(host): | |
|
19 | import socket | |
|
20 | return socket.gethostname() == host | |
|
21 | ||
|
22 | def getpid(): | |
|
23 | import os | |
|
24 | return os.getpid() | |
|
25 | ||
|
26 | pid0 = client[0].apply_sync(getpid) | |
|
27 | ||
|
28 | # this will depend on the pid being that of target 0: | |
|
29 | @depend(checkpid, pid0) | |
|
30 | def getpid2(): | |
|
31 | import os | |
|
32 | return os.getpid() | |
|
33 | ||
|
34 | view = client[None] | |
|
35 | view.block=True | |
|
36 | ||
|
37 | # will run on anything: | |
|
38 | pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ] | |
|
39 | print pids1 | |
|
40 | # will only run on e0: | |
|
41 | pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ] | |
|
42 | print pids2 | |
|
43 | ||
|
44 | print "now test some dependency behaviors" | |
|
45 | ||
|
46 | def wait(t): | |
|
47 | import time | |
|
48 | time.sleep(t) | |
|
49 | return t | |
|
50 | ||
|
51 | # fail after some time: | |
|
52 | def wait_and_fail(t): | |
|
53 | import time | |
|
54 | time.sleep(t) | |
|
55 | return 1/0 | |
|
56 | ||
|
57 | successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ] | |
|
58 | failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ] | |
|
59 | ||
|
60 | mixed = [failures[0],successes[0]] | |
|
61 | d1a = Dependency(mixed, mode='any', success_only=False) # yes | |
|
62 | d1b = Dependency(mixed, mode='any', success_only=True) # yes | |
|
63 | d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow | |
|
64 | d2b = Dependency(mixed, mode='all', success_only=True) # no | |
|
65 | d3 = Dependency(failures, mode='any', success_only=True) # no | |
|
66 | d4 = Dependency(failures, mode='any', success_only=False) # yes | |
|
67 | d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow | |
|
68 | d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow | |
|
69 | ||
|
70 | client.block = False | |
|
71 | ||
|
72 | r1a = client.apply(getpid, after=d1a) | |
|
73 | r1b = client.apply(getpid, follow=d1b) | |
|
74 | r2a = client.apply(getpid, after=d2b, follow=d2a) | |
|
75 | r2b = client.apply(getpid, after=d2a, follow=d2b) | |
|
76 | r3 = client.apply(getpid, after=d3) | |
|
77 | r4a = client.apply(getpid, after=d4) | |
|
78 | r4b = client.apply(getpid, follow=d4) | |
|
79 | r4c = client.apply(getpid, after=d3, follow=d4) | |
|
80 | r5 = client.apply(getpid, after=d5) | |
|
81 | r5b = client.apply(getpid, follow=d5, after=d3) | |
|
82 | r6 = client.apply(getpid, follow=d6) | |
|
83 | r6b = client.apply(getpid, after=d6, follow=d2b) | |
|
84 | ||
|
85 | def should_fail(f): | |
|
86 | try: | |
|
87 | f() | |
|
88 | except error.KernelError: | |
|
89 | pass | |
|
90 | else: | |
|
91 | print 'should have raised' | |
|
92 | # raise Exception("should have raised") | |
|
93 | ||
|
94 | # print r1a.msg_ids | |
|
95 | r1a.get() | |
|
96 | # print r1b.msg_ids | |
|
97 | r1b.get() | |
|
98 | # print r2a.msg_ids | |
|
99 | should_fail(r2a.get) | |
|
100 | # print r2b.msg_ids | |
|
101 | should_fail(r2b.get) | |
|
102 | # print r3.msg_ids | |
|
103 | should_fail(r3.get) | |
|
104 | # print r4a.msg_ids | |
|
105 | r4a.get() | |
|
106 | # print r4b.msg_ids | |
|
107 | r4b.get() | |
|
108 | # print r4c.msg_ids | |
|
109 | should_fail(r4c.get) | |
|
110 | # print r5.msg_ids | |
|
111 | r5.get() | |
|
112 | # print r5b.msg_ids | |
|
113 | should_fail(r5b.get) | |
|
114 | # print r6.msg_ids | |
|
115 | should_fail(r6.get) # assuming > 1 engine | |
|
116 | # print r6b.msg_ids | |
|
117 | should_fail(r6b.get) | |
|
118 | print 'done' |
@@ -0,0 +1,36 b'' | |||
|
1 | from IPython.zmq.parallel.client import * | |
|
2 | ||
|
3 | client = Client() | |
|
4 | ||
|
5 | @remote(client, block=True) | |
|
6 | def square(a): | |
|
7 | """return square of a number""" | |
|
8 | return a*a | |
|
9 | ||
|
10 | squares = map(square, range(42)) | |
|
11 | ||
|
12 | # but that blocked between each result; not exactly useful | |
|
13 | ||
|
14 | square.block = False | |
|
15 | ||
|
16 | arlist = map(square, range(42)) | |
|
17 | # submitted very fast | |
|
18 | ||
|
19 | # wait for the results: | |
|
20 | squares2 = [ r.get() for r in arlist ] | |
|
21 | ||
|
22 | # now the more convenient @parallel decorator, which has a map method: | |
|
23 | ||
|
24 | @parallel(client, block=False) | |
|
25 | def psquare(a): | |
|
26 | """return square of a number""" | |
|
27 | return a*a | |
|
28 | ||
|
29 | # this chunks the data into n-negines jobs, not 42 jobs: | |
|
30 | ar = psquare.map(range(42)) | |
|
31 | ||
|
32 | # wait for the results to be done: | |
|
33 | squares3 = ar.get() | |
|
34 | ||
|
35 | print squares == squares2, squares3==squares | |
|
36 | # True No newline at end of file |
@@ -61,13 +61,13 b' def submit_jobs(client, G, jobs):' | |||
|
61 | 61 | results[node] = client.apply(jobs[node], after=deps) |
|
62 | 62 | return results |
|
63 | 63 | |
|
64 |
def validate_tree(G, |
|
|
64 | def validate_tree(G, results): | |
|
65 | 65 | """Validate that jobs executed after their dependencies.""" |
|
66 | 66 | for node in G: |
|
67 | t = times[node] | |
|
67 | started = results[node].metadata.started | |
|
68 | 68 | for parent in G.predecessors(node): |
|
69 | pt = times[parent] | |
|
70 |
assert t > |
|
|
69 | finished = results[parent].metadata.completed | |
|
70 | assert started > finished, "%s should have happened after %s"%(node, parent) | |
|
71 | 71 | |
|
72 | 72 | def main(nodes, edges): |
|
73 | 73 | """Generate a random graph, submit jobs, then validate that the |
@@ -76,28 +76,28 b' def main(nodes, edges):' | |||
|
76 | 76 | in-degree on the y (just for spread). All arrows must |
|
77 | 77 | point at least slightly to the right if the graph is valid. |
|
78 | 78 | """ |
|
79 | from matplotlib.dates import date2num | |
|
79 | 80 | print "building DAG" |
|
80 | 81 | G = random_dag(nodes, edges) |
|
81 | 82 | jobs = {} |
|
82 | msg_ids = {} | |
|
83 | times = {} | |
|
84 | 83 | pos = {} |
|
85 | 84 | for node in G: |
|
86 | 85 | jobs[node] = randomwait |
|
87 | 86 | |
|
88 |
client = cmod.Client( |
|
|
87 | client = cmod.Client() | |
|
89 | 88 | print "submitting tasks" |
|
90 | 89 | results = submit_jobs(client, G, jobs) |
|
91 | 90 | print "waiting for results" |
|
92 | 91 | client.barrier() |
|
93 | 92 | print "done" |
|
94 | 93 | for node in G: |
|
95 | times[node] = results[node].get() | |
|
96 | pos[node] = (times[node], G.in_degree(node)+random()) | |
|
94 | # times[node] = results[node].get() | |
|
95 | t = date2num(results[node].metadata.started) | |
|
96 | pos[node] = (t, G.in_degree(node)+random()) | |
|
97 | 97 | |
|
98 |
validate_tree(G, |
|
|
98 | validate_tree(G, results) | |
|
99 | 99 | nx.draw(G, pos) |
|
100 |
return G, |
|
|
100 | return G,results | |
|
101 | 101 | |
|
102 | 102 | if __name__ == '__main__': |
|
103 | 103 | import pylab |
|
1 | NO CONTENT: file renamed from examples/demo/noncopying.py to docs/examples/newparallel/demo/noncopying.py |
@@ -1,6 +1,6 b'' | |||
|
1 | 1 | from IPython.zmq.parallel.client import * |
|
2 | 2 | |
|
3 | client = Client('tcp://127.0.0.1:10101') | |
|
3 | client = Client() | |
|
4 | 4 | |
|
5 | 5 | @remote(client, bound=True) |
|
6 | 6 | def getkey(name): |
@@ -14,7 +14,7 b" def echo(s=''):" | |||
|
14 | 14 | return s |
|
15 | 15 | |
|
16 | 16 | def time_throughput(nmessages, t=0, f=wait): |
|
17 |
client = clientmod.Client( |
|
|
17 | client = clientmod.Client() | |
|
18 | 18 | view = client[None] |
|
19 | 19 | # do one ping before starting timing |
|
20 | 20 | if f is echo: |
@@ -83,7 +83,4 b' def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):' | |||
|
83 | 83 | A[i] = n/A[i] |
|
84 | 84 | print t,A[i] |
|
85 | 85 | return A |
|
86 | ||
|
87 | def start_cluster(n, scheduler): | |
|
88 | pass | |
|
89 | 86 | No newline at end of file |
@@ -1,6 +1,6 b'' | |||
|
1 | 1 | from IPython.zmq.parallel.client import * |
|
2 | 2 | |
|
3 | client = Client('tcp://127.0.0.1:10101') | |
|
3 | client = Client() | |
|
4 | 4 | |
|
5 | 5 | for id in client.ids: |
|
6 | 6 | client.push(dict(ids=id*id), targets=id) |
|
1 | NO CONTENT: file renamed from examples/workflow/client.py to docs/examples/newparallel/workflow/client.py |
@@ -10,12 +10,16 b' import sys' | |||
|
10 | 10 | |
|
11 | 11 | argv = sys.argv |
|
12 | 12 | |
|
13 |
from IPython.zmq.parallel.engine import |
|
|
13 | from IPython.zmq.parallel.engine import EngineFactory | |
|
14 | from IPython.zmq.parallel.ipengineapp import launch_new_instance | |
|
14 | 15 | |
|
15 | 16 | ns = {} |
|
16 | 17 | |
|
17 | 18 | # job |
|
18 | 19 | exec sys.argv[1] in ns |
|
19 | 20 | |
|
21 | # this should really use Config: | |
|
22 | EngineFactory.user_ns = ns | |
|
23 | ||
|
20 | 24 | # start engine with job namespace |
|
21 | main([], user_ns=ns) | |
|
25 | launch_new_instance() |
@@ -32,7 +32,7 b' def cleanup(controller, engines):' | |||
|
32 | 32 | if __name__ == '__main__': |
|
33 | 33 | |
|
34 | 34 | # Start controller in separate process |
|
35 | cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller']) | |
|
35 | cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp']) | |
|
36 | 36 | print('Started controller') |
|
37 | 37 | |
|
38 | 38 | # "Submit jobs" |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now