updated newparallel examples, moved into docs
MinRK
@@ -0,0 +1,118 b''
from IPython.zmq.parallel import error
from IPython.zmq.parallel.dependency import Dependency
from IPython.zmq.parallel.client import *

client = Client()

# this will only run on machines that can import numpy:
@require('numpy')
def norm(A):
    from numpy.linalg import norm
    return norm(A,2)

def checkpid(pid):
    """check whether the engine's pid matches the given pid"""
    import os
    return os.getpid() == pid

def checkhostname(host):
    import socket
    return socket.gethostname() == host

def getpid():
    import os
    return os.getpid()

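# client[0] is a view on engine 0; apply_sync runs the function there
# and blocks until the result arrives: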
pid0 = client[0].apply_sync(getpid)

# this will only run on the engine whose pid matches pid0 (target 0):
@depend(checkpid, pid0)
def getpid2():
    import os
    return os.getpid()

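# client[None] requests a view that is not bound to any particular engine;
# in this API that is a load-balanced view, so the scheduler chooses the
# target engine for each task (subject to any dependencies attached to it).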
view = client[None]
view.block=True

# getpid has no dependencies, so these can run on any engine:
pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
print pids1
# getpid2 will only run on engine 0:
pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
print pids2

print "now test some dependency behaviors"

def wait(t):
    import time
    time.sleep(t)
    return t

# fail after some time:
def wait_and_fail(t):
    import time
    time.sleep(t)
    return 1/0

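# submit one batch of jobs that will succeed and one that will fail,
# keeping their msg_ids so we can build dependencies on them: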
successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids)) ]
failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]

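# A Dependency is a set of msg_ids plus matching rules.  As used here,
# mode='any' is satisfied once any listed job has finished, mode='all'
# requires all of them, and success_only=True counts only jobs that
# finished without raising.  The trailing comments record whether a task
# gated on each dependency is expected to run ('yes') or be unmeetable ('no').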
mixed = [failures[0],successes[0]]
d1a = Dependency(mixed, mode='any', success_only=False) # yes
d1b = Dependency(mixed, mode='any', success_only=True) # yes
d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow
d2b = Dependency(mixed, mode='all', success_only=True) # no
d3 = Dependency(failures, mode='any', success_only=True) # no
d4 = Dependency(failures, mode='any', success_only=False) # yes
d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow
d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow

client.block = False

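# In this API, 'after' is a time dependency (do not start until these jobs
# have finished), while 'follow' is a location dependency (run on the same
# engine(s) where those jobs ran).  That is why an 'all' dependency can be
# satisfiable as 'after' but impossible as 'follow' when the jobs ran on
# different engines.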
r1a = client.apply(getpid, after=d1a)
r1b = client.apply(getpid, follow=d1b)
r2a = client.apply(getpid, after=d2b, follow=d2a)
r2b = client.apply(getpid, after=d2a, follow=d2b)
r3 = client.apply(getpid, after=d3)
r4a = client.apply(getpid, after=d4)
r4b = client.apply(getpid, follow=d4)
r4c = client.apply(getpid, after=d3, follow=d4)
r5 = client.apply(getpid, after=d5)
r5b = client.apply(getpid, follow=d5, after=d3)
r6 = client.apply(getpid, follow=d6)
r6b = client.apply(getpid, after=d6, follow=d2b)

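# tasks whose dependencies cannot be met are aborted by the scheduler,
# so fetching their results should raise a KernelError: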
def should_fail(f):
    try:
        f()
    except error.KernelError:
        pass
    else:
        print 'should have raised'
        # raise Exception("should have raised")

# print r1a.msg_ids
r1a.get()
# print r1b.msg_ids
r1b.get()
# print r2a.msg_ids
should_fail(r2a.get)
# print r2b.msg_ids
should_fail(r2b.get)
# print r3.msg_ids
should_fail(r3.get)
# print r4a.msg_ids
r4a.get()
# print r4b.msg_ids
r4b.get()
# print r4c.msg_ids
should_fail(r4c.get)
# print r5.msg_ids
r5.get()
# print r5b.msg_ids
should_fail(r5b.get)
# print r6.msg_ids
should_fail(r6.get) # assuming > 1 engine
# print r6b.msg_ids
should_fail(r6b.get)
print 'done'
@@ -0,0 +1,36 b''
from IPython.zmq.parallel.client import *

client = Client()

@remote(client, block=True)
def square(a):
    """return square of a number"""
    return a*a

squares = map(square, range(42))

# but that blocked waiting for each result in turn, so the calls ran
# serially; not exactly useful

square.block = False

arlist = map(square, range(42))
# submitted very fast

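# with block=False each call returns an async result object immediately;
# calling .get() waits for and returns that result: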
# wait for the results:
squares2 = [ r.get() for r in arlist ]

# now the more convenient @parallel decorator, which has a map method:

@parallel(client, block=False)
def psquare(a):
    """return square of a number"""
    return a*a

# this chunks the data into n-engines jobs, not 42 jobs:
ar = psquare.map(range(42))

# wait for the results to be done:
squares3 = ar.get()

print squares == squares2, squares3==squares
# True
@@ -1,106 +1,106 b''
1 """Example for generating an arbitrary DAG as a dependency map.
1 """Example for generating an arbitrary DAG as a dependency map.
2
2
3 This demo uses networkx to generate the graph.
3 This demo uses networkx to generate the graph.
4
4
5 Authors
5 Authors
6 -------
6 -------
7 * MinRK
7 * MinRK
8 """
8 """
9 import networkx as nx
9 import networkx as nx
10 from random import randint, random
10 from random import randint, random
11 from IPython.zmq.parallel import client as cmod
11 from IPython.zmq.parallel import client as cmod
12
12
13 def randomwait():
13 def randomwait():
14 import time
14 import time
15 from random import random
15 from random import random
16 time.sleep(random())
16 time.sleep(random())
17 return time.time()
17 return time.time()
18
18
19
19
20 def random_dag(nodes, edges):
20 def random_dag(nodes, edges):
21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
22 G = nx.DiGraph()
22 G = nx.DiGraph()
23 for i in range(nodes):
23 for i in range(nodes):
24 G.add_node(i)
24 G.add_node(i)
25 while edges > 0:
25 while edges > 0:
26 a = randint(0,nodes-1)
26 a = randint(0,nodes-1)
27 b=a
27 b=a
28 while b==a:
28 while b==a:
29 b = randint(0,nodes-1)
29 b = randint(0,nodes-1)
30 G.add_edge(a,b)
30 G.add_edge(a,b)
31 if nx.is_directed_acyclic_graph(G):
31 if nx.is_directed_acyclic_graph(G):
32 edges -= 1
32 edges -= 1
33 else:
33 else:
34 # we closed a loop!
34 # we closed a loop!
35 G.remove_edge(a,b)
35 G.remove_edge(a,b)
36 return G
36 return G
37
37
38 def add_children(G, parent, level, n=2):
38 def add_children(G, parent, level, n=2):
39 """Add children recursively to a binary tree."""
39 """Add children recursively to a binary tree."""
40 if level == 0:
40 if level == 0:
41 return
41 return
42 for i in range(n):
42 for i in range(n):
43 child = parent+str(i)
43 child = parent+str(i)
44 G.add_node(child)
44 G.add_node(child)
45 G.add_edge(parent,child)
45 G.add_edge(parent,child)
46 add_children(G, child, level-1, n)
46 add_children(G, child, level-1, n)
47
47
48 def make_bintree(levels):
48 def make_bintree(levels):
49 """Make a symmetrical binary tree with @levels"""
49 """Make a symmetrical binary tree with @levels"""
50 G = nx.DiGraph()
50 G = nx.DiGraph()
51 root = '0'
51 root = '0'
52 G.add_node(root)
52 G.add_node(root)
53 add_children(G, root, levels, 2)
53 add_children(G, root, levels, 2)
54 return G
54 return G
55
55
56 def submit_jobs(client, G, jobs):
56 def submit_jobs(client, G, jobs):
57 """Submit jobs via client where G describes the time dependencies."""
57 """Submit jobs via client where G describes the time dependencies."""
58 results = {}
58 results = {}
59 for node in nx.topological_sort(G):
59 for node in nx.topological_sort(G):
60 deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
60 deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
61 results[node] = client.apply(jobs[node], after=deps)
61 results[node] = client.apply(jobs[node], after=deps)
62 return results
62 return results
63
63
64 def validate_tree(G, times):
64 def validate_tree(G, results):
65 """Validate that jobs executed after their dependencies."""
65 """Validate that jobs executed after their dependencies."""
66 for node in G:
66 for node in G:
67 t = times[node]
67 started = results[node].metadata.started
68 for parent in G.predecessors(node):
68 for parent in G.predecessors(node):
69 pt = times[parent]
69 finished = results[parent].metadata.completed
70 assert t > pt, "%s should have happened after %s"%(node, parent)
70 assert started > finished, "%s should have happened after %s"%(node, parent)
71
71
72 def main(nodes, edges):
72 def main(nodes, edges):
73 """Generate a random graph, submit jobs, then validate that the
73 """Generate a random graph, submit jobs, then validate that the
74 dependency order was enforced.
74 dependency order was enforced.
75 Finally, plot the graph, with time on the x-axis, and
75 Finally, plot the graph, with time on the x-axis, and
76 in-degree on the y (just for spread). All arrows must
76 in-degree on the y (just for spread). All arrows must
77 point at least slightly to the right if the graph is valid.
77 point at least slightly to the right if the graph is valid.
78 """
78 """
79 from matplotlib.dates import date2num
79 print "building DAG"
80 print "building DAG"
80 G = random_dag(nodes, edges)
81 G = random_dag(nodes, edges)
81 jobs = {}
82 jobs = {}
82 msg_ids = {}
83 times = {}
84 pos = {}
83 pos = {}
85 for node in G:
84 for node in G:
86 jobs[node] = randomwait
85 jobs[node] = randomwait
87
86
88 client = cmod.Client('tcp://127.0.0.1:10101')
87 client = cmod.Client()
89 print "submitting tasks"
88 print "submitting tasks"
90 results = submit_jobs(client, G, jobs)
89 results = submit_jobs(client, G, jobs)
91 print "waiting for results"
90 print "waiting for results"
92 client.barrier()
91 client.barrier()
93 print "done"
92 print "done"
94 for node in G:
93 for node in G:
95 times[node] = results[node].get()
94 # times[node] = results[node].get()
96 pos[node] = (times[node], G.in_degree(node)+random())
95 t = date2num(results[node].metadata.started)
96 pos[node] = (t, G.in_degree(node)+random())
97
97
98 validate_tree(G, times)
98 validate_tree(G, results)
99 nx.draw(G, pos)
99 nx.draw(G, pos)
100 return G,times,msg_ids
100 return G,results
101
101
102 if __name__ == '__main__':
102 if __name__ == '__main__':
103 import pylab
103 import pylab
104 main(32,128)
104 main(32,128)
105 pylab.show()
105 pylab.show()
106 No newline at end of file
106
NO CONTENT: file renamed from examples/demo/noncopying.py to docs/examples/newparallel/demo/noncopying.py
@@ -1,22 +1,22 b''
from IPython.zmq.parallel.client import *

-client = Client('tcp://127.0.0.1:10101')
+client = Client()

@remote(client, bound=True)
def getkey(name):
    """fetch something from globals"""
    return globals().get(name)

@remote(client, bound=True, targets='all')
def setpids():
    import os
    globals()['pid'] = os.getpid()

# set pid in the globals
setpids()
getkey('pid')
getkey.targets=[1,2]
getkey('pid')
getkey.bound=False
getkey('pid') is None

@@ -1,89 +1,86 b''
import time
import numpy as np
from IPython.zmq.parallel import client as clientmod

nlist = map(int, np.logspace(2,9,16,base=2))
nlist2 = map(int, np.logspace(2,8,15,base=2))
tlist = map(int, np.logspace(7,22,16,base=2))
nt = 16
def wait(t=0):
    import time
    time.sleep(t)

def echo(s=''):
    return s

def time_throughput(nmessages, t=0, f=wait):
-    client = clientmod.Client('tcp://127.0.0.1:10101')
+    client = clientmod.Client()
    view = client[None]
    # do one ping before starting timing
    if f is echo:
        t = np.random.random(t/8)
    view.apply_sync(echo, '')
    client.spin()
    tic = time.time()
    for i in xrange(nmessages):
        view.apply(f, t)
    lap = time.time()
    client.barrier()
    toc = time.time()
    return lap-tic, toc-tic

def time_twisted(nmessages, t=0, f=wait):
    from IPython.kernel import client as kc
    client = kc.TaskClient()
    if f is wait:
        s = "import time; time.sleep(%f)"%t
        task = kc.StringTask(s)
    elif f is echo:
        t = np.random.random(t/8)
        s = "s=t"
        task = kc.StringTask(s, push=dict(t=t), pull=['s'])
    else:
        raise
    # do one ping before starting timing
    client.barrier(client.run(task))
    tic = time.time()
    tids = []
    for i in xrange(nmessages):
        tids.append(client.run(task))
    lap = time.time()
    client.barrier(tids)
    toc = time.time()
    return lap-tic, toc-tic

def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput):
    A = np.zeros((len(nlist),2))
    for i,n in enumerate(nlist):
        t1 = t2 = 0
        for _ in range(trials):
            time.sleep(.25)
            ts = runner(n,t,f)
            t1 += ts[0]
            t2 += ts[1]
        t1 /= trials
        t2 /= trials
        A[i] = (t1,t2)
        A[i] = n/A[i]
        print n,A[i]
    return A

def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
    A = np.zeros((len(tlist),2))
    for i,t in enumerate(tlist):
        t1 = t2 = 0
        for _ in range(trials):
            time.sleep(.25)
            ts = runner(n,t,f)
            t1 += ts[0]
            t2 += ts[1]
        t1 /= trials
        t2 /= trials
        A[i] = (t1,t2)
        A[i] = n/A[i]
        print t,A[i]
    return A

-def start_cluster(n, scheduler):
-    pass
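The benchmark module above only defines helpers and has no driver of its own (the old start_cluster stub was removed). The following is a minimal sketch of how one might call it, assuming an ipcontroller and several engines are already running and that pylab/matplotlib is installed; none of these lines are part of the committed file:

    import pylab
    # measure submission and completion throughput for each message count in nlist
    A = do_runs(nlist, t=0, f=wait, trials=2, runner=time_throughput)
    # do_runs returns n/time, so column 0 is the submission rate and
    # column 1 the completion rate, in messages per second
    pylab.loglog(nlist, A[:,0], label='submitted msgs/s')
    pylab.loglog(nlist, A[:,1], label='completed msgs/s')
    pylab.xlabel('number of messages')
    pylab.ylabel('throughput (msgs/s)')
    pylab.legend(loc='best')
    pylab.show()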
@@ -1,15 +1,15 b''
from IPython.zmq.parallel.client import *

-client = Client('tcp://127.0.0.1:10101')
+client = Client()

for id in client.ids:
    client.push(dict(ids=id*id), targets=id)

rns = client[0]
rns['a'] = 5

print rns['a']

remotes = client[:]

print remotes['ids']
NO CONTENT: file renamed from examples/workflow/client.py to docs/examples/newparallel/workflow/client.py
@@ -1,21 +1,25 b''
#!/usr/bin/env python
"""Python wrapper around a submitted workflow job.

In reality this would be a more sophisticated script, here we only illustrate
the basic idea by considering that a submitted 'job' is a Python string to be
executed.
"""

import sys

argv = sys.argv

-from IPython.zmq.parallel.engine import main
+from IPython.zmq.parallel.engine import EngineFactory
+from IPython.zmq.parallel.ipengineapp import launch_new_instance

ns = {}

# job
exec sys.argv[1] in ns

+# this should really use Config:
+EngineFactory.user_ns = ns
+
# start engine with job namespace
-main([], user_ns=ns)
+launch_new_instance()
@@ -1,44 +1,44 b''
1 """Mock workflow manager.
1 """Mock workflow manager.
2
2
3 This is a mock work manager whose submitted 'jobs' simply consist of executing
3 This is a mock work manager whose submitted 'jobs' simply consist of executing
4 a python string. What we want is to see the implementation of the ipython
4 a python string. What we want is to see the implementation of the ipython
5 controller part.
5 controller part.
6 """
6 """
7
7
8 from __future__ import print_function
8 from __future__ import print_function
9
9
10 import atexit
10 import atexit
11 import sys
11 import sys
12
12
13 from subprocess import Popen
13 from subprocess import Popen
14
14
15 def cleanup(controller, engines):
15 def cleanup(controller, engines):
16 """Cleanup routine to shut down all subprocesses we opened."""
16 """Cleanup routine to shut down all subprocesses we opened."""
17 import signal, time
17 import signal, time
18
18
19 print('Starting cleanup')
19 print('Starting cleanup')
20 print('Stopping engines...')
20 print('Stopping engines...')
21 for e in engines:
21 for e in engines:
22 e.send_signal(signal.SIGINT)
22 e.send_signal(signal.SIGINT)
23 print('Stopping controller...')
23 print('Stopping controller...')
24 # so it can shut down its queues
24 # so it can shut down its queues
25 controller.send_signal(signal.SIGINT)
25 controller.send_signal(signal.SIGINT)
26 time.sleep(0.1)
26 time.sleep(0.1)
27 print('Killing controller...')
27 print('Killing controller...')
28 controller.kill()
28 controller.kill()
29 print('Cleanup done')
29 print('Cleanup done')
30
30
31
31
32 if __name__ == '__main__':
32 if __name__ == '__main__':
33
33
34 # Start controller in separate process
34 # Start controller in separate process
35 cont = Popen(['python', '-m', 'IPython.zmq.parallel.controller'])
35 cont = Popen(['python', '-m', 'IPython.zmq.parallel.ipcontrollerapp'])
36 print('Started controller')
36 print('Started controller')
37
37
38 # "Submit jobs"
38 # "Submit jobs"
39 eng = []
39 eng = []
40 for i in range(4):
40 for i in range(4):
41 eng.append(Popen(['python', 'job_wrapper.py','x=%s' % i]))
41 eng.append(Popen(['python', 'job_wrapper.py','x=%s' % i]))
42
42
43 # Ensure that all subpro
43 # Ensure that all subpro
44 atexit.register(lambda : cleanup(cont, eng))
44 atexit.register(lambda : cleanup(cont, eng))
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed
NO CONTENT: file was removed