##// END OF EJS Templates
updated newparallel examples, moved into docs
MinRK -
Show More
@@ -0,0 +1,118 b''
1 from IPython.zmq.parallel import error
2 from IPython.zmq.parallel.dependency import Dependency
3 from IPython.zmq.parallel.client import *
4
5 client = Client()
6
7 # this will only run on machines that can import numpy:
8 @require('numpy')
9 def norm(A):
10 from numpy.linalg import norm
11 return norm(A,2)
12
13 def checkpid(pid):
14     """return True if this engine's pid matches the given pid"""
15 import os
16 return os.getpid() == pid
17
18 def checkhostname(host):
19 import socket
20 return socket.gethostname() == host
21
22 def getpid():
23 import os
24 return os.getpid()
25
26 pid0 = client[0].apply_sync(getpid)
27
28 # this will depend on the pid being that of target 0:
29 @depend(checkpid, pid0)
30 def getpid2():
31 import os
32 return os.getpid()
33
34 view = client[None]
35 view.block=True
36
37 # will run on anything:
38 pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
39 print pids1
40 # will only run on e0:
41 pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
42 print pids2
43
44 print "now test some dependency behaviors"
45
46 def wait(t):
47 import time
48 time.sleep(t)
49 return t
50
51 # fail after some time:
52 def wait_and_fail(t):
53 import time
54 time.sleep(t)
55 return 1/0
56
57 successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ]
58 failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]
59
60 mixed = [failures[0],successes[0]]
61 d1a = Dependency(mixed, mode='any', success_only=False) # yes
62 d1b = Dependency(mixed, mode='any', success_only=True) # yes
63 d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow
64 d2b = Dependency(mixed, mode='all', success_only=True) # no
65 d3 = Dependency(failures, mode='any', success_only=True) # no
66 d4 = Dependency(failures, mode='any', success_only=False) # yes
67 d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow
68 d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow
69
70 client.block = False
71
72 r1a = client.apply(getpid, after=d1a)
73 r1b = client.apply(getpid, follow=d1b)
74 r2a = client.apply(getpid, after=d2b, follow=d2a)
75 r2b = client.apply(getpid, after=d2a, follow=d2b)
76 r3 = client.apply(getpid, after=d3)
77 r4a = client.apply(getpid, after=d4)
78 r4b = client.apply(getpid, follow=d4)
79 r4c = client.apply(getpid, after=d3, follow=d4)
80 r5 = client.apply(getpid, after=d5)
81 r5b = client.apply(getpid, follow=d5, after=d3)
82 r6 = client.apply(getpid, follow=d6)
83 r6b = client.apply(getpid, after=d6, follow=d2b)
84
85 def should_fail(f):
86 try:
87 f()
88 except error.KernelError:
89 pass
90 else:
91 print 'should have raised'
92 # raise Exception("should have raised")
93
94 # print r1a.msg_ids
95 r1a.get()
96 # print r1b.msg_ids
97 r1b.get()
98 # print r2a.msg_ids
99 should_fail(r2a.get)
100 # print r2b.msg_ids
101 should_fail(r2b.get)
102 # print r3.msg_ids
103 should_fail(r3.get)
104 # print r4a.msg_ids
105 r4a.get()
106 # print r4b.msg_ids
107 r4b.get()
108 # print r4c.msg_ids
109 should_fail(r4c.get)
110 # print r5.msg_ids
111 r5.get()
112 # print r5b.msg_ids
113 should_fail(r5b.get)
114 # print r6.msg_ids
115 should_fail(r6.get) # assuming > 1 engine
116 # print r6b.msg_ids
117 should_fail(r6b.get)
118 print 'done'
@@ -0,0 +1,36 b''
1 from IPython.zmq.parallel.client import *
2
3 client = Client()
4
5 @remote(client, block=True)
6 def square(a):
7 """return square of a number"""
8 return a*a
9
10 squares = map(square, range(42))
11
12 # but that blocked between each result; not exactly useful
13
14 square.block = False
15
16 arlist = map(square, range(42))
17 # submitted very fast
18
19 # wait for the results:
20 squares2 = [ r.get() for r in arlist ]
21
22 # now the more convenient @parallel decorator, which has a map method:
23
24 @parallel(client, block=False)
25 def psquare(a):
26 """return square of a number"""
27 return a*a
28
29 # this chunks the data into n-engines jobs, not 42 jobs:
30 ar = psquare.map(range(42))
31
32 # wait for the results to be done:
33 squares3 = ar.get()
34
35 print squares == squares2, squares3==squares
36 # True No newline at end of file
@@ -1,106 +1,106 b''
1 1 """Example for generating an arbitrary DAG as a dependency map.
2 2
3 3 This demo uses networkx to generate the graph.
4 4
5 5 Authors
6 6 -------
7 7 * MinRK
8 8 """
9 9 import networkx as nx
10 10 from random import randint, random
11 11 from IPython.zmq.parallel import client as cmod
12 12
13 13 def randomwait():
14 14 import time
15 15 from random import random
16 16 time.sleep(random())
17 17 return time.time()
18 18
19 19
20 20 def random_dag(nodes, edges):
21 21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
22 22 G = nx.DiGraph()
23 23 for i in range(nodes):
24 24 G.add_node(i)
25 25 while edges > 0:
26 26 a = randint(0,nodes-1)
27 27 b=a
28 28 while b==a:
29 29 b = randint(0,nodes-1)
30 30 G.add_edge(a,b)
31 31 if nx.is_directed_acyclic_graph(G):
32 32 edges -= 1
33 33 else:
34 34 # we closed a loop!
35 35 G.remove_edge(a,b)
36 36 return G
37 37
38 38 def add_children(G, parent, level, n=2):
39 39 """Add children recursively to a binary tree."""
40 40 if level == 0:
41 41 return
42 42 for i in range(n):
43 43 child = parent+str(i)
44 44 G.add_node(child)
45 45 G.add_edge(parent,child)
46 46 add_children(G, child, level-1, n)
47 47
48 48 def make_bintree(levels):
49 49     """Make a symmetrical binary tree with the given number of levels"""
50 50 G = nx.DiGraph()
51 51 root = '0'
52 52 G.add_node(root)
53 53 add_children(G, root, levels, 2)
54 54 return G
55 55
56 56 def submit_jobs(client, G, jobs):
57 57 """Submit jobs via client where G describes the time dependencies."""
58 58 results = {}
59 59 for node in nx.topological_sort(G):
60 60 deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
61 61 results[node] = client.apply(jobs[node], after=deps)
62 62 return results
63 63
64 def validate_tree(G, times):
64 def validate_tree(G, results):
65 65 """Validate that jobs executed after their dependencies."""
66 66 for node in G:
67 t = times[node]
67 started = results[node].metadata.started
68 68 for parent in G.predecessors(node):
69 pt = times[parent]
70 assert t > pt, "%s should have happened after %s"%(node, parent)
69 finished = results[parent].metadata.completed
70 assert started > finished, "%s should have happened after %s"%(node, parent)
71 71
72 72 def main(nodes, edges):
73 73 """Generate a random graph, submit jobs, then validate that the
74 74 dependency order was enforced.
75 75 Finally, plot the graph, with time on the x-axis, and
76 76 in-degree on the y (just for spread). All arrows must
77 77 point at least slightly to the right if the graph is valid.
78 78 """
79 from matplotlib.dates import date2num
79 80 print "building DAG"
80 81 G = random_dag(nodes, edges)
81 82 jobs = {}
82 msg_ids = {}
83 times = {}
84 83 pos = {}
85 84 for node in G:
86 85 jobs[node] = randomwait
87 86
88 client = cmod.Client('tcp://127.0.0.1:10101')
87 client = cmod.Client()
89 88 print "submitting tasks"
90 89 results = submit_jobs(client, G, jobs)
91 90 print "waiting for results"
92 91 client.barrier()
93 92 print "done"
94 93 for node in G:
95 times[node] = results[node].get()
96 pos[node] = (times[node], G.in_degree(node)+random())
94 # times[node] = results[node].get()
95 t = date2num(results[node].metadata.started)
96 pos[node] = (t, G.in_degree(node)+random())
97 97
98 validate_tree(G, times)
98 validate_tree(G, results)
99 99 nx.draw(G, pos)
100 return G,times,msg_ids
100 return G,results
101 101
102 102 if __name__ == '__main__':
103 103 import pylab
104 104 main(32,128)
105 105 pylab.show()
106 106 No newline at end of file
1 NO CONTENT: file renamed from examples/demo/noncopying.py to docs/examples/newparallel/demo/noncopying.py
@@ -1,22 +1,22 b''
1 1 from IPython.zmq.parallel.client import *
2 2
3 client = Client('tcp://127.0.0.1:10101')
3 client = Client()
4 4
5 5 @remote(client, bound=True)
6 6 def getkey(name):
7 7 """fetch something from globals"""
8 8 return globals().get(name)
9 9
10 10 @remote(client, bound=True, targets='all')
11 11 def setpids():
12 12 import os
13 13 globals()['pid'] = os.getpid()
14 14
15 15 # set pid in the globals
16 16 setpids()
17 17 getkey('pid')
18 18 getkey.targets=[1,2]
19 19 getkey('pid')
20 20 getkey.bound=False
21 21 getkey('pid') is None
22 22
@@ -1,89 +1,86 b''
1 1 import time
2 2 import numpy as np
3 3 from IPython.zmq.parallel import client as clientmod
4 4
5 5 nlist = map(int, np.logspace(2,9,16,base=2))
6 6 nlist2 = map(int, np.logspace(2,8,15,base=2))
7 7 tlist = map(int, np.logspace(7,22,16,base=2))
8 8 nt = 16
9 9 def wait(t=0):
10 10 import time
11 11 time.sleep(t)
12 12
13 13 def echo(s=''):
14 14 return s
15 15
16 16 def time_throughput(nmessages, t=0, f=wait):
17 client = clientmod.Client('tcp://127.0.0.1:10101')
17 client = clientmod.Client()
18 18 view = client[None]
19 19 # do one ping before starting timing
20 20 if f is echo:
21 21 t = np.random.random(t/8)
22 22 view.apply_sync(echo, '')
23 23 client.spin()
24 24 tic = time.time()
25 25 for i in xrange(nmessages):
26 26 view.apply(f, t)
27 27 lap = time.time()
28 28 client.barrier()
29 29 toc = time.time()
30 30 return lap-tic, toc-tic
31 31
32 32 def time_twisted(nmessages, t=0, f=wait):
33 33 from IPython.kernel import client as kc
34 34 client = kc.TaskClient()
35 35 if f is wait:
36 36 s = "import time; time.sleep(%f)"%t
37 37 task = kc.StringTask(s)
38 38 elif f is echo:
39 39 t = np.random.random(t/8)
40 40 s = "s=t"
41 41 task = kc.StringTask(s, push=dict(t=t), pull=['s'])
42 42 else:
43 43 raise
44 44 # do one ping before starting timing
45 45 client.barrier(client.run(task))
46 46 tic = time.time()
47 47 tids = []
48 48 for i in xrange(nmessages):
49 49 tids.append(client.run(task))
50 50 lap = time.time()
51 51 client.barrier(tids)
52 52 toc = time.time()
53 53 return lap-tic, toc-tic
54 54
55 55 def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput):
56 56 A = np.zeros((len(nlist),2))
57 57 for i,n in enumerate(nlist):
58 58 t1 = t2 = 0
59 59 for _ in range(trials):
60 60 time.sleep(.25)
61 61 ts = runner(n,t,f)
62 62 t1 += ts[0]
63 63 t2 += ts[1]
64 64 t1 /= trials
65 65 t2 /= trials
66 66 A[i] = (t1,t2)
67 67 A[i] = n/A[i]
68 68 print n,A[i]
69 69 return A
70 70
71 71 def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
72 72 A = np.zeros((len(tlist),2))
73 73 for i,t in enumerate(tlist):
74 74 t1 = t2 = 0
75 75 for _ in range(trials):
76 76 time.sleep(.25)
77 77 ts = runner(n,t,f)
78 78 t1 += ts[0]
79 79 t2 += ts[1]
80 80 t1 /= trials
81 81 t2 /= trials
82 82 A[i] = (t1,t2)
83 83 A[i] = n/A[i]
84 84 print t,A[i]
85 85 return A
86
87 def start_cluster(n, scheduler):
88 pass
89 86 No newline at end of file
@@ -1,15 +1,15 b''
1 1 from IPython.zmq.parallel.client import *
2 2
3 client = Client('tcp://127.0.0.1:10101')
3 client = Client()
4 4
5 5 for id in client.ids:
6 6 client.push(dict(ids=id*id), targets=id)
7 7
8 8 rns = client[0]
9 9 rns['a'] = 5
10 10
11 11 print rns['a']
12 12
13 13 remotes = client[:]
14 14
15 15 print remotes['ids'] No newline at end of file
1 NO CONTENT: file renamed from examples/workflow/client.py to docs/examples/newparallel/workflow/client.py
@@ -1,21 +1,25 b''
1 1 #!/usr/bin/env python
2 2 """Python wrapper around a submitted workflow job.
3 3
4 4 In reality this would be a more sophisticated script, here we only illustrate
5 5 the basic idea by considering that a submitted 'job' is a Python string to be
6 6 executed.
7 7 """
8 8
9 9 import sys
10 10
11 11 argv = sys.argv
12 12
13 from IPython.zmq.parallel.engine import main
13 from IPython.zmq.parallel.engine import EngineFactory
14 from IPython.zmq.parallel.ipengineapp import launch_new_instance
14 15
15 16 ns = {}
16 17
17 18 # job
18 19 exec sys.argv[1] in ns
19 20
21 # this should really use Config:
22 EngineFactory.user_ns = ns
23
20 24 # start engine with job namespace
21 main([], user_ns=ns)
25 launch_new_instance()
@@ -1,44 +1,44 b''
1 1 """Mock workflow manager.
2 2
3 3 This is a mock work manager whose submitted 'jobs' simply consist of executing
4 4 a python string. What we want is to see the implementation of the ipython
5 5 controller part.
6 6 """
7 7
8 8 from __future__ import print_function
9 9
10 10 import atexit
11 11 import sys
12 12
13 13 from subprocess import Popen
14 14
15 15 def cleanup(controller, engines):
16 16 """Cleanup routine to shut down all subprocesses we opened."""
17 17 import signal, time
18 18
19 19 print('Starting cleanup')
20 20 print('Stopping engines...')
21 21 for e in engines:
22 22 e.send_signal(signal.SIGINT)
23 23 print('Stopping controller...')
24 24 # so it can shut down its queues
25 25 controller.send_signal(signal.SIGINT)
26 26 time.sleep(0.1)
27 27 print('Killing controller...')
28 28 controller.kill()
29 29 print('Cleanup done')
30 30
31 31
32 32 if __name__ == '__main__':
33 33
34 34 # Start controller in separate process
35 cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller'])
35 cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp'])
36 36 print('Started controller')
37 37
38 38 # "Submit jobs"
39 39 eng = []
40 40 for i in range(4):
41 41 eng.append(Popen(['python', 'job_wrapper.py','x=%s' % i]))
42 42
43 43 # Ensure that all subprocesses are cleaned up when this process exits
44 44 atexit.register(lambda : cleanup(cont, eng))
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now