Fixed most of the examples. A few still don't work, but this is a start.
Brian E Granger
@@ -1,23 +1,23 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 # A super simple example showing how to use all of this in a fully
4 # A super simple example showing how to use all of this in a fully
5 # asynchronous manner. The TaskClient also works in this mode.
5 # asynchronous manner. The TaskClient also works in this mode.
6
6
7 from twisted.internet import reactor, defer
7 from twisted.internet import reactor, defer
8 from ipython1.kernel import asyncclient
8 from IPython.kernel import asyncclient
9
9
10 def printer(r):
10 def printer(r):
11 print r
11 print r
12 return r
12 return r
13
13
14 def submit(client):
14 def submit(client):
15 d = client.push(dict(a=5, b='asdf', c=[1,2,3]),targets=0,block=True)
15 d = client.push(dict(a=5, b='asdf', c=[1,2,3]),targets=0,block=True)
16 d.addCallback(lambda _: client.pull(('a','b','c'),targets=0,block=True))
16 d.addCallback(lambda _: client.pull(('a','b','c'),targets=0,block=True))
17 d.addBoth(printer)
17 d.addBoth(printer)
18 d.addCallback(lambda _: reactor.stop())
18 d.addCallback(lambda _: reactor.stop())
19
19
20 d = asyncclient.get_multiengine_client()
20 d = asyncclient.get_multiengine_client()
21 d.addCallback(submit)
21 d.addCallback(submit)
22
22
23 reactor.run() No newline at end of file
23 reactor.run()
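The comment above notes that the TaskClient also works in this fully asynchronous mode, but only the multiengine client is exercised here. A minimal sketch of the equivalent task-based flow, pieced together from the async task example that follows (the ``AsyncTaskClient`` constructor, port 10113 and the deferred chain are taken from that example, which is itself marked as currently broken, so treat this as the intended shape rather than a verified recipe)::

    from twisted.internet import reactor
    from IPython.kernel import asyncclient

    def show(r):
        # Simple callback that prints whatever fired the deferred.
        print r
        return r

    tc = asyncclient.AsyncTaskClient(('localhost', 10113))
    t = asyncclient.Task("a = 2 + 2", pull=['a'])

    d = tc.run(t)                                           # fires with the task id
    d.addCallback(lambda tid: tc.get_task_result(tid, block=True))
    d.addCallback(show)                                     # prints the TaskResult
    d.addCallback(lambda _: reactor.stop())
    reactor.run()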
@@ -1,31 +1,32 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 # This example shows how the AsynTaskClient can be used
4 # This example shows how the AsynTaskClient can be used
5 # This example is currently broken
5
6
6 from twisted.internet import reactor, defer
7 from twisted.internet import reactor, defer
7 from ipython1.kernel import asynclient
8 from IPython.kernel import asyncclient
8
9
9 mec = asynclient.AsynMultiEngineClient(('localhost', 10105))
10 mec = asyncclient.AsyncMultiEngineClient(('localhost', 10105))
10 tc = asynclient.AsynTaskClient(('localhost',10113))
11 tc = asyncclient.AsyncTaskClient(('localhost',10113))
11
12
12 cmd1 = """\
13 cmd1 = """\
13 a = 5
14 a = 5
14 b = 10*d
15 b = 10*d
15 c = a*b*d
16 c = a*b*d
16 """
17 """
17
18
18 t1 = asynclient.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
19 t1 = asyncclient.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
19
20
20 d = mec.push(dict(d=30))
21 d = mec.push(dict(d=30))
21
22
22 def raise_and_print(tr):
23 def raise_and_print(tr):
23 tr.raiseException()
24 tr.raiseException()
24 print "a, b: ", tr.ns.a, tr.ns.b
25 print "a, b: ", tr.ns.a, tr.ns.b
25 return tr
26 return tr
26
27
27 d.addCallback(lambda _: tc.run(t1))
28 d.addCallback(lambda _: tc.run(t1))
28 d.addCallback(lambda tid: tc.get_task_result(tid,block=True))
29 d.addCallback(lambda tid: tc.get_task_result(tid,block=True))
29 d.addCallback(raise_and_print)
30 d.addCallback(raise_and_print)
30 d.addCallback(lambda _: reactor.stop())
31 d.addCallback(lambda _: reactor.stop())
31 reactor.run()
32 reactor.run()
@@ -1,90 +1,90 b''
1 """
1 """
2 An exceptionally lousy site spider
2 An exceptionally lousy site spider
3 Ken Kinder <ken@kenkinder.com>
3 Ken Kinder <ken@kenkinder.com>
4
4
5 This module gives an example of how the TaskClient interface to the
5 This module gives an example of how the TaskClient interface to the
6 IPython controller works. Before running this script, start the IPython controller
6 IPython controller works. Before running this script, start the IPython controller
7 and some engines using something like::
7 and some engines using something like::
8
8
9 ipcluster -n 4
9 ipcluster -n 4
10 """
10 """
11 from twisted.python.failure import Failure
11 from twisted.python.failure import Failure
12 from ipython1.kernel import client
12 from IPython.kernel import client
13 import time
13 import time
14
14
15 fetchParse = """
15 fetchParse = """
16 from twisted.web import microdom
16 from twisted.web import microdom
17 import urllib2
17 import urllib2
18 import urlparse
18 import urlparse
19
19
20 def fetchAndParse(url, data=None):
20 def fetchAndParse(url, data=None):
21 links = []
21 links = []
22 try:
22 try:
23 page = urllib2.urlopen(url, data=data)
23 page = urllib2.urlopen(url, data=data)
24 except Exception:
24 except Exception:
25 return links
25 return links
26 else:
26 else:
27 if page.headers.type == 'text/html':
27 if page.headers.type == 'text/html':
28 doc = microdom.parseString(page.read(), beExtremelyLenient=True)
28 doc = microdom.parseString(page.read(), beExtremelyLenient=True)
29 for node in doc.getElementsByTagName('a'):
29 for node in doc.getElementsByTagName('a'):
30 if node.getAttribute('href'):
30 if node.getAttribute('href'):
31 links.append(urlparse.urljoin(url, node.getAttribute('href')))
31 links.append(urlparse.urljoin(url, node.getAttribute('href')))
32 return links
32 return links
33 """
33 """
34
34
35 class DistributedSpider(object):
35 class DistributedSpider(object):
36
36
37 # Time to wait between polling for task results.
37 # Time to wait between polling for task results.
38 pollingDelay = 0.5
38 pollingDelay = 0.5
39
39
40 def __init__(self, site):
40 def __init__(self, site):
41 self.tc = client.TaskClient()
41 self.tc = client.TaskClient()
42 self.rc = client.MultiEngineClient()
42 self.rc = client.MultiEngineClient()
43 self.rc.execute(fetchParse)
43 self.rc.execute(fetchParse)
44
44
45 self.allLinks = []
45 self.allLinks = []
46 self.linksWorking = {}
46 self.linksWorking = {}
47 self.linksDone = {}
47 self.linksDone = {}
48
48
49 self.site = site
49 self.site = site
50
50
51 def visitLink(self, url):
51 def visitLink(self, url):
52 if url not in self.allLinks:
52 if url not in self.allLinks:
53 self.allLinks.append(url)
53 self.allLinks.append(url)
54 if url.startswith(self.site):
54 if url.startswith(self.site):
55 print ' ', url
55 print ' ', url
56 self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', pull=['links'], push={'url': url}))
56 self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', pull=['links'], push={'url': url}))
57
57
58 def onVisitDone(self, result, url):
58 def onVisitDone(self, result, url):
59 print url, ':'
59 print url, ':'
60 self.linksDone[url] = None
60 self.linksDone[url] = None
61 del self.linksWorking[url]
61 del self.linksWorking[url]
62 if isinstance(result.failure, Failure):
62 if isinstance(result.failure, Failure):
63 txt = result.failure.getTraceback()
63 txt = result.failure.getTraceback()
64 for line in txt.split('\n'):
64 for line in txt.split('\n'):
65 print ' ', line
65 print ' ', line
66 else:
66 else:
67 for link in result.ns.links:
67 for link in result.ns.links:
68 self.visitLink(link)
68 self.visitLink(link)
69
69
70 def run(self):
70 def run(self):
71 self.visitLink(self.site)
71 self.visitLink(self.site)
72 while self.linksWorking:
72 while self.linksWorking:
73 print len(self.linksWorking), 'pending...'
73 print len(self.linksWorking), 'pending...'
74 self.synchronize()
74 self.synchronize()
75 time.sleep(self.pollingDelay)
75 time.sleep(self.pollingDelay)
76
76
77 def synchronize(self):
77 def synchronize(self):
78 for url, taskId in self.linksWorking.items():
78 for url, taskId in self.linksWorking.items():
79 # Calling get_task_result with block=False will return None if the
79 # Calling get_task_result with block=False will return None if the
80 # task is not done yet. This provides a simple way of polling.
80 # task is not done yet. This provides a simple way of polling.
81 result = self.tc.get_task_result(taskId, block=False)
81 result = self.tc.get_task_result(taskId, block=False)
82 if result is not None:
82 if result is not None:
83 self.onVisitDone(result, url)
83 self.onVisitDone(result, url)
84
84
85 def main():
85 def main():
86 distributedSpider = DistributedSpider(raw_input('Enter site to crawl: '))
86 distributedSpider = DistributedSpider(raw_input('Enter site to crawl: '))
87 distributedSpider.run()
87 distributedSpider.run()
88
88
89 if __name__ == '__main__':
89 if __name__ == '__main__':
90 main()
90 main()
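The polling loop in ``synchronize`` is what keeps the spider submitting new work as results trickle in. When nothing needs to happen between submission and collection, the same wait can be written with ``barrier``, as the options-pricer example elsewhere in this changeset does. A small sketch using the same ``client`` API (``urls`` is just a placeholder list, and ``fetchAndParse`` is assumed to have been pushed to the engines exactly as ``DistributedSpider.__init__`` does above)::

    from IPython.kernel import client

    tc = client.TaskClient()
    # Submit a batch of fetch tasks, block until all complete, then collect.
    taskids = [tc.run(client.Task('links = fetchAndParse(url)',
                                  pull=['links'], push={'url': u}))
               for u in urls]
    tc.barrier(taskids)              # block until every task has finished
    results = [tc.get_task_result(tid) for tid in taskids]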
@@ -1,14 +1,14 b''
1 """
1 """
2 A Distributed Hello world
2 A Distributed Hello world
3 Ken Kinder <ken@kenkinder.com>
3 Ken Kinder <ken@kenkinder.com>
4 """
4 """
5 from ipython1.kernel import client
5 from IPython.kernel import client
6
6
7 tc = client.TaskClient()
7 tc = client.TaskClient()
8 mec = client.MultiEngineClient()
8 mec = client.MultiEngineClient()
9
9
10 mec.execute('import time')
10 mec.execute('import time')
11 hello_taskid = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', pull=('word')))
11 hello_taskid = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', pull=('word')))
12 world_taskid = tc.run(client.Task('time.sleep(3) ; word = "World!"', pull=('word')))
12 world_taskid = tc.run(client.Task('time.sleep(3) ; word = "World!"', pull=('word')))
13 print "Submitted tasks:", hello_taskid, world_taskid
13 print "Submitted tasks:", hello_taskid, world_taskid
14 print tc.get_task_result(hello_taskid,block=True).ns.word, tc.get_task_result(world_taskid,block=True).ns.word
14 print tc.get_task_result(hello_taskid,block=True).ns.word, tc.get_task_result(world_taskid,block=True).ns.word
@@ -1,70 +1,70 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """Run a Monte-Carlo options pricer in parallel."""
3 """Run a Monte-Carlo options pricer in parallel."""
4
4
5 from ipython1.kernel import client
5 from IPython.kernel import client
6 import numpy as N
6 import numpy as N
7 from mcpricer import MCOptionPricer
7 from mcpricer import MCOptionPricer
8
8
9
9
10 tc = client.TaskClient()
10 tc = client.TaskClient()
11 rc = client.MultiEngineClient()
11 rc = client.MultiEngineClient()
12
12
13 # Initialize the common code on the engines
13 # Initialize the common code on the engines
14 rc.run('mcpricer.py')
14 rc.run('mcpricer.py')
15
15
16 # Push the variables that won't change (stock price, interest rate, days and MC paths)
16 # Push the variables that won't change (stock price, interest rate, days and MC paths)
17 rc.push(dict(S=100.0, r=0.05, days=260, paths=10000))
17 rc.push(dict(S=100.0, r=0.05, days=260, paths=10000))
18
18
19 task_string = """\
19 task_string = """\
20 op = MCOptionPricer(S,K,sigma,r,days,paths)
20 op = MCOptionPricer(S,K,sigma,r,days,paths)
21 op.run()
21 op.run()
22 vp, ap, vc, ac = op.vanilla_put, op.asian_put, op.vanilla_call, op.asian_call
22 vp, ap, vc, ac = op.vanilla_put, op.asian_put, op.vanilla_call, op.asian_call
23 """
23 """
24
24
25 # Create arrays of strike prices and volatilities
25 # Create arrays of strike prices and volatilities
26 K_vals = N.arange(90.0,110.0,2.0)
26 K_vals = N.arange(90.0,110.0,2.0)
27 sigma_vals = N.arange(0.02, 0.3, 0.02)
27 sigma_vals = N.arange(0.02, 0.3, 0.02)
28
28
29 # Submit tasks
29 # Submit tasks
30 taskids = []
30 taskids = []
31 for K in K_vals:
31 for K in K_vals:
32 for sigma in sigma_vals:
32 for sigma in sigma_vals:
33 t = client.Task(task_string,
33 t = client.Task(task_string,
34 push=dict(sigma=sigma,K=K),
34 push=dict(sigma=sigma,K=K),
35 pull=('vp','ap','vc','ac','sigma','K'))
35 pull=('vp','ap','vc','ac','sigma','K'))
36 taskids.append(tc.run(t))
36 taskids.append(tc.run(t))
37
37
38 print "Submitted tasks: ", taskids
38 print "Submitted tasks: ", taskids
39
39
40 # Block until tasks are completed
40 # Block until tasks are completed
41 tc.barrier(taskids)
41 tc.barrier(taskids)
42
42
43 # Get the results
43 # Get the results
44 results = [tc.get_task_result(tid) for tid in taskids]
44 results = [tc.get_task_result(tid) for tid in taskids]
45
45
46 # Assemble the result
46 # Assemble the result
47 vc = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
47 vc = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
48 vp = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
48 vp = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
49 ac = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
49 ac = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
50 ap = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
50 ap = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
51 for i, tr in enumerate(results):
51 for i, tr in enumerate(results):
52 ns = tr.ns
52 ns = tr.ns
53 vc[i] = ns.vc
53 vc[i] = ns.vc
54 vp[i] = ns.vp
54 vp[i] = ns.vp
55 ac[i] = ns.ac
55 ac[i] = ns.ac
56 ap[i] = ns.ap
56 ap[i] = ns.ap
57 vc.shape = (K_vals.shape[0],sigma_vals.shape[0])
57 vc.shape = (K_vals.shape[0],sigma_vals.shape[0])
58 vp.shape = (K_vals.shape[0],sigma_vals.shape[0])
58 vp.shape = (K_vals.shape[0],sigma_vals.shape[0])
59 ac.shape = (K_vals.shape[0],sigma_vals.shape[0])
59 ac.shape = (K_vals.shape[0],sigma_vals.shape[0])
60 ap.shape = (K_vals.shape[0],sigma_vals.shape[0])
60 ap.shape = (K_vals.shape[0],sigma_vals.shape[0])
61
61
62
62
63 def plot_options(K_vals, sigma_vals, prices):
63 def plot_options(K_vals, sigma_vals, prices):
64 """Make a contour plot of the option prices."""
64 """Make a contour plot of the option prices."""
65 import pylab
65 import pylab
66 pylab.contourf(sigma_vals, K_vals, prices)
66 pylab.contourf(sigma_vals, K_vals, prices)
67 pylab.colorbar()
67 pylab.colorbar()
68 pylab.title("Option Price")
68 pylab.title("Option Price")
69 pylab.xlabel("Volatility")
69 pylab.xlabel("Volatility")
70 pylab.ylabel("Strike Price")
70 pylab.ylabel("Strike Price")
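The script assembles the ``vp``/``ap``/``vc``/``ac`` surfaces and defines ``plot_options`` but never calls it. A minimal follow-up, assuming a working matplotlib install, might look like::

    # Illustrative only: contour-plot the assembled vanilla-call surface
    # using the helper defined above.
    import pylab
    plot_options(K_vals, sigma_vals, vc)
    pylab.show()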
@@ -1,227 +1,227 b''
1 #-------------------------------------------------------------------------------
1 #-------------------------------------------------------------------------------
2 # Imports
2 # Imports
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4
4
5 import time
5 import time
6 import numpy
6 import numpy
7
7
8 import ipython1.kernel.magic
8 import IPython.kernel.magic
9 from ipython1.kernel import client
9 from IPython.kernel import client
10 from ipython1.kernel.error import *
10 from IPython.kernel.error import *
11
11
12 mec = client.MultiEngineClient()
12 mec = client.MultiEngineClient()
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Setup
15 # Setup
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 mec.reset()
18 mec.reset()
19 mec.activate()
19 mec.activate()
20 mec.block = True
20 mec.block = True
21 mec.get_ids()
21 mec.get_ids()
22 n = len(mec)
22 n = len(mec)
23 assert n >= 4, "Not Enough Engines: %i, 4 needed for this script"%n
23 assert n >= 4, "Not Enough Engines: %i, 4 needed for this script"%n
24
24
25 values = [
25 values = [
26 10,
26 10,
27 1.0,
27 1.0,
28 range(100),
28 range(100),
29 ('asdf', 1000),
29 ('asdf', 1000),
30 {'a': 10, 'b': 20}
30 {'a': 10, 'b': 20}
31 ]
31 ]
32
32
33 keys = ['a','b','c','d','e']
33 keys = ['a','b','c','d','e']
34
34
35 sequences = [
35 sequences = [
36 range(100),
36 range(100),
37 numpy.arange(100)
37 numpy.arange(100)
38 ]
38 ]
39
39
40 #-------------------------------------------------------------------------------
40 #-------------------------------------------------------------------------------
41 # Blocking execution
41 # Blocking execution
42 #-------------------------------------------------------------------------------
42 #-------------------------------------------------------------------------------
43
43
44 # Execute
44 # Execute
45
45
46 mec.execute('import math')
46 mec.execute('import math')
47 mec.execute('a = 2.0*math.pi')
47 mec.execute('a = 2.0*math.pi')
48 mec.execute('print a')
48 mec.execute('print a')
49
49
50 for id in mec.get_ids():
50 for id in mec.get_ids():
51 mec.execute('b=%d' % id, targets=id)
51 mec.execute('b=%d' % id, targets=id)
52
52
53
53
54 mec.execute('print b')
54 mec.execute('print b')
55
55
56 try:
56 try:
57 mec.execute('b = 10',targets=-1)
57 mec.execute('b = 10',targets=-1)
58 except InvalidEngineID:
58 except InvalidEngineID:
59 print "Caught invalid engine ID OK."
59 print "Caught invalid engine ID OK."
60
60
61 try:
61 try:
62 mec.execute('a=5; 1/0')
62 mec.execute('a=5; 1/0')
63 except CompositeError:
63 except CompositeError:
64 print "Caught 1/0 correctly."
64 print "Caught 1/0 correctly."
65
65
66
66
67
67
68 %px print a, b
68 %px print a, b
69 try:
69 try:
70 %px 1/0
70 %px 1/0
71 except CompositeError:
71 except CompositeError:
72 print "Caught 1/0 correctly."
72 print "Caught 1/0 correctly."
73
73
74
74
75 %autopx
75 %autopx
76
76
77 import numpy
77 import numpy
78 a = numpy.random.rand(4,4)
78 a = numpy.random.rand(4,4)
79 a = a+a.transpose()
79 a = a+a.transpose()
80 print numpy.linalg.eigvals(a)
80 print numpy.linalg.eigvals(a)
81
81
82 %autopx
82 %autopx
83
83
84
84
85 mec.targets = [0,2]
85 mec.targets = [0,2]
86 %px a = 5
86 %px a = 5
87 mec.targets = [1,3]
87 mec.targets = [1,3]
88 %px a = 10
88 %px a = 10
89 mec.targets = 'all'
89 mec.targets = 'all'
90 %px print a
90 %px print a
91
91
92
92
93 # Push/Pull
93 # Push/Pull
94
94
95 mec.push(dict(a=10, b=30, c={'f':range(10)}))
95 mec.push(dict(a=10, b=30, c={'f':range(10)}))
96 mec.pull(('a', 'b'))
96 mec.pull(('a', 'b'))
97 mec.zip_pull(('a', 'b'))
97 mec.zip_pull(('a', 'b'))
98
98
99 for id in mec.get_ids():
99 for id in mec.get_ids():
100 mec.push(dict(a=id), targets=id)
100 mec.push(dict(a=id), targets=id)
101
101
102
102
103 for id in mec.get_ids():
103 for id in mec.get_ids():
104 mec.pull('a', targets=id)
104 mec.pull('a', targets=id)
105
105
106
106
107 mec.pull('a')
107 mec.pull('a')
108
108
109
109
110 mec['a'] = 100
110 mec['a'] = 100
111 mec['a']
111 mec['a']
112
112
113 # get_result/reset/keys
113 # get_result/reset/keys
114
114
115 mec.get_result()
115 mec.get_result()
116 %result
116 %result
117 mec.keys()
117 mec.keys()
118 mec.reset()
118 mec.reset()
119 mec.keys()
119 mec.keys()
120
120
121 try:
121 try:
122 %result
122 %result
123 except CompositeError:
123 except CompositeError:
124 print "Caught IndexError ok."
124 print "Caught IndexError ok."
125
125
126
126
127 %px a = 5
127 %px a = 5
128 mec.get_result(1)
128 mec.get_result(1)
129 mec.keys()
129 mec.keys()
130
130
131 # Queue management methods
131 # Queue management methods
132
132
133 %px import time
133 %px import time
134 prs = [mec.execute('time.sleep(2.0)',block=False) for x in range(5)]
134 prs = [mec.execute('time.sleep(2.0)',block=False) for x in range(5)]
135
135
136
136
137 mec.queue_status()
137 mec.queue_status()
138 time.sleep(3.0)
138 time.sleep(3.0)
139 mec.clear_queue()
139 mec.clear_queue()
140 mec.queue_status()
140 mec.queue_status()
141 time.sleep(2.0)
141 time.sleep(2.0)
142 mec.queue_status()
142 mec.queue_status()
143
143
144 mec.barrier(prs)
144 mec.barrier(prs)
145
145
146 for pr in prs:
146 for pr in prs:
147 try:
147 try:
148 pr.r
148 pr.r
149 except CompositeError:
149 except CompositeError:
150 print "Caught QueueCleared OK."
150 print "Caught QueueCleared OK."
151
151
152
152
153 # scatter/gather
153 # scatter/gather
154
154
155 mec.scatter('a', range(10))
155 mec.scatter('a', range(10))
156 mec.gather('a')
156 mec.gather('a')
157 mec.scatter('b', numpy.arange(10))
157 mec.scatter('b', numpy.arange(10))
158 mec.gather('b')
158 mec.gather('b')
159
159
160 #-------------------------------------------------------------------------------
160 #-------------------------------------------------------------------------------
161 # Non-Blocking execution
161 # Non-Blocking execution
162 #-------------------------------------------------------------------------------
162 #-------------------------------------------------------------------------------
163
163
164 mec.block = False
164 mec.block = False
165
165
166 # execute
166 # execute
167
167
168 pr1 = mec.execute('a=5')
168 pr1 = mec.execute('a=5')
169 pr2 = mec.execute('import sets')
169 pr2 = mec.execute('import sets')
170
170
171 mec.barrier((pr1, pr2))
171 mec.barrier((pr1, pr2))
172
172
173 pr1 = mec.execute('1/0')
173 pr1 = mec.execute('1/0')
174 pr2 = mec.execute('c = sets.Set()')
174 pr2 = mec.execute('c = sets.Set()')
175
175
176 mec.barrier((pr1, pr2))
176 mec.barrier((pr1, pr2))
177 try:
177 try:
178 pr1.r
178 pr1.r
179 except CompositeError:
179 except CompositeError:
180 print "Caught ZeroDivisionError OK."
180 print "Caught ZeroDivisionError OK."
181
181
182 pr = mec.execute("print 'hi'")
182 pr = mec.execute("print 'hi'")
183 pr.r
183 pr.r
184
184
185 pr = mec.execute('1/0')
185 pr = mec.execute('1/0')
186 try:
186 try:
187 pr.r
187 pr.r
188 except CompositeError:
188 except CompositeError:
189 print "Caught ZeroDivisionError OK."
189 print "Caught ZeroDivisionError OK."
190
190
191 # Make sure we can reraise it!
191 # Make sure we can reraise it!
192 try:
192 try:
193 pr.r
193 pr.r
194 except CompositeError:
194 except CompositeError:
195 print "Caught ZeroDivisionError OK."
195 print "Caught ZeroDivisionError OK."
196
196
197 # push/pull
197 # push/pull
198
198
199 pr1 = mec.push(dict(a=10))
199 pr1 = mec.push(dict(a=10))
200 pr1.get_result()
200 pr1.get_result()
201 pr2 = mec.pull('a')
201 pr2 = mec.pull('a')
202 pr2.r
202 pr2.r
203
203
204 # flush
204 # flush
205
205
206 mec.flush()
206 mec.flush()
207 pd1 = mec.execute('a=30')
207 pd1 = mec.execute('a=30')
208 pd2 = mec.pull('a')
208 pd2 = mec.pull('a')
209 mec.flush()
209 mec.flush()
210
210
211 try:
211 try:
212 pd1.get_result()
212 pd1.get_result()
213 except InvalidDeferredID:
213 except InvalidDeferredID:
214 print "PendingResult object was cleared OK."
214 print "PendingResult object was cleared OK."
215
215
216
216
217 try:
217 try:
218 pd2.get_result()
218 pd2.get_result()
219 except InvalidDeferredID:
219 except InvalidDeferredID:
220 print "PendingResult object was cleared OK."
220 print "PendingResult object was cleared OK."
221
221
222
222
223
223
224 # This is a command to make sure the end of the file is happy.
224 # This is a command to make sure the end of the file is happy.
225
225
226 print "The tests are done!"
226 print "The tests are done!"
227
227
@@ -1,38 +1,38 b''
1 #-------------------------------------------------------------------------------
1 #-------------------------------------------------------------------------------
2 # Imports
2 # Imports
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4
4
5 import time
5 import time
6 import numpy
6 import numpy
7
7
8 import ipython1.kernel.magic
8 import IPython.kernel.magic
9 from ipython1.kernel import client
9 from IPython.kernel import client
10 from ipython1.kernel.error import *
10 from IPython.kernel.error import *
11
11
12 mec = client.MultiEngineClient()
12 mec = client.MultiEngineClient()
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Setup
15 # Setup
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 mec.reset()
18 mec.reset()
19 # print mec.keys()
19 # print mec.keys()
20 mec.activate()
20 mec.activate()
21 # mec.block=True
21 # mec.block=True
22 mec.get_ids()
22 mec.get_ids()
23 n = len(mec)
23 n = len(mec)
24 assert n >= 4, "Not Enough Engines: %i, 4 needed for this script"%n
24 assert n >= 4, "Not Enough Engines: %i, 4 needed for this script"%n
25
25
26 mec.block=False
26 mec.block=False
27
27
28 pr1 = mec.execute('import time')
28 pr1 = mec.execute('import time')
29 pr2 = mec.execute('time.sleep(5)')
29 pr2 = mec.execute('time.sleep(5)')
30 pr3 = mec.push(dict(a=10,b=30,c=range(20000),d='The dog went swimming.'))
30 pr3 = mec.push(dict(a=10,b=30,c=range(20000),d='The dog went swimming.'))
31 pr4 = mec.pull(('a','b','d'))
31 pr4 = mec.pull(('a','b','d'))
32
32
33 print "Try a non-blocking get_result"
33 print "Try a non-blocking get_result"
34 assert pr4.get_result(block=False, default='not done')=='not done'
34 assert pr4.get_result(block=False, default='not done')=='not done'
35
35
36 print "Now wait for all the results"
36 print "Now wait for all the results"
37 mec.barrier((pr1,pr2,pr3,pr4))
37 mec.barrier((pr1,pr2,pr3,pr4))
38 print "The last pull got:", pr4.r
38 print "The last pull got:", pr4.r
@@ -1,120 +1,120 b''
1 """Example showing how to merge multiple remote data streams.
1 """Example showing how to merge multiple remote data streams.
2 """
2 """
3 # Slightly modified version of:
3 # Slightly modified version of:
4 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
4 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
5
5
6 import heapq
6 import heapq
7 from ipython1.kernel.error import CompositeError
7 from IPython.kernel.error import CompositeError
8
8
9 def mergesort(list_of_lists, key=None):
9 def mergesort(list_of_lists, key=None):
10 """ Perform an N-way merge operation on sorted lists.
10 """ Perform an N-way merge operation on sorted lists.
11
11
12 @param list_of_lists: (really iterable of iterable) of sorted elements
12 @param list_of_lists: (really iterable of iterable) of sorted elements
13 (either naturally or by C{key})
13 (either naturally or by C{key})
14 @param key: specify sort key function (like C{sort()}, C{sorted()})
14 @param key: specify sort key function (like C{sort()}, C{sorted()})
15
15
16 Yields tuples of the form C{(item, iterator)}, where the iterator is the
16 Yields tuples of the form C{(item, iterator)}, where the iterator is the
17 built-in list iterator or something you pass in, if you pre-generate the
17 built-in list iterator or something you pass in, if you pre-generate the
18 iterators.
18 iterators.
19
19
20 This is a stable merge; complexity O(N lg N)
20 This is a stable merge; complexity O(N lg N)
21
21
22 Examples::
22 Examples::
23
23
24 >>> print list(mergesort([[1,2,3,4],
24 >>> print list(mergesort([[1,2,3,4],
25 ... [2,3.25,3.75,4.5,6,7],
25 ... [2,3.25,3.75,4.5,6,7],
26 ... [2.625,3.625,6.625,9]]))
26 ... [2.625,3.625,6.625,9]]))
27 [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
27 [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
28
28
29 # note stability
29 # note stability
30 >>> print list(mergesort([[1,2,3,4],
30 >>> print list(mergesort([[1,2,3,4],
31 ... [2,3.25,3.75,4.5,6,7],
31 ... [2,3.25,3.75,4.5,6,7],
32 ... [2.625,3.625,6.625,9]],
32 ... [2.625,3.625,6.625,9]],
33 ... key=int))
33 ... key=int))
34 [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
34 [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
35
35
36
36
37 >>> print list(mergesort([[4, 3, 2, 1],
37 >>> print list(mergesort([[4, 3, 2, 1],
38 ... [7, 6, 4.5, 3.75, 3.25, 2],
38 ... [7, 6, 4.5, 3.75, 3.25, 2],
39 ... [9, 6.625, 3.625, 2.625]],
39 ... [9, 6.625, 3.625, 2.625]],
40 ... key=lambda x: -x))
40 ... key=lambda x: -x))
41 [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
41 [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
42 """
42 """
43
43
44 heap = []
44 heap = []
45 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
45 for i, itr in enumerate(iter(pl) for pl in list_of_lists):
46 try:
46 try:
47 item = itr.next()
47 item = itr.next()
48 toadd = (key(item), i, item, itr) if key else (item, i, itr)
48 toadd = (key(item), i, item, itr) if key else (item, i, itr)
49 heap.append(toadd)
49 heap.append(toadd)
50 except StopIteration:
50 except StopIteration:
51 pass
51 pass
52 heapq.heapify(heap)
52 heapq.heapify(heap)
53
53
54 if key:
54 if key:
55 while heap:
55 while heap:
56 _, idx, item, itr = heap[0]
56 _, idx, item, itr = heap[0]
57 yield item
57 yield item
58 try:
58 try:
59 item = itr.next()
59 item = itr.next()
60 heapq.heapreplace(heap, (key(item), idx, item, itr) )
60 heapq.heapreplace(heap, (key(item), idx, item, itr) )
61 except StopIteration:
61 except StopIteration:
62 heapq.heappop(heap)
62 heapq.heappop(heap)
63
63
64 else:
64 else:
65 while heap:
65 while heap:
66 item, idx, itr = heap[0]
66 item, idx, itr = heap[0]
67 yield item
67 yield item
68 try:
68 try:
69 heapq.heapreplace(heap, (itr.next(), idx, itr))
69 heapq.heapreplace(heap, (itr.next(), idx, itr))
70 except StopIteration:
70 except StopIteration:
71 heapq.heappop(heap)
71 heapq.heappop(heap)
72
72
73
73
74 def remote_iterator(rc,engine,name):
74 def remote_iterator(rc,engine,name):
75 """Return an iterator on an object living on a remote engine.
75 """Return an iterator on an object living on a remote engine.
76 """
76 """
77 # Check that the object exists on the engine and pin a reference to it
77 # Check that the object exists on the engine and pin a reference to it
78 iter_name = '_%s_rmt_iter_' % name
78 iter_name = '_%s_rmt_iter_' % name
79 rc.execute('%s = iter(%s)' % (iter_name,name), targets=engine)
79 rc.execute('%s = iter(%s)' % (iter_name,name), targets=engine)
80 tpl = '_tmp = %s.next()' % iter_name
80 tpl = '_tmp = %s.next()' % iter_name
81 while True:
81 while True:
82 try:
82 try:
83 rc.execute(tpl, targets=engine)
83 rc.execute(tpl, targets=engine)
84 result = rc.pull('_tmp', targets=engine)[0]
84 result = rc.pull('_tmp', targets=engine)[0]
85 # This causes the StopIteration exception to be raised.
85 # This causes the StopIteration exception to be raised.
86 except CompositeError, e:
86 except CompositeError, e:
87 e.raise_exception()
87 e.raise_exception()
88 else:
88 else:
89 yield result
89 yield result
90
90
91 # Main, interactive testing
91 # Main, interactive testing
92 if __name__ == '__main__':
92 if __name__ == '__main__':
93
93
94 from ipython1.kernel import client
94 from IPython.kernel import client
95 ipc = client.MultiEngineClient()
95 ipc = client.MultiEngineClient()
96 print 'Engine IDs:',ipc.get_ids()
96 print 'Engine IDs:',ipc.get_ids()
97
97
98 # Make a set of 'sorted datasets'
98 # Make a set of 'sorted datasets'
99 a0 = range(5,20)
99 a0 = range(5,20)
100 a1 = range(10)
100 a1 = range(10)
101 a2 = range(15,25)
101 a2 = range(15,25)
102
102
103 # Now, imagine these had been created in the remote engines by some long
103 # Now, imagine these had been created in the remote engines by some long
104 # computation. In this simple example, we just send them over into the
104 # computation. In this simple example, we just send them over into the
105 # remote engines. They will all be called 'a' in each engine.
105 # remote engines. They will all be called 'a' in each engine.
106 ipc.push(dict(a=a0), targets=0)
106 ipc.push(dict(a=a0), targets=0)
107 ipc.push(dict(a=a1), targets=1)
107 ipc.push(dict(a=a1), targets=1)
108 ipc.push(dict(a=a2), targets=2)
108 ipc.push(dict(a=a2), targets=2)
109
109
110 # And we now make a local object which represents the remote iterator
110 # And we now make a local object which represents the remote iterator
111 aa0 = remote_iterator(ipc,0,'a')
111 aa0 = remote_iterator(ipc,0,'a')
112 aa1 = remote_iterator(ipc,1,'a')
112 aa1 = remote_iterator(ipc,1,'a')
113 aa2 = remote_iterator(ipc,2,'a')
113 aa2 = remote_iterator(ipc,2,'a')
114
114
115 # Let's merge them, both locally and remotely:
115 # Let's merge them, both locally and remotely:
116 print 'Merge the local datasets:'
116 print 'Merge the local datasets:'
117 print list(mergesort([a0,a1,a2]))
117 print list(mergesort([a0,a1,a2]))
118
118
119 print 'Locally merge the remote sets:'
119 print 'Locally merge the remote sets:'
120 print list(mergesort([aa0,aa1,aa2]))
120 print list(mergesort([aa0,aa1,aa2]))
@@ -1,46 +1,46 b''
1 """Example of how to use pylab to plot parallel data.
1 """Example of how to use pylab to plot parallel data.
2
2
3 The idea here is to run matplotlib in the same IPython session
3 The idea here is to run matplotlib in the same IPython session
4 as an IPython RemoteController client. That way matplotlib
4 as an IPython RemoteController client. That way matplotlib
5 can be used to plot parallel data that is gathered using
5 can be used to plot parallel data that is gathered using
6 RemoteController.
6 RemoteController.
7
7
8 To run this example, first start the IPython controller and 4
8 To run this example, first start the IPython controller and 4
9 engines::
9 engines::
10
10
11 ipcluster -n 4
11 ipcluster -n 4
12
12
13 Then start ipython in pylab mode::
13 Then start ipython in pylab mode::
14
14
15 ipython -pylab
15 ipython -pylab
16
16
17 Then a simple "run parallel_pylab.ipy" in IPython will run the
17 Then a simple "run parallel_pylab.ipy" in IPython will run the
18 example.
18 example.
19 """
19 """
20
20
21 import numpy as N
21 import numpy as N
22 from pylab import *
22 from pylab import *
23 from ipython1.kernel import client
23 from IPython.kernel import client
24
24
25 # Get an IPython1 client
25 # Get an IPython1 client
26 rc = client.MultiEngineClient()
26 rc = client.MultiEngineClient()
27 rc.get_ids()
27 rc.get_ids()
28 rc.activate()
28 rc.activate()
29
29
30 # Create random arrays on the engines
30 # Create random arrays on the engines
31 # This is to simulate arrays that you have calculated in parallel
31 # This is to simulate arrays that you have calculated in parallel
32 # on the engines.
32 # on the engines.
33 # For arrays longer than about 10000 elements, matplotlib starts to be slow
33 # For arrays longer than about 10000 elements, matplotlib starts to be slow
34 %px import numpy as N
34 %px import numpy as N
35 %px x = N.random.standard_normal(10000)
35 %px x = N.random.standard_normal(10000)
36 %px y = N.random.standard_normal(10000)
36 %px y = N.random.standard_normal(10000)
37
37
38 %px print x[0:10]
38 %px print x[0:10]
39 %px print y[0:10]
39 %px print y[0:10]
40
40
41 # Bring back the data
41 # Bring back the data
42 x_local = rc.gather('x')
42 x_local = rc.gather('x')
43 y_local = rc.gather('y')
43 y_local = rc.gather('y')
44
44
45 # Make a scatter plot of the gathered data
45 # Make a scatter plot of the gathered data
46 plot(x_local, y_local,'ro')
46 plot(x_local, y_local,'ro')
@@ -1,52 +1,52 b''
1 """An example of how to use IPython1 for plotting remote parallel data
1 """An example of how to use IPython1 for plotting remote parallel data
2
2
3 The two files plotting_frontend.ipy and plotting_backend.py go together.
3 The two files plotting_frontend.ipy and plotting_backend.py go together.
4
4
5 To run this example, first start the IPython controller and 4
5 To run this example, first start the IPython controller and 4
6 engines::
6 engines::
7
7
8 ipcluster -n 4
8 ipcluster -n 4
9
9
10 Then start ipython in pylab mode::
10 Then start ipython in pylab mode::
11
11
12 ipython -pylab
12 ipython -pylab
13
13
14 Then a simple "run plotting_frontend.ipy" in IPython will run the
14 Then a simple "run plotting_frontend.ipy" in IPython will run the
15 example. When this is done, all the variables (such as number, downx, etc.)
15 example. When this is done, all the variables (such as number, downx, etc.)
16 are available in IPython, so for example you can make additional plots.
16 are available in IPython, so for example you can make additional plots.
17 """
17 """
18
18
19 import numpy as N
19 import numpy as N
20 from pylab import *
20 from pylab import *
21 from ipython1.kernel import client
21 from IPython.kernel import client
22
22
23 # Get an IPython1 client
23 # Get an IPython1 client
24 rc = client.MultiEngineClient()
24 rc = client.MultiEngineClient()
25 rc.get_ids()
25 rc.get_ids()
26
26
27 # Run the simulation on all the engines
27 # Run the simulation on all the engines
28 rc.run('plotting_backend.py')
28 rc.run('plotting_backend.py')
29
29
30 # Bring back the data
30 # Bring back the data
31 number = rc.pull('number')
31 number = rc.pull('number')
32 d_number = rc.pull('d_number')
32 d_number = rc.pull('d_number')
33 downx = rc.gather('downx')
33 downx = rc.gather('downx')
34 downy = rc.gather('downy')
34 downy = rc.gather('downy')
35 downpx = rc.gather('downpx')
35 downpx = rc.gather('downpx')
36 downpy = rc.gather('downpy')
36 downpy = rc.gather('downpy')
37
37
38 print "number: ", sum(number)
38 print "number: ", sum(number)
39 print "downsampled number: ", sum(d_number)
39 print "downsampled number: ", sum(d_number)
40
40
41 # Make a scatter plot of the gathered data
41 # Make a scatter plot of the gathered data
42 # These calls to matplotlib could be replaced by calls to pygist or
42 # These calls to matplotlib could be replaced by calls to pygist or
43 # another plotting package.
43 # another plotting package.
44 figure(1)
44 figure(1)
45 scatter(downx, downy)
45 scatter(downx, downy)
46 xlabel('x')
46 xlabel('x')
47 ylabel('y')
47 ylabel('y')
48 figure(2)
48 figure(2)
49 scatter(downpx, downpy)
49 scatter(downpx, downpy)
50 xlabel('px')
50 xlabel('px')
51 ylabel('py')
51 ylabel('py')
52 show() No newline at end of file
52 show()
@@ -1,46 +1,46 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Parallel word frequency counter."""
2 """Parallel word frequency counter."""
3
3
4
4
5 from itertools import repeat
5 from itertools import repeat
6 from wordfreq import print_wordfreq, wordfreq
6 from wordfreq import print_wordfreq, wordfreq
7
7
8 def pwordfreq(rc, text):
8 def pwordfreq(rc, text):
9 """Parallel word frequency counter.
9 """Parallel word frequency counter.
10
10
11 rc - An IPython RemoteController
11 rc - An IPython RemoteController
12 text - The name of a string on the engines to do the freq count on.
12 text - The name of a string on the engines to do the freq count on.
13 """
13 """
14
14
15 rc.execute('freqs = wordfreq(%s)' %text)
15 rc.execute('freqs = wordfreq(%s)' %text)
16 freqs_list = rc.pull('freqs')
16 freqs_list = rc.pull('freqs')
17 word_set = set()
17 word_set = set()
18 for f in freqs_list:
18 for f in freqs_list:
19 word_set.update(f.keys())
19 word_set.update(f.keys())
20 freqs = dict(zip(word_set, repeat(0)))
20 freqs = dict(zip(word_set, repeat(0)))
21 for f in freqs_list:
21 for f in freqs_list:
22 for word, count in f.iteritems():
22 for word, count in f.iteritems():
23 freqs[word] += count
23 freqs[word] += count
24 return freqs
24 return freqs
25
25
26 if __name__ == '__main__':
26 if __name__ == '__main__':
27 # Create a MultiEngineClient
27 # Create a MultiEngineClient
28 from ipython1.kernel import client
28 from IPython.kernel import client
29 ipc = client.MultiEngineClient()
29 ipc = client.MultiEngineClient()
30
30
31 # Run the wordfreq script on the engines.
31 # Run the wordfreq script on the engines.
32 ipc.run('wordfreq.py')
32 ipc.run('wordfreq.py')
33
33
34 # Run the serial version
34 # Run the serial version
35 print "Serial word frequency count:"
35 print "Serial word frequency count:"
36 text = open('davinci.txt').read()
36 text = open('davinci.txt').read()
37 freqs = wordfreq(text)
37 freqs = wordfreq(text)
38 print_wordfreq(freqs, 10)
38 print_wordfreq(freqs, 10)
39
39
40 # The parallel version
40 # The parallel version
41 print "\nParallel word frequency count:"
41 print "\nParallel word frequency count:"
42 files = ['davinci%i.txt' % i for i in range(4)]
42 files = ['davinci%i.txt' % i for i in range(4)]
43 ipc.scatter('textfile', files)
43 ipc.scatter('textfile', files)
44 ipc.execute('text = open(textfile[0]).read()')
44 ipc.execute('text = open(textfile[0]).read()')
45 pfreqs = pwordfreq(ipc,'text')
45 pfreqs = pwordfreq(ipc,'text')
46 print_wordfreq(pfreqs, 10)
46 print_wordfreq(pfreqs, 10)
@@ -1,35 +1,35 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Parallel word frequency counter."""
2 """Parallel word frequency counter."""
3
3
4
4
5 from itertools import repeat
5 from itertools import repeat
6 from wordfreq import print_wordfreq, wordfreq
6 from wordfreq import print_wordfreq, wordfreq
7
7
8 def pwordfreq(rc, text):
8 def pwordfreq(rc, text):
9 """Parallel word frequency counter.
9 """Parallel word frequency counter.
10
10
11 rc - An IPython RemoteController
11 rc - An IPython RemoteController
12 text - The name of a string on the engines to do the freq count on.
12 text - The name of a string on the engines to do the freq count on.
13 """
13 """
14
14
15 if __name__ == '__main__':
15 if __name__ == '__main__':
16 # Create a MultiEngineClient
16 # Create a MultiEngineClient
17 from ipython1.kernel import client
17 from IPython.kernel import client
18 ipc = client.MultiEngineClient()
18 ipc = client.MultiEngineClient()
19
19
20 # Run the wordfreq script on the engines.
20 # Run the wordfreq script on the engines.
21 ipc.run('wordfreq.py')
21 ipc.run('wordfreq.py')
22
22
23 # Run the serial version
23 # Run the serial version
24 print "Serial word frequency count:"
24 print "Serial word frequency count:"
25 text = open('davinci.txt').read()
25 text = open('davinci.txt').read()
26 freqs = wordfreq(text)
26 freqs = wordfreq(text)
27 print_wordfreq(freqs, 10)
27 print_wordfreq(freqs, 10)
28
28
29 # The parallel version
29 # The parallel version
30 print "\nParallel word frequency count:"
30 print "\nParallel word frequency count:"
31 files = ['davinci%i.txt' % i for i in range(4)]
31 files = ['davinci%i.txt' % i for i in range(4)]
32 ipc.scatter('textfile', files)
32 ipc.scatter('textfile', files)
33 ipc.execute('text = open(textfile[0]).read()')
33 ipc.execute('text = open(textfile[0]).read()')
34 pfreqs = pwordfreq(ipc,'text')
34 pfreqs = pwordfreq(ipc,'text')
35 print_wordfreq(freqs)
35 print_wordfreq(freqs)
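In this shorter variant the body of ``pwordfreq`` is missing, so ``pfreqs`` ends up as ``None`` and only the serial counts are printed. The fuller copy of this script earlier in the changeset fills the function in roughly as follows (a sketch of that body, not new behaviour)::

    # Body of pwordfreq as it appears in the fuller version above,
    # indented as the function body under the docstring.
    rc.execute('freqs = wordfreq(%s)' % text)
    freqs_list = rc.pull('freqs')
    word_set = set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs = dict(zip(word_set, repeat(0)))
    for f in freqs_list:
        for word, count in f.iteritems():
            freqs[word] += count
    return freqs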
@@ -1,57 +1,57 b''
1 #-------------------------------------------------------------------------------
1 #-------------------------------------------------------------------------------
2 # Driver code that the client runs.
2 # Driver code that the client runs.
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # To run this code start a controller and engines using:
4 # To run this code start a controller and engines using:
5 # ipcluster -n 2
5 # ipcluster -n 2
6 # Then run the scripts by doing irunner rmt.ipy or by starting ipython and
6 # Then run the scripts by doing irunner rmt.ipy or by starting ipython and
7 # doing run rmt.ipy.
7 # doing run rmt.ipy.
8
8
9 from rmtkernel import *
9 from rmtkernel import *
10 from ipython1.kernel import client
10 from IPython.kernel import client
11
11
12
12
13 def wignerDistribution(s):
13 def wignerDistribution(s):
14 """Returns (s, rho(s)) for the Wigner GOE distribution."""
14 """Returns (s, rho(s)) for the Wigner GOE distribution."""
15 return (numpy.pi*s/2.0) * numpy.exp(-numpy.pi*s**2/4.)
15 return (numpy.pi*s/2.0) * numpy.exp(-numpy.pi*s**2/4.)
16
16
17
17
18 def generateWignerData():
18 def generateWignerData():
19 s = numpy.linspace(0.0,4.0,400)
19 s = numpy.linspace(0.0,4.0,400)
20 rhos = wignerDistribution(s)
20 rhos = wignerDistribution(s)
21 return s, rhos
21 return s, rhos
22
22
23
23
24 def serialDiffs(num, N):
24 def serialDiffs(num, N):
25 diffs = ensembleDiffs(num, N)
25 diffs = ensembleDiffs(num, N)
26 normalizedDiffs = normalizeDiffs(diffs)
26 normalizedDiffs = normalizeDiffs(diffs)
27 return normalizedDiffs
27 return normalizedDiffs
28
28
29
29
30 def parallelDiffs(rc, num, N):
30 def parallelDiffs(rc, num, N):
31 nengines = len(rc.get_ids())
31 nengines = len(rc.get_ids())
32 num_per_engine = num/nengines
32 num_per_engine = num/nengines
33 print "Running with", num_per_engine, "per engine."
33 print "Running with", num_per_engine, "per engine."
34 rc.push(dict(num_per_engine=num_per_engine, N=N))
34 rc.push(dict(num_per_engine=num_per_engine, N=N))
35 rc.execute('diffs = ensembleDiffs(num_per_engine, N)')
35 rc.execute('diffs = ensembleDiffs(num_per_engine, N)')
36 # gather blocks always for now
36 # gather blocks always for now
37 pr = rc.gather('diffs')
37 pr = rc.gather('diffs')
38 return pr.r
38 return pr.r
39
39
40
40
41 # Main code
41 # Main code
42 if __name__ == '__main__':
42 if __name__ == '__main__':
43 rc = client.MultiEngineClient()
43 rc = client.MultiEngineClient()
44 print "Distributing code to engines..."
44 print "Distributing code to engines..."
45 r = rc.run('rmtkernel.py')
45 r = rc.run('rmtkernel.py')
46 rc.block = False
46 rc.block = False
47
47
48 # Simulation parameters
48 # Simulation parameters
49 nmats = 100
49 nmats = 100
50 matsize = 30
50 matsize = 30
51
51
52 %timeit -n1 -r1 serialDiffs(nmats,matsize)
52 %timeit -n1 -r1 serialDiffs(nmats,matsize)
53 %timeit -n1 -r1 parallelDiffs(rc, nmats, matsize)
53 %timeit -n1 -r1 parallelDiffs(rc, nmats, matsize)
54
54
55 # Uncomment these to plot the histogram
55 # Uncomment these to plot the histogram
56 import pylab
56 import pylab
57 pylab.hist(parallelDiffs(rc,matsize,matsize))
57 pylab.hist(parallelDiffs(rc,matsize,matsize))
@@ -1,18 +1,18 b''
1 from ipython1.kernel import client
1 from IPython.kernel import client
2
2
3 tc = client.TaskClient()
3 tc = client.TaskClient()
4 rc = client.MultiEngineClient()
4 rc = client.MultiEngineClient()
5
5
6 rc.push(dict(d=30))
6 rc.push(dict(d=30))
7
7
8 cmd1 = """\
8 cmd1 = """\
9 a = 5
9 a = 5
10 b = 10*d
10 b = 10*d
11 c = a*b*d
11 c = a*b*d
12 """
12 """
13
13
14 t1 = client.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
14 t1 = client.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
15 tid1 = tc.run(t1)
15 tid1 = tc.run(t1)
16 tr1 = tc.get_task_result(tid1,block=True)
16 tr1 = tc.get_task_result(tid1,block=True)
17 tr1.raiseException()
17 tr1.raiseException()
18 print "a, b: ", tr1.ns.a, tr1.ns.b No newline at end of file
18 print "a, b: ", tr1.ns.a, tr1.ns.b
@@ -1,44 +1,44 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 from ipython1.kernel import client
4 from IPython.kernel import client
5 import time
5 import time
6
6
7 tc = client.TaskClient()
7 tc = client.TaskClient()
8 mec = client.MultiEngineClient()
8 mec = client.MultiEngineClient()
9
9
10 mec.execute('import time')
10 mec.execute('import time')
11
11
12 for i in range(24):
12 for i in range(24):
13 tc.irun('time.sleep(1)')
13 tc.irun('time.sleep(1)')
14
14
15 for i in range(6):
15 for i in range(6):
16 time.sleep(1.0)
16 time.sleep(1.0)
17 print "Queue status (verbose=False)"
17 print "Queue status (verbose=False)"
18 print tc.queue_status()
18 print tc.queue_status()
19
19
20 for i in range(24):
20 for i in range(24):
21 tc.irun('time.sleep(1)')
21 tc.irun('time.sleep(1)')
22
22
23 for i in range(6):
23 for i in range(6):
24 time.sleep(1.0)
24 time.sleep(1.0)
25 print "Queue status (verbose=True)"
25 print "Queue status (verbose=True)"
26 print tc.queue_status(True)
26 print tc.queue_status(True)
27
27
28 for i in range(12):
28 for i in range(12):
29 tc.irun('time.sleep(2)')
29 tc.irun('time.sleep(2)')
30
30
31 print "Queue status (verbose=True)"
31 print "Queue status (verbose=True)"
32 print tc.queue_status(True)
32 print tc.queue_status(True)
33
33
34 qs = tc.queue_status(True)
34 qs = tc.queue_status(True)
35 sched = qs['scheduled']
35 sched = qs['scheduled']
36
36
37 for tid in sched[-4:]:
37 for tid in sched[-4:]:
38 tc.abort(tid)
38 tc.abort(tid)
39
39
40 for i in range(6):
40 for i in range(6):
41 time.sleep(1.0)
41 time.sleep(1.0)
42 print "Queue status (verbose=True)"
42 print "Queue status (verbose=True)"
43 print tc.queue_status(True)
43 print tc.queue_status(True)
44
44
@@ -1,77 +1,77 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Test the performance of the task farming system.
2 """Test the performance of the task farming system.
3
3
4 This script submits a set of tasks to the TaskClient. The tasks
4 This script submits a set of tasks to the TaskClient. The tasks
5 are basically just a time.sleep(t), where t is a random number between
5 are basically just a time.sleep(t), where t is a random number between
6 two limits that can be configured at the command line. To run
6 two limits that can be configured at the command line. To run
7 the script there must first be an IPython controller and engines running::
7 the script there must first be an IPython controller and engines running::
8
8
9 ipcluster -n 16
9 ipcluster -n 16
10
10
11 A good test to run with 16 engines is::
11 A good test to run with 16 engines is::
12
12
13 python task_profiler.py -n 128 -t 0.01 -T 1.0
13 python task_profiler.py -n 128 -t 0.01 -T 1.0
14
14
15 This should show a speedup of 13-14x. The limitation here is that the
15 This should show a speedup of 13-14x. The limitation here is that the
16 overhead of a single task is about 0.001-0.01 seconds.
16 overhead of a single task is about 0.001-0.01 seconds.
17 """
17 """
18 import random, sys
18 import random, sys
19 from optparse import OptionParser
19 from optparse import OptionParser
20
20
21 from IPython.genutils import time
21 from IPython.genutils import time
22 from ipython1.kernel import client
22 from IPython.kernel import client
23
23
24 def main():
24 def main():
25 parser = OptionParser()
25 parser = OptionParser()
26 parser.set_defaults(n=100)
26 parser.set_defaults(n=100)
27 parser.set_defaults(tmin=1)
27 parser.set_defaults(tmin=1)
28 parser.set_defaults(tmax=60)
28 parser.set_defaults(tmax=60)
29 parser.set_defaults(controller='localhost')
29 parser.set_defaults(controller='localhost')
30 parser.set_defaults(meport=10105)
30 parser.set_defaults(meport=10105)
31 parser.set_defaults(tport=10113)
31 parser.set_defaults(tport=10113)
32
32
33 parser.add_option("-n", type='int', dest='n',
33 parser.add_option("-n", type='int', dest='n',
34 help='the number of tasks to run')
34 help='the number of tasks to run')
35 parser.add_option("-t", type='float', dest='tmin',
35 parser.add_option("-t", type='float', dest='tmin',
36 help='the minimum task length in seconds')
36 help='the minimum task length in seconds')
37 parser.add_option("-T", type='float', dest='tmax',
37 parser.add_option("-T", type='float', dest='tmax',
38 help='the maximum task length in seconds')
38 help='the maximum task length in seconds')
39 parser.add_option("-c", type='string', dest='controller',
39 parser.add_option("-c", type='string', dest='controller',
40 help='the address of the controller')
40 help='the address of the controller')
41 parser.add_option("-p", type='int', dest='meport',
41 parser.add_option("-p", type='int', dest='meport',
42 help="the port on which the controller listens for the MultiEngine/RemoteController client")
42 help="the port on which the controller listens for the MultiEngine/RemoteController client")
43 parser.add_option("-P", type='int', dest='tport',
43 parser.add_option("-P", type='int', dest='tport',
44 help="the port on which the controller listens for the TaskClient client")
44 help="the port on which the controller listens for the TaskClient client")
45
45
46 (opts, args) = parser.parse_args()
46 (opts, args) = parser.parse_args()
47 assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"
47 assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"
48
48
49 rc = client.MultiEngineClient()
49 rc = client.MultiEngineClient()
50 tc = client.TaskClient()
50 tc = client.TaskClient()
51 print tc.task_controller
51 print tc.task_controller
52 rc.block=True
52 rc.block=True
53 nengines = len(rc.get_ids())
53 nengines = len(rc.get_ids())
54 rc.execute('from IPython.genutils import time')
54 rc.execute('from IPython.genutils import time')
55
55
56 # the jobs should take a random time within a range
56 # the jobs should take a random time within a range
57 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
57 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
58 tasks = [client.Task("time.sleep(%f)"%t) for t in times]
58 tasks = [client.Task("time.sleep(%f)"%t) for t in times]
59 stime = sum(times)
59 stime = sum(times)
60
60
61 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
61 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
62 time.sleep(1)
62 time.sleep(1)
63 start = time.time()
63 start = time.time()
64 taskids = [tc.run(t) for t in tasks]
64 taskids = [tc.run(t) for t in tasks]
65 tc.barrier(taskids)
65 tc.barrier(taskids)
66 stop = time.time()
66 stop = time.time()
67
67
68 ptime = stop-start
68 ptime = stop-start
69 scale = stime/ptime
69 scale = stime/ptime
70
70
71 print "executed %.1f secs in %.1f secs"%(stime, ptime)
71 print "executed %.1f secs in %.1f secs"%(stime, ptime)
72 print "%.3fx parallel performance on %i engines"%(scale, nengines)
72 print "%.3fx parallel performance on %i engines"%(scale, nengines)
73 print "%.1f%% of theoretical max"%(100*scale/nengines)
73 print "%.1f%% of theoretical max"%(100*scale/nengines)
74
74
75
75
76 if __name__ == '__main__':
76 if __name__ == '__main__':
77 main()
77 main()
@@ -1,27 +1,27 b''
1 ========================
1 ========================
2 New configuration system
2 New configuration system
3 ========================
3 ========================
4
4
5 IPython has a configuration system. When running IPython for the first time,
5 IPython has a configuration system. When running IPython for the first time,
6 reasonable defaults are used for the configuration. The configuration of IPython
6 reasonable defaults are used for the configuration. The configuration of IPython
7 can be changed in two ways:
7 can be changed in two ways:
8
8
9 * Configuration files
9 * Configuration files
10 * Command line options (which override the configuration files)
10 * Command line options (which override the configuration files)
11
11
12 IPython has a separate configuration file for each subpackage. Thus, the main
12 IPython has a separate configuration file for each subpackage. Thus, the main
13 configuration files are (in your ``~/.ipython`` directory):
13 configuration files are (in your ``~/.ipython`` directory):
14
14
15 * ``ipython1.core.ini``
15 * ``ipython1.core.ini``
16 * ``ipython1.kernel.ini``
16 * ``ipython1.kernel.ini``
17 * ``ipython1.notebook.ini``
17 * ``ipython1.notebook.ini``
18
18
19 To create these files for the first time, do the following::
19 To create these files for the first time, do the following::
20
20
21 from ipython1.kernel.config import config_manager as kernel_config
21 from IPython.kernel.config import config_manager as kernel_config
22 kernel_config.write_default_config_file()
22 kernel_config.write_default_config_file()
23
23
24 But, you should only need to do this if you need to modify the defaults. If needed
24 But, you should only need to do this if you need to modify the defaults. If needed
25 repeat this process with the ``notebook`` and ``core`` configuration as well. If you
25 repeat this process with the ``notebook`` and ``core`` configuration as well. If you
26 are running into problems with IPython, you might try deleting these configuration
26 are running into problems with IPython, you might try deleting these configuration
27 files. No newline at end of file
27 files.
@@ -1,242 +1,242 b''
1 .. _ip1par:
1 .. _ip1par:
2
2
3 ======================================
3 ======================================
4 Using IPython for parallel computing
4 Using IPython for parallel computing
5 ======================================
5 ======================================
6
6
7 .. contents::
7 .. contents::
8
8
9 Introduction
9 Introduction
10 ============
10 ============
11
11
12 This file gives an overview of IPython. IPython has a sophisticated and
12 This file gives an overview of IPython. IPython has a sophisticated and
13 powerful architecture for parallel and distributed computing. This
13 powerful architecture for parallel and distributed computing. This
14 architecture abstracts out parallelism in a very general way, which
14 architecture abstracts out parallelism in a very general way, which
15 enables IPython to support many different styles of parallelism
15 enables IPython to support many different styles of parallelism
16 including:
16 including:
17
17
18 * Single program, multiple data (SPMD) parallelism.
18 * Single program, multiple data (SPMD) parallelism.
19 * Multiple program, multiple data (MPMD) parallelism.
19 * Multiple program, multiple data (MPMD) parallelism.
20 * Message passing using ``MPI``.
20 * Message passing using ``MPI``.
21 * Task farming.
21 * Task farming.
22 * Data parallel.
22 * Data parallel.
23 * Combinations of these approaches.
23 * Combinations of these approaches.
24 * Custom user defined approaches.
24 * Custom user defined approaches.
25
25
26 Most importantly, IPython enables all types of parallel applications to
26 Most importantly, IPython enables all types of parallel applications to
27 be developed, executed, debugged and monitored *interactively*. Hence,
27 be developed, executed, debugged and monitored *interactively*. Hence,
28 the ``I`` in IPython. The following are some example usage cases for IPython:
28 the ``I`` in IPython. The following are some example usage cases for IPython:
29
29
30 * Quickly parallelize algorithms that are embarrassingly parallel
30 * Quickly parallelize algorithms that are embarrassingly parallel
31 using a number of simple approaches. Many simple things can be
31 using a number of simple approaches. Many simple things can be
32 parallelized interactively in one or two lines of code.
32 parallelized interactively in one or two lines of code.
33 * Steer traditional MPI applications on a supercomputer from an
33 * Steer traditional MPI applications on a supercomputer from an
34 IPython session on your laptop.
34 IPython session on your laptop.
35 * Analyze and visualize large datasets (that could be remote and/or
35 * Analyze and visualize large datasets (that could be remote and/or
36 distributed) interactively using IPython and tools like
36 distributed) interactively using IPython and tools like
37 matplotlib/TVTK.
37 matplotlib/TVTK.
38 * Develop, test and debug new parallel algorithms
38 * Develop, test and debug new parallel algorithms
39 (that may use MPI) interactively.
39 (that may use MPI) interactively.
40 * Tie together multiple MPI jobs running on different systems into
40 * Tie together multiple MPI jobs running on different systems into
41 one giant distributed and parallel system.
41 one giant distributed and parallel system.
42 * Start a parallel job on your cluster and then have a remote
42 * Start a parallel job on your cluster and then have a remote
43 collaborator connect to it and pull back data into their
43 collaborator connect to it and pull back data into their
44 local IPython session for plotting and analysis.
44 local IPython session for plotting and analysis.
45 * Run a set of tasks on a set of CPUs using dynamic load balancing.
45 * Run a set of tasks on a set of CPUs using dynamic load balancing.
46
46
47 Architecture overview
47 Architecture overview
48 =====================
48 =====================
49
49
50 The IPython architecture consists of three components:
50 The IPython architecture consists of three components:
51
51
52 * The IPython engine.
52 * The IPython engine.
53 * The IPython controller.
53 * The IPython controller.
54 * Various controller Clients.
54 * Various controller Clients.
55
55
56 IPython engine
56 IPython engine
57 ---------------
57 ---------------
58
58
59 The IPython engine is a Python instance that takes Python commands over a
59 The IPython engine is a Python instance that takes Python commands over a
60 network connection. Eventually, the IPython engine will be a full IPython
60 network connection. Eventually, the IPython engine will be a full IPython
61 interpreter, but for now, it is a regular Python interpreter. The engine
61 interpreter, but for now, it is a regular Python interpreter. The engine
62 can also handle incoming and outgoing Python objects sent over a network
62 can also handle incoming and outgoing Python objects sent over a network
63 connection. When multiple engines are started, parallel and distributed
63 connection. When multiple engines are started, parallel and distributed
64 computing becomes possible. An important feature of an IPython engine is
64 computing becomes possible. An important feature of an IPython engine is
65 that it blocks while user code is being executed. Read on for how the
65 that it blocks while user code is being executed. Read on for how the
66 IPython controller solves this problem to expose a clean asynchronous API
66 IPython controller solves this problem to expose a clean asynchronous API
67 to the user.
67 to the user.
68
68
69 IPython controller
69 IPython controller
70 ------------------
70 ------------------
71
71
72 The IPython controller provides an interface for working with a set of
72 The IPython controller provides an interface for working with a set of
73 engines. At a general level, the controller is a process to which
73 engines. At a general level, the controller is a process to which
74 IPython engines can connect. For each connected engine, the controller
74 IPython engines can connect. For each connected engine, the controller
75 manages a queue. All actions that can be performed on the engine go
75 manages a queue. All actions that can be performed on the engine go
76 through this queue. While the engines themselves block when user code is
76 through this queue. While the engines themselves block when user code is
77 run, the controller hides that from the user to provide a fully
77 run, the controller hides that from the user to provide a fully
78 asynchronous interface to a set of engines. Because the controller
78 asynchronous interface to a set of engines. Because the controller
79 listens on a network port for engines to connect to it, it must be
79 listens on a network port for engines to connect to it, it must be
80 started before any engines are started.
80 started before any engines are started.
81
81
82 The controller also provides a single point of contact for users who wish
82 The controller also provides a single point of contact for users who wish
83 to utilize the engines connected to the controller. There are different
83 to utilize the engines connected to the controller. There are different
84 ways of working with a controller. In IPython these ways correspond to different interfaces that the controller is adapted to. Currently we have two default interfaces to the controller:
84 ways of working with a controller. In IPython these ways correspond to different interfaces that the controller is adapted to. Currently we have two default interfaces to the controller:
85
85
86 * The MultiEngine interface.
86 * The MultiEngine interface.
87 * The Task interface.
87 * The Task interface.
88
88
89 Advanced users can easily add new custom interfaces to enable other
89 Advanced users can easily add new custom interfaces to enable other
90 styles of parallelism.
90 styles of parallelism.
91
91
92 .. note::
92 .. note::
93
93
94 A single controller and set of engines can be accessed
94 A single controller and set of engines can be accessed
95 through multiple interfaces simultaneously. This opens the
95 through multiple interfaces simultaneously. This opens the
96 door for lots of interesting things.
96 door for lots of interesting things.
97
97
98 Controller clients
98 Controller clients
99 ------------------
99 ------------------
100
100
101 For each controller interface, there is a corresponding client. These
101 For each controller interface, there is a corresponding client. These
102 clients allow users to interact with a set of engines through the
102 clients allow users to interact with a set of engines through the
103 interface.
103 interface.
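
For illustration, here is a brief sketch of creating both default clients against a single running controller. The ``MultiEngineClient`` name is used elsewhere in this documentation; the ``TaskClient`` name is assumed here by analogy, so check the Task interface documentation for the exact class name::

    from IPython.kernel import client

    # Each client talks to the same controller through a different interface.
    mec = client.MultiEngineClient()   # MultiEngine interface
    tc = client.TaskClient()           # Task interface (class name assumed)

Because a single controller can be accessed through multiple interfaces at once, both clients can be used side by side in the same session.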
104
104
105 Security
105 Security
106 --------
106 --------
107
107
108 By default (as long as `pyOpenSSL` is installed) all network connections between the controller and the engines, and between the controller and the clients, are secure. What does this mean? First of all, all of the connections are encrypted using SSL. Second, the connections are authenticated. We handle authentication in a `capabilities`__ based security model. In this model, a "capability (known in some systems as a key) is a communicable, unforgeable token of authority". Put simply, a capability is like a key to your house: if you have the key, you can get in; if not, you can't.
108 By default (as long as `pyOpenSSL` is installed) all network connections between the controller and the engines, and between the controller and the clients, are secure. What does this mean? First of all, all of the connections are encrypted using SSL. Second, the connections are authenticated. We handle authentication in a `capabilities`__ based security model. In this model, a "capability (known in some systems as a key) is a communicable, unforgeable token of authority". Put simply, a capability is like a key to your house: if you have the key, you can get in; if not, you can't.
109
109
110 .. __: http://en.wikipedia.org/wiki/Capability-based_security
110 .. __: http://en.wikipedia.org/wiki/Capability-based_security
111
111
112 In our architecture, the controller is the only process that listens on network ports and is thus responsible for creating these keys. In IPython, these keys are known as Foolscap URLs, or FURLs, because of the underlying network protocol we are using. As a user, you don't need to know anything about the details of these FURLs, other than that when the controller starts, it saves a set of FURLs to files named something.furl. The default location of these files is your ~/.ipython directory.
112 In our architecture, the controller is the only process that listens on network ports and is thus responsible for creating these keys. In IPython, these keys are known as Foolscap URLs, or FURLs, because of the underlying network protocol we are using. As a user, you don't need to know anything about the details of these FURLs, other than that when the controller starts, it saves a set of FURLs to files named something.furl. The default location of these files is your ~/.ipython directory.
113
113
114 To connect and authenticate to the controller, an engine or client simply needs to present an appropriate FURL (that was originally created by the controller) to the controller. Thus, the .furl files need to be copied to a location where the clients and engines can find them. Typically, this is the ~/.ipython directory on the host where the client/engine is running (which could be a different host than the controller). Once the .furl files are copied over, everything should work fine.
114 To connect and authenticate to the controller, an engine or client simply needs to present an appropriate FURL (that was originally created by the controller) to the controller. Thus, the .furl files need to be copied to a location where the clients and engines can find them. Typically, this is the ~/.ipython directory on the host where the client/engine is running (which could be a different host than the controller). Once the .furl files are copied over, everything should work fine.
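
As a hedged sketch of connecting from a different host (the ``.furl`` filename and the ability to pass a path to the client constructor are assumptions; check your installation), a client might be pointed at a copied FURL file explicitly::

    from IPython.kernel import client

    # Path to a FURL file copied over from the controller host (filename assumed).
    mec = client.MultiEngineClient('/home/me/.ipython/ipcontroller-mec.furl')

If the FURL files are in the default ~/.ipython location, the clients and engines should find them automatically, as in the examples below.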
115
115
116 Getting Started
116 Getting Started
117 ===============
117 ===============
118
118
119 To use IPython for parallel computing, you need to start one instance of
119 To use IPython for parallel computing, you need to start one instance of
120 the controller and one or more instances of the engine. The controller
120 the controller and one or more instances of the engine. The controller
121 and each engine can run on different machines or on the same machine.
121 and each engine can run on different machines or on the same machine.
122 Because of this, there are many different possibilities for setting up
122 Because of this, there are many different possibilities for setting up
123 the IP addresses and ports used by the various processes.
123 the IP addresses and ports used by the various processes.
124
124
125 Starting the controller and engine on your local machine
125 Starting the controller and engine on your local machine
126 --------------------------------------------------------
126 --------------------------------------------------------
127
127
128 This is the simplest configuration that can be used and is useful for
128 This is the simplest configuration that can be used and is useful for
129 testing the system and on machines that have multiple cores and/or
129 testing the system and on machines that have multiple cores and/or
130 multiple CPUs. The easiest way of doing this is using the ``ipcluster``
130 multiple CPUs. The easiest way of doing this is using the ``ipcluster``
131 command::
131 command::
132
132
133 $ ipcluster -n 4
133 $ ipcluster -n 4
134
134
135 This will start an IPython controller and then 4 engines that connect to
135 This will start an IPython controller and then 4 engines that connect to
136 the controller. Lastly, the script will print out the Python commands
136 the controller. Lastly, the script will print out the Python commands
137 that you can use to connect to the controller. It is that easy.
137 that you can use to connect to the controller. It is that easy.
138
138
139 Underneath the hood, the ``ipcluster`` script uses two other top-level
139 Underneath the hood, the ``ipcluster`` script uses two other top-level
140 scripts that you can also use yourself. These scripts are
140 scripts that you can also use yourself. These scripts are
141 ``ipcontroller``, which starts the controller, and ``ipengine``, which
141 ``ipcontroller``, which starts the controller, and ``ipengine``, which
142 starts one engine. To use these scripts to start things on your local
142 starts one engine. To use these scripts to start things on your local
143 machine, do the following.
143 machine, do the following.
144
144
145 First start the controller::
145 First start the controller::
146
146
147 $ ipcontroller &
147 $ ipcontroller &
148
148
149 Next, start however many instances of the engine you want using (repeatedly) the command::
149 Next, start however many instances of the engine you want using (repeatedly) the command::
150
150
151 $ ipengine &
151 $ ipengine &
152
152
153 .. warning::
153 .. warning::
154
154
155 The order of the above operations is very important. You *must*
155 The order of the above operations is very important. You *must*
156 start the controller before the engines, since the engines connect
156 start the controller before the engines, since the engines connect
157 to the controller as they get started.
157 to the controller as they get started.
158
158
159 On some platforms you may need to give these commands in the form
159 On some platforms you may need to give these commands in the form
160 ``(ipcontroller &)`` and ``(ipengine &)`` for them to work properly. The
160 ``(ipcontroller &)`` and ``(ipengine &)`` for them to work properly. The
161 engines should start and automatically connect to the controller on the
161 engines should start and automatically connect to the controller on the
162 default ports, which are chosen for this type of setup. You are now ready
162 default ports, which are chosen for this type of setup. You are now ready
163 to use the controller and engines from IPython.
163 to use the controller and engines from IPython.
164
164
165 Starting the controller and engines on different machines
165 Starting the controller and engines on different machines
166 ---------------------------------------------------------
166 ---------------------------------------------------------
167
167
168 This section needs to be updated to reflect the new Foolscap capabilities based
168 This section needs to be updated to reflect the new Foolscap capabilities based
169 model.
169 model.
170
170
171 Using ``ipcluster`` with ``ssh``
171 Using ``ipcluster`` with ``ssh``
172 --------------------------------
172 --------------------------------
173
173
174 The ``ipcluster`` command can also start a controller and engines using
174 The ``ipcluster`` command can also start a controller and engines using
175 ``ssh``. We need more documentation on this, but for now here is an
175 ``ssh``. We need more documentation on this, but for now here is an
176 example startup script::
176 example startup script::
177
177
178 controller = dict(host='myhost',
178 controller = dict(host='myhost',
179 engine_port=None, # default is 10105
179 engine_port=None, # default is 10105
180 control_port=None,
180 control_port=None,
181 )
181 )
182
182
183 # keys are hostnames, values are the number of engines on that host
183 # keys are hostnames, values are the number of engines on that host
184 engines = dict(node1=2,
184 engines = dict(node1=2,
185 node2=2,
185 node2=2,
186 node3=2,
186 node3=2,
187 node4=2,
187 node4=2,
188 )
188 )
189
189
190 Starting engines using ``mpirun``
190 Starting engines using ``mpirun``
191 ---------------------------------
191 ---------------------------------
192
192
193 The IPython engines can be started using ``mpirun``/``mpiexec``, even if
193 The IPython engines can be started using ``mpirun``/``mpiexec``, even if
194 the engines don't call MPI_Init() or use the MPI API in any way. This is
194 the engines don't call MPI_Init() or use the MPI API in any way. This is
195 supported on modern MPI implementations like `Open MPI`_. This provides
195 supported on modern MPI implementations like `Open MPI`_. This provides
196 a really nice way of starting a bunch of engines. On a system with MPI
196 a really nice way of starting a bunch of engines. On a system with MPI
197 installed, you can do::
197 installed, you can do::
198
198
199 mpirun -n 4 ipengine --controller-port=10000 --controller-ip=host0
199 mpirun -n 4 ipengine --controller-port=10000 --controller-ip=host0
200
200
201 .. _Open MPI: http://www.open-mpi.org/
201 .. _Open MPI: http://www.open-mpi.org/
202
202
203 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
203 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
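
As a quick sketch of what this enables (assuming the engines were started under ``mpirun`` as above and that the mpi4py package is installed on them), MPI can then be used from the MultiEngine interface::

    from IPython.kernel import client

    mec = client.MultiEngineClient()
    # mpi4py calls MPI_Init on import; each engine reports its own rank.
    mec.execute('from mpi4py import MPI')
    mec.execute('rank = MPI.COMM_WORLD.Get_rank()')
    print mec.pull('rank')   # e.g. [0, 1, 2, 3]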
204
204
205 Log files
205 Log files
206 ---------
206 ---------
207
207
208 All of the components of IPython have log files associated with them.
208 All of the components of IPython have log files associated with them.
209 These log files can be extremely useful in debugging problems with
209 These log files can be extremely useful in debugging problems with
210 IPython and can be found in the directory ``~/.ipython/log``. Sending
210 IPython and can be found in the directory ``~/.ipython/log``. Sending
211 the log files to us will often help us to debug any problems.
211 the log files to us will often help us to debug any problems.
212
212
213 Next Steps
213 Next Steps
214 ==========
214 ==========
215
215
216 Once you have started the IPython controller and one or more engines, you
216 Once you have started the IPython controller and one or more engines, you
217 are ready to use the engines to do something useful. To make sure
217 are ready to use the engines to do something useful. To make sure
218 everything is working correctly, try the following commands::
218 everything is working correctly, try the following commands::
219
219
220 In [1]: from ipython1.kernel import client
220 In [1]: from IPython.kernel import client
221
221
222 In [2]: mec = client.MultiEngineClient() # This looks for .furl files in ~/.ipython
222 In [2]: mec = client.MultiEngineClient() # This looks for .furl files in ~/.ipython
223
223
224 In [4]: mec.get_ids()
224 In [4]: mec.get_ids()
225 Out[4]: [0, 1, 2, 3]
225 Out[4]: [0, 1, 2, 3]
226
226
227 In [5]: mec.execute('print "Hello World"')
227 In [5]: mec.execute('print "Hello World"')
228 Out[5]:
228 Out[5]:
229 <Results List>
229 <Results List>
230 [0] In [1]: print "Hello World"
230 [0] In [1]: print "Hello World"
231 [0] Out[1]: Hello World
231 [0] Out[1]: Hello World
232
232
233 [1] In [1]: print "Hello World"
233 [1] In [1]: print "Hello World"
234 [1] Out[1]: Hello World
234 [1] Out[1]: Hello World
235
235
236 [2] In [1]: print "Hello World"
236 [2] In [1]: print "Hello World"
237 [2] Out[1]: Hello World
237 [2] Out[1]: Hello World
238
238
239 [3] In [1]: print "Hello World"
239 [3] In [1]: print "Hello World"
240 [3] Out[1]: Hello World
240 [3] Out[1]: Hello World
241
241
242 If this works, you are ready to learn more about the :ref:`MultiEngine <parallelmultiengine>` and :ref:`Task <paralleltask>` interfaces to the controller.
242 If this works, you are ready to learn more about the :ref:`MultiEngine <parallelmultiengine>` and :ref:`Task <paralleltask>` interfaces to the controller.
@@ -1,728 +1,728 b''
1 .. _parallelmultiengine:
1 .. _parallelmultiengine:
2
2
3 =================================
3 =================================
4 IPython's MultiEngine interface
4 IPython's MultiEngine interface
5 =================================
5 =================================
6
6
7 .. contents::
7 .. contents::
8
8
9 The MultiEngine interface represents one possible way of working with a
9 The MultiEngine interface represents one possible way of working with a
10 set of IPython engines. The basic idea behind the MultiEngine interface is
10 set of IPython engines. The basic idea behind the MultiEngine interface is
11 that the capabilities of each engine are explicitly exposed to the user.
11 that the capabilities of each engine are explicitly exposed to the user.
12 Thus, in the MultiEngine interface, each engine is given an id that is
12 Thus, in the MultiEngine interface, each engine is given an id that is
13 used to identify the engine and give it work to do. This interface is very
13 used to identify the engine and give it work to do. This interface is very
14 intuitive and is designed with interactive usage in mind, and is thus the
14 intuitive and is designed with interactive usage in mind, and is thus the
15 best place for new users of IPython to begin.
15 best place for new users of IPython to begin.
16
16
17 Starting the IPython controller and engines
17 Starting the IPython controller and engines
18 ===========================================
18 ===========================================
19
19
20 To follow along with this tutorial, you will need to start the IPython
20 To follow along with this tutorial, you will need to start the IPython
21 controller and four IPython engines. The simplest way of doing this is to
21 controller and four IPython engines. The simplest way of doing this is to
22 use the ``ipcluster`` command::
22 use the ``ipcluster`` command::
23
23
24 $ ipcluster -n 4
24 $ ipcluster -n 4
25
25
26 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
26 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
27
27
28 Creating a ``MultiEngineClient`` instance
28 Creating a ``MultiEngineClient`` instance
29 =========================================
29 =========================================
30
30
31 The first step is to import the IPython ``client`` module and then create a ``MultiEngineClient`` instance::
31 The first step is to import the IPython ``client`` module and then create a ``MultiEngineClient`` instance::
32
32
33 In [1]: from ipython1.kernel import client
33 In [1]: from IPython.kernel import client
34
34
35 In [2]: mec = client.MultiEngineClient()
35 In [2]: mec = client.MultiEngineClient()
36
36
37 To make sure there are engines connected to the controller, you can get a list of engine ids::
37 To make sure there are engines connected to the controller, you can get a list of engine ids::
38
38
39 In [3]: mec.get_ids()
39 In [3]: mec.get_ids()
40 Out[3]: [0, 1, 2, 3]
40 Out[3]: [0, 1, 2, 3]
41
41
42 Here we see that there are four engines ready to do work for us.
42 Here we see that there are four engines ready to do work for us.
43
43
44 Running Python commands
44 Running Python commands
45 =======================
45 =======================
46
46
47 The most basic type of operation that can be performed on the engines is to execute Python code. Executing Python code can be done in blocking or non-blocking mode (blocking is default) using the ``execute`` method.
47 The most basic type of operation that can be performed on the engines is to execute Python code. Executing Python code can be done in blocking or non-blocking mode (blocking is default) using the ``execute`` method.
48
48
49 Blocking execution
49 Blocking execution
50 ------------------
50 ------------------
51
51
52 In blocking mode, the ``MultiEngineClient`` object (called ``mec`` in
52 In blocking mode, the ``MultiEngineClient`` object (called ``mec`` in
53 these examples) submits the command to the controller, which places the
53 these examples) submits the command to the controller, which places the
54 command in the engines' queues for execution. The ``execute`` call then
54 command in the engines' queues for execution. The ``execute`` call then
55 blocks until the engines are done executing the command::
55 blocks until the engines are done executing the command::
56
56
57 # The default is to run on all engines
57 # The default is to run on all engines
58 In [4]: mec.execute('a=5')
58 In [4]: mec.execute('a=5')
59 Out[4]:
59 Out[4]:
60 <Results List>
60 <Results List>
61 [0] In [1]: a=5
61 [0] In [1]: a=5
62 [1] In [1]: a=5
62 [1] In [1]: a=5
63 [2] In [1]: a=5
63 [2] In [1]: a=5
64 [3] In [1]: a=5
64 [3] In [1]: a=5
65
65
66 In [5]: mec.execute('b=10')
66 In [5]: mec.execute('b=10')
67 Out[5]:
67 Out[5]:
68 <Results List>
68 <Results List>
69 [0] In [2]: b=10
69 [0] In [2]: b=10
70 [1] In [2]: b=10
70 [1] In [2]: b=10
71 [2] In [2]: b=10
71 [2] In [2]: b=10
72 [3] In [2]: b=10
72 [3] In [2]: b=10
73
73
74 Python commands can be executed on specific engines by calling execute using the ``targets`` keyword argument::
74 Python commands can be executed on specific engines by calling execute using the ``targets`` keyword argument::
75
75
76 In [6]: mec.execute('c=a+b',targets=[0,2])
76 In [6]: mec.execute('c=a+b',targets=[0,2])
77 Out[6]:
77 Out[6]:
78 <Results List>
78 <Results List>
79 [0] In [3]: c=a+b
79 [0] In [3]: c=a+b
80 [2] In [3]: c=a+b
80 [2] In [3]: c=a+b
81
81
82
82
83 In [7]: mec.execute('c=a-b',targets=[1,3])
83 In [7]: mec.execute('c=a-b',targets=[1,3])
84 Out[7]:
84 Out[7]:
85 <Results List>
85 <Results List>
86 [1] In [3]: c=a-b
86 [1] In [3]: c=a-b
87 [3] In [3]: c=a-b
87 [3] In [3]: c=a-b
88
88
89
89
90 In [8]: mec.execute('print c')
90 In [8]: mec.execute('print c')
91 Out[8]:
91 Out[8]:
92 <Results List>
92 <Results List>
93 [0] In [4]: print c
93 [0] In [4]: print c
94 [0] Out[4]: 15
94 [0] Out[4]: 15
95
95
96 [1] In [4]: print c
96 [1] In [4]: print c
97 [1] Out[4]: -5
97 [1] Out[4]: -5
98
98
99 [2] In [4]: print c
99 [2] In [4]: print c
100 [2] Out[4]: 15
100 [2] Out[4]: 15
101
101
102 [3] In [4]: print c
102 [3] In [4]: print c
103 [3] Out[4]: -5
103 [3] Out[4]: -5
104
104
105 This example also shows one of the most important things about the IPython engines: they have persistent user namespaces. The ``execute`` method returns a list of Python ``dict`` objects (one per engine) that contain useful information::
105 This example also shows one of the most important things about the IPython engines: they have persistent user namespaces. The ``execute`` method returns a list of Python ``dict`` objects (one per engine) that contain useful information::
106
106
107 In [9]: result_dict = mec.execute('d=10; print d')
107 In [9]: result_dict = mec.execute('d=10; print d')
108
108
109 In [10]: for r in result_dict:
109 In [10]: for r in result_dict:
110 ....: print r
110 ....: print r
111 ....:
111 ....:
112 ....:
112 ....:
113 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 0, 'stdout': '10\n'}
113 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 0, 'stdout': '10\n'}
114 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 1, 'stdout': '10\n'}
114 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 1, 'stdout': '10\n'}
115 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 2, 'stdout': '10\n'}
115 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 2, 'stdout': '10\n'}
116 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 3, 'stdout': '10\n'}
116 {'input': {'translated': 'd=10; print d', 'raw': 'd=10; print d'}, 'number': 5, 'id': 3, 'stdout': '10\n'}
117
117
118 Non-blocking execution
118 Non-blocking execution
119 ----------------------
119 ----------------------
120
120
121 In non-blocking mode, ``execute`` submits the command to be executed and then returns a
121 In non-blocking mode, ``execute`` submits the command to be executed and then returns a
122 ``PendingResult`` object immediately. The ``PendingResult`` object gives you a way of getting a
122 ``PendingResult`` object immediately. The ``PendingResult`` object gives you a way of getting a
123 result at a later time through its ``get_result`` method or ``r`` attribute. This allows you to
123 result at a later time through its ``get_result`` method or ``r`` attribute. This allows you to
124 quickly submit long running commands without blocking your local Python/IPython session::
124 quickly submit long running commands without blocking your local Python/IPython session::
125
125
126 # In blocking mode
126 # In blocking mode
127 In [6]: mec.execute('import time')
127 In [6]: mec.execute('import time')
128 Out[6]:
128 Out[6]:
129 <Results List>
129 <Results List>
130 [0] In [1]: import time
130 [0] In [1]: import time
131 [1] In [1]: import time
131 [1] In [1]: import time
132 [2] In [1]: import time
132 [2] In [1]: import time
133 [3] In [1]: import time
133 [3] In [1]: import time
134
134
135 # In non-blocking mode
135 # In non-blocking mode
136 In [7]: pr = mec.execute('time.sleep(10)',block=False)
136 In [7]: pr = mec.execute('time.sleep(10)',block=False)
137
137
138 # Now block for the result
138 # Now block for the result
139 In [8]: pr.get_result()
139 In [8]: pr.get_result()
140 Out[8]:
140 Out[8]:
141 <Results List>
141 <Results List>
142 [0] In [2]: time.sleep(10)
142 [0] In [2]: time.sleep(10)
143 [1] In [2]: time.sleep(10)
143 [1] In [2]: time.sleep(10)
144 [2] In [2]: time.sleep(10)
144 [2] In [2]: time.sleep(10)
145 [3] In [2]: time.sleep(10)
145 [3] In [2]: time.sleep(10)
146
146
147 # Again in non-blocking mode
147 # Again in non-blocking mode
148 In [9]: pr = mec.execute('time.sleep(10)',block=False)
148 In [9]: pr = mec.execute('time.sleep(10)',block=False)
149
149
150 # Poll to see if the result is ready
150 # Poll to see if the result is ready
151 In [10]: pr.get_result(block=False)
151 In [10]: pr.get_result(block=False)
152
152
153 # A shorthand for get_result(block=True)
153 # A shorthand for get_result(block=True)
154 In [11]: pr.r
154 In [11]: pr.r
155 Out[11]:
155 Out[11]:
156 <Results List>
156 <Results List>
157 [0] In [3]: time.sleep(10)
157 [0] In [3]: time.sleep(10)
158 [1] In [3]: time.sleep(10)
158 [1] In [3]: time.sleep(10)
159 [2] In [3]: time.sleep(10)
159 [2] In [3]: time.sleep(10)
160 [3] In [3]: time.sleep(10)
160 [3] In [3]: time.sleep(10)
161
161
162 Often, it is desirable to wait until a set of ``PendingResult`` objects are done. For this, there is the ``barrier`` method. This method takes a tuple of ``PendingResult`` objects and blocks until all of the associated results are ready::
162 Often, it is desirable to wait until a set of ``PendingResult`` objects are done. For this, there is the ``barrier`` method. This method takes a tuple of ``PendingResult`` objects and blocks until all of the associated results are ready::
163
163
164 In [72]: mec.block=False
164 In [72]: mec.block=False
165
165
166 # A trivial list of PendingResults objects
166 # A trivial list of PendingResults objects
167 In [73]: pr_list = [mec.execute('time.sleep(3)') for i in range(10)]
167 In [73]: pr_list = [mec.execute('time.sleep(3)') for i in range(10)]
168
168
169 # Wait until all of them are done
169 # Wait until all of them are done
170 In [74]: mec.barrier(pr_list)
170 In [74]: mec.barrier(pr_list)
171
171
172 # Then, their results are ready using get_result or the r attribute
172 # Then, their results are ready using get_result or the r attribute
173 In [75]: pr_list[0].r
173 In [75]: pr_list[0].r
174 Out[75]:
174 Out[75]:
175 <Results List>
175 <Results List>
176 [0] In [20]: time.sleep(3)
176 [0] In [20]: time.sleep(3)
177 [1] In [19]: time.sleep(3)
177 [1] In [19]: time.sleep(3)
178 [2] In [20]: time.sleep(3)
178 [2] In [20]: time.sleep(3)
179 [3] In [19]: time.sleep(3)
179 [3] In [19]: time.sleep(3)
180
180
181
181
182 The ``block`` and ``targets`` keyword arguments and attributes
182 The ``block`` and ``targets`` keyword arguments and attributes
183 --------------------------------------------------------------
183 --------------------------------------------------------------
184
184
185 Most commands in the multiengine interface (like ``execute``) accept ``block`` and ``targets``
185 Most commands in the multiengine interface (like ``execute``) accept ``block`` and ``targets``
186 as keyword arguments. As we have seen above, these keyword arguments control the blocking mode
186 as keyword arguments. As we have seen above, these keyword arguments control the blocking mode
187 and which engines the command is applied to. The ``MultiEngineClient`` class also has ``block``
187 and which engines the command is applied to. The ``MultiEngineClient`` class also has ``block``
188 and ``targets`` attributes that control the default behavior when the keyword arguments are not
188 and ``targets`` attributes that control the default behavior when the keyword arguments are not
189 provided. Thus the following logic is used for ``block`` and ``targets``:
189 provided. Thus the following logic is used for ``block`` and ``targets``:
190
190
191 * If no keyword argument is provided, the instance attributes are used.
191 * If no keyword argument is provided, the instance attributes are used.
192 * Keyword arguments, if provided, override the instance attributes.
192 * Keyword arguments, if provided, override the instance attributes.
193
193
194 The following examples demonstrate how to use the instance attributes::
194 The following examples demonstrate how to use the instance attributes::
195
195
196 In [16]: mec.targets = [0,2]
196 In [16]: mec.targets = [0,2]
197
197
198 In [17]: mec.block = False
198 In [17]: mec.block = False
199
199
200 In [18]: pr = mec.execute('a=5')
200 In [18]: pr = mec.execute('a=5')
201
201
202 In [19]: pr.r
202 In [19]: pr.r
203 Out[19]:
203 Out[19]:
204 <Results List>
204 <Results List>
205 [0] In [6]: a=5
205 [0] In [6]: a=5
206 [2] In [6]: a=5
206 [2] In [6]: a=5
207
207
208 # Note targets='all' means all engines
208 # Note targets='all' means all engines
209 In [20]: mec.targets = 'all'
209 In [20]: mec.targets = 'all'
210
210
211 In [21]: mec.block = True
211 In [21]: mec.block = True
212
212
213 In [22]: mec.execute('b=10; print b')
213 In [22]: mec.execute('b=10; print b')
214 Out[22]:
214 Out[22]:
215 <Results List>
215 <Results List>
216 [0] In [7]: b=10; print b
216 [0] In [7]: b=10; print b
217 [0] Out[7]: 10
217 [0] Out[7]: 10
218
218
219 [1] In [6]: b=10; print b
219 [1] In [6]: b=10; print b
220 [1] Out[6]: 10
220 [1] Out[6]: 10
221
221
222 [2] In [7]: b=10; print b
222 [2] In [7]: b=10; print b
223 [2] Out[7]: 10
223 [2] Out[7]: 10
224
224
225 [3] In [6]: b=10; print b
225 [3] In [6]: b=10; print b
226 [3] Out[6]: 10
226 [3] Out[6]: 10
227
227
228 The ``block`` and ``targets`` instance attributes also determine the behavior of the parallel
228 The ``block`` and ``targets`` instance attributes also determine the behavior of the parallel
229 magic commands...
229 magic commands...
230
230
231
231
232 Parallel magic commands
232 Parallel magic commands
233 -----------------------
233 -----------------------
234
234
235 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to ``execute`` and ``get_result``. The ``%px`` magic executes a single Python command on the engines specified by the ``targets`` attribute of the ``MultiEngineClient`` instance (by default this is 'all')::
235 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to ``execute`` and ``get_result``. The ``%px`` magic executes a single Python command on the engines specified by the ``targets`` attribute of the ``MultiEngineClient`` instance (by default this is 'all')::
236
236
237 # Make this MultiEngineClient active for parallel magic commands
237 # Make this MultiEngineClient active for parallel magic commands
238 In [23]: mec.activate()
238 In [23]: mec.activate()
239
239
240 In [24]: mec.block=True
240 In [24]: mec.block=True
241
241
242 In [25]: import numpy
242 In [25]: import numpy
243
243
244 In [26]: %px import numpy
244 In [26]: %px import numpy
245 Executing command on Controller
245 Executing command on Controller
246 Out[26]:
246 Out[26]:
247 <Results List>
247 <Results List>
248 [0] In [8]: import numpy
248 [0] In [8]: import numpy
249 [1] In [7]: import numpy
249 [1] In [7]: import numpy
250 [2] In [8]: import numpy
250 [2] In [8]: import numpy
251 [3] In [7]: import numpy
251 [3] In [7]: import numpy
252
252
253
253
254 In [27]: %px a = numpy.random.rand(2,2)
254 In [27]: %px a = numpy.random.rand(2,2)
255 Executing command on Controller
255 Executing command on Controller
256 Out[27]:
256 Out[27]:
257 <Results List>
257 <Results List>
258 [0] In [9]: a = numpy.random.rand(2,2)
258 [0] In [9]: a = numpy.random.rand(2,2)
259 [1] In [8]: a = numpy.random.rand(2,2)
259 [1] In [8]: a = numpy.random.rand(2,2)
260 [2] In [9]: a = numpy.random.rand(2,2)
260 [2] In [9]: a = numpy.random.rand(2,2)
261 [3] In [8]: a = numpy.random.rand(2,2)
261 [3] In [8]: a = numpy.random.rand(2,2)
262
262
263
263
264 In [28]: %px print numpy.linalg.eigvals(a)
264 In [28]: %px print numpy.linalg.eigvals(a)
265 Executing command on Controller
265 Executing command on Controller
266 Out[28]:
266 Out[28]:
267 <Results List>
267 <Results List>
268 [0] In [10]: print numpy.linalg.eigvals(a)
268 [0] In [10]: print numpy.linalg.eigvals(a)
269 [0] Out[10]: [ 1.28167017 0.14197338]
269 [0] Out[10]: [ 1.28167017 0.14197338]
270
270
271 [1] In [9]: print numpy.linalg.eigvals(a)
271 [1] In [9]: print numpy.linalg.eigvals(a)
272 [1] Out[9]: [-0.14093616 1.27877273]
272 [1] Out[9]: [-0.14093616 1.27877273]
273
273
274 [2] In [10]: print numpy.linalg.eigvals(a)
274 [2] In [10]: print numpy.linalg.eigvals(a)
275 [2] Out[10]: [-0.37023573 1.06779409]
275 [2] Out[10]: [-0.37023573 1.06779409]
276
276
277 [3] In [9]: print numpy.linalg.eigvals(a)
277 [3] In [9]: print numpy.linalg.eigvals(a)
278 [3] Out[9]: [ 0.83664764 -0.25602658]
278 [3] Out[9]: [ 0.83664764 -0.25602658]
279
279
280 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last command executed on each engine. It is simply a shortcut to the ``get_result`` method::
280 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last command executed on each engine. It is simply a shortcut to the ``get_result`` method::
281
281
282 In [29]: %result
282 In [29]: %result
283 Out[29]:
283 Out[29]:
284 <Results List>
284 <Results List>
285 [0] In [10]: print numpy.linalg.eigvals(a)
285 [0] In [10]: print numpy.linalg.eigvals(a)
286 [0] Out[10]: [ 1.28167017 0.14197338]
286 [0] Out[10]: [ 1.28167017 0.14197338]
287
287
288 [1] In [9]: print numpy.linalg.eigvals(a)
288 [1] In [9]: print numpy.linalg.eigvals(a)
289 [1] Out[9]: [-0.14093616 1.27877273]
289 [1] Out[9]: [-0.14093616 1.27877273]
290
290
291 [2] In [10]: print numpy.linalg.eigvals(a)
291 [2] In [10]: print numpy.linalg.eigvals(a)
292 [2] Out[10]: [-0.37023573 1.06779409]
292 [2] Out[10]: [-0.37023573 1.06779409]
293
293
294 [3] In [9]: print numpy.linalg.eigvals(a)
294 [3] In [9]: print numpy.linalg.eigvals(a)
295 [3] Out[9]: [ 0.83664764 -0.25602658]
295 [3] Out[9]: [ 0.83664764 -0.25602658]
296
296
297 The ``%autopx`` magic switches to a mode where everything you type is executed on the engines given by the ``targets`` attribute::
297 The ``%autopx`` magic switches to a mode where everything you type is executed on the engines given by the ``targets`` attribute::
298
298
299 In [30]: mec.block=False
299 In [30]: mec.block=False
300
300
301 In [31]: %autopx
301 In [31]: %autopx
302 Auto Parallel Enabled
302 Auto Parallel Enabled
303 Type %autopx to disable
303 Type %autopx to disable
304
304
305 In [32]: max_evals = []
305 In [32]: max_evals = []
306 <ipython1.kernel.multiengineclient.PendingResult object at 0x17b8a70>
306 <IPython.kernel.multiengineclient.PendingResult object at 0x17b8a70>
307
307
308 In [33]: for i in range(100):
308 In [33]: for i in range(100):
309 ....: a = numpy.random.rand(10,10)
309 ....: a = numpy.random.rand(10,10)
310 ....: a = a+a.transpose()
310 ....: a = a+a.transpose()
311 ....: evals = numpy.linalg.eigvals(a)
311 ....: evals = numpy.linalg.eigvals(a)
312 ....: max_evals.append(evals[0].real)
312 ....: max_evals.append(evals[0].real)
313 ....:
313 ....:
314 ....:
314 ....:
315 <ipython1.kernel.multiengineclient.PendingResult object at 0x17af8f0>
315 <IPython.kernel.multiengineclient.PendingResult object at 0x17af8f0>
316
316
317 In [34]: %autopx
317 In [34]: %autopx
318 Auto Parallel Disabled
318 Auto Parallel Disabled
319
319
320 In [35]: mec.block=True
320 In [35]: mec.block=True
321
321
322 In [36]: px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
322 In [36]: px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
323 Executing command on Controller
323 Executing command on Controller
324 Out[36]:
324 Out[36]:
325 <Results List>
325 <Results List>
326 [0] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
326 [0] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
327 [0] Out[13]: Average max eigenvalue is: 10.1387247332
327 [0] Out[13]: Average max eigenvalue is: 10.1387247332
328
328
329 [1] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
329 [1] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
330 [1] Out[12]: Average max eigenvalue is: 10.2076902286
330 [1] Out[12]: Average max eigenvalue is: 10.2076902286
331
331
332 [2] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
332 [2] In [13]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
333 [2] Out[13]: Average max eigenvalue is: 10.1891484655
333 [2] Out[13]: Average max eigenvalue is: 10.1891484655
334
334
335 [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
335 [3] In [12]: print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
336 [3] Out[12]: Average max eigenvalue is: 10.1158837784
336 [3] Out[12]: Average max eigenvalue is: 10.1158837784
337
337
338 Using the ``with`` statement of Python 2.5
338 Using the ``with`` statement of Python 2.5
339 ------------------------------------------
339 ------------------------------------------
340
340
341 Python 2.5 introduced the ``with`` statement. The ``MultiEngineClient`` can be used with the ``with`` statement to execute a block of code on the engines indicated by the ``targets`` attribute::
341 Python 2.5 introduced the ``with`` statement. The ``MultiEngineClient`` can be used with the ``with`` statement to execute a block of code on the engines indicated by the ``targets`` attribute::
342
342
343 In [3]: with mec:
343 In [3]: with mec:
344 ...: client.remote() # Required so the following code is not run locally
344 ...: client.remote() # Required so the following code is not run locally
345 ...: a = 10
345 ...: a = 10
346 ...: b = 30
346 ...: b = 30
347 ...: c = a+b
347 ...: c = a+b
348 ...:
348 ...:
349 ...:
349 ...:
350
350
351 In [4]: mec.get_result()
351 In [4]: mec.get_result()
352 Out[4]:
352 Out[4]:
353 <Results List>
353 <Results List>
354 [0] In [1]: a = 10
354 [0] In [1]: a = 10
355 b = 30
355 b = 30
356 c = a+b
356 c = a+b
357
357
358 [1] In [1]: a = 10
358 [1] In [1]: a = 10
359 b = 30
359 b = 30
360 c = a+b
360 c = a+b
361
361
362 [2] In [1]: a = 10
362 [2] In [1]: a = 10
363 b = 30
363 b = 30
364 c = a+b
364 c = a+b
365
365
366 [3] In [1]: a = 10
366 [3] In [1]: a = 10
367 b = 30
367 b = 30
368 c = a+b
368 c = a+b
369
369
370 This is basically another way of calling execute, but one which allows you to avoid writing code in strings. When used in this way, the attributes ``targets`` and ``block`` are used to control how the code is executed. For now, if you run code in non-blocking mode you won't have access to the ``PendingResult``.
370 This is basically another way of calling execute, but one which allows you to avoid writing code in strings. When used in this way, the attributes ``targets`` and ``block`` are used to control how the code is executed. For now, if you run code in non-blocking mode you won't have access to the ``PendingResult``.
371
371
372 Moving Python objects around
372 Moving Python objects around
373 ============================
373 ============================
374
374
375 In addition to executing code on engines, you can transfer Python objects to and from your
375 In addition to executing code on engines, you can transfer Python objects to and from your
376 IPython session and the engines. In IPython, these operations are called ``push`` (sending an
376 IPython session and the engines. In IPython, these operations are called ``push`` (sending an
377 object to the engines) and ``pull`` (getting an object from the engines).
377 object to the engines) and ``pull`` (getting an object from the engines).
378
378
379 Basic push and pull
379 Basic push and pull
380 -------------------
380 -------------------
381
381
382 Here are some examples of how you use ``push`` and ``pull``::
382 Here are some examples of how you use ``push`` and ``pull``::
383
383
384 In [38]: mec.push(dict(a=1.03234,b=3453))
384 In [38]: mec.push(dict(a=1.03234,b=3453))
385 Out[38]: [None, None, None, None]
385 Out[38]: [None, None, None, None]
386
386
387 In [39]: mec.pull('a')
387 In [39]: mec.pull('a')
388 Out[39]: [1.03234, 1.03234, 1.03234, 1.03234]
388 Out[39]: [1.03234, 1.03234, 1.03234, 1.03234]
389
389
390 In [40]: mec.pull('b',targets=0)
390 In [40]: mec.pull('b',targets=0)
391 Out[40]: [3453]
391 Out[40]: [3453]
392
392
393 In [41]: mec.pull(('a','b'))
393 In [41]: mec.pull(('a','b'))
394 Out[41]: [[1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453]]
394 Out[41]: [[1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453]]
395
395
396 In [42]: mec.zip_pull(('a','b'))
396 In [42]: mec.zip_pull(('a','b'))
397 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
397 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
398
398
399 In [43]: mec.push(dict(c='speed'))
399 In [43]: mec.push(dict(c='speed'))
400 Out[43]: [None, None, None, None]
400 Out[43]: [None, None, None, None]
401
401
402 In [44]: %px print c
402 In [44]: %px print c
403 Executing command on Controller
403 Executing command on Controller
404 Out[44]:
404 Out[44]:
405 <Results List>
405 <Results List>
406 [0] In [14]: print c
406 [0] In [14]: print c
407 [0] Out[14]: speed
407 [0] Out[14]: speed
408
408
409 [1] In [13]: print c
409 [1] In [13]: print c
410 [1] Out[13]: speed
410 [1] Out[13]: speed
411
411
412 [2] In [14]: print c
412 [2] In [14]: print c
413 [2] Out[14]: speed
413 [2] Out[14]: speed
414
414
415 [3] In [13]: print c
415 [3] In [13]: print c
416 [3] Out[13]: speed
416 [3] Out[13]: speed
417
417
418 In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects::
418 In non-blocking mode ``push`` and ``pull`` also return ``PendingResult`` objects::
419
419
420 In [47]: mec.block=False
420 In [47]: mec.block=False
421
421
422 In [48]: pr = mec.pull('a')
422 In [48]: pr = mec.pull('a')
423
423
424 In [49]: pr.r
424 In [49]: pr.r
425 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
425 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
426
426
427
427
428 Push and pull for functions
428 Push and pull for functions
429 ---------------------------
429 ---------------------------
430
430
431 Functions can also be pushed and pulled using ``push_function`` and ``pull_function``::
431 Functions can also be pushed and pulled using ``push_function`` and ``pull_function``::
432
432
433 In [53]: def f(x):
433 In [53]: def f(x):
434 ....: return 2.0*x**4
434 ....: return 2.0*x**4
435 ....:
435 ....:
436
436
437 In [54]: mec.push_function(dict(f=f))
437 In [54]: mec.push_function(dict(f=f))
438 Out[54]: [None, None, None, None]
438 Out[54]: [None, None, None, None]
439
439
440 In [55]: mec.execute('y = f(4.0)')
440 In [55]: mec.execute('y = f(4.0)')
441 Out[55]:
441 Out[55]:
442 <Results List>
442 <Results List>
443 [0] In [15]: y = f(4.0)
443 [0] In [15]: y = f(4.0)
444 [1] In [14]: y = f(4.0)
444 [1] In [14]: y = f(4.0)
445 [2] In [15]: y = f(4.0)
445 [2] In [15]: y = f(4.0)
446 [3] In [14]: y = f(4.0)
446 [3] In [14]: y = f(4.0)
447
447
448
448
449 In [56]: px print y
449 In [56]: px print y
450 Executing command on Controller
450 Executing command on Controller
451 Out[56]:
451 Out[56]:
452 <Results List>
452 <Results List>
453 [0] In [16]: print y
453 [0] In [16]: print y
454 [0] Out[16]: 512.0
454 [0] Out[16]: 512.0
455
455
456 [1] In [15]: print y
456 [1] In [15]: print y
457 [1] Out[15]: 512.0
457 [1] Out[15]: 512.0
458
458
459 [2] In [16]: print y
459 [2] In [16]: print y
460 [2] Out[16]: 512.0
460 [2] Out[16]: 512.0
461
461
462 [3] In [15]: print y
462 [3] In [15]: print y
463 [3] Out[15]: 512.0
463 [3] Out[15]: 512.0
464
464
465
465
466 Dictionary interface
466 Dictionary interface
467 --------------------
467 --------------------
468
468
469 As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class implements some of the Python dictionary interface. This makes the remote namespaces of the engines appear as a local dictionary. Underneath, this uses ``push`` and ``pull``::
469 As a shorthand to ``push`` and ``pull``, the ``MultiEngineClient`` class implements some of the Python dictionary interface. This makes the remote namespaces of the engines appear as a local dictionary. Underneath, this uses ``push`` and ``pull``::
470
470
471 In [50]: mec.block=True
471 In [50]: mec.block=True
472
472
473 In [51]: mec['a']=['foo','bar']
473 In [51]: mec['a']=['foo','bar']
474
474
475 In [52]: mec['a']
475 In [52]: mec['a']
476 Out[52]: [['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar']]
476 Out[52]: [['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar']]
477
477
478 Scatter and gather
478 Scatter and gather
479 ------------------
479 ------------------
480
480
481 Sometimes it is useful to partition a sequence and push the partitions to different engines. In
481 Sometimes it is useful to partition a sequence and push the partitions to different engines. In
482 MPI language, this is known as scatter/gather and we follow that terminology. However, it is
482 MPI language, this is known as scatter/gather and we follow that terminology. However, it is
483 important to remember that in IPython ``scatter`` is from the interactive IPython session to
483 important to remember that in IPython ``scatter`` is from the interactive IPython session to
484 the engines and ``gather`` is from the engines back to the interactive IPython session. For
484 the engines and ``gather`` is from the engines back to the interactive IPython session. For
485 scatter/gather operations between engines, MPI should be used::
485 scatter/gather operations between engines, MPI should be used::
486
486
487 In [58]: mec.scatter('a',range(16))
487 In [58]: mec.scatter('a',range(16))
488 Out[58]: [None, None, None, None]
488 Out[58]: [None, None, None, None]
489
489
490 In [59]: px print a
490 In [59]: px print a
491 Executing command on Controller
491 Executing command on Controller
492 Out[59]:
492 Out[59]:
493 <Results List>
493 <Results List>
494 [0] In [17]: print a
494 [0] In [17]: print a
495 [0] Out[17]: [0, 1, 2, 3]
495 [0] Out[17]: [0, 1, 2, 3]
496
496
497 [1] In [16]: print a
497 [1] In [16]: print a
498 [1] Out[16]: [4, 5, 6, 7]
498 [1] Out[16]: [4, 5, 6, 7]
499
499
500 [2] In [17]: print a
500 [2] In [17]: print a
501 [2] Out[17]: [8, 9, 10, 11]
501 [2] Out[17]: [8, 9, 10, 11]
502
502
503 [3] In [16]: print a
503 [3] In [16]: print a
504 [3] Out[16]: [12, 13, 14, 15]
504 [3] Out[16]: [12, 13, 14, 15]
505
505
506
506
507 In [60]: mec.gather('a')
507 In [60]: mec.gather('a')
508 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
508 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
509
509
510 Other things to look at
510 Other things to look at
511 =======================
511 =======================
512
512
513 Parallel map
513 Parallel map
514 ------------
514 ------------
515
515
516 Python's builtin ``map`` function allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, the MultiEngine interface in IPython already has a parallel version of ``map`` that works just like its serial counterpart::
516 Python's builtin ``map`` function allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, the MultiEngine interface in IPython already has a parallel version of ``map`` that works just like its serial counterpart::
517
517
518 In [63]: serial_result = map(lambda x:x**10, range(32))
518 In [63]: serial_result = map(lambda x:x**10, range(32))
519
519
520 In [64]: parallel_result = mec.map(lambda x:x**10, range(32))
520 In [64]: parallel_result = mec.map(lambda x:x**10, range(32))
521
521
522 In [65]: serial_result==parallel_result
522 In [65]: serial_result==parallel_result
523 Out[65]: True
523 Out[65]: True
524
524
525 As you would expect, the parallel version of ``map`` is also influenced by the ``block`` and ``targets`` keyword arguments and attributes.
525 As you would expect, the parallel version of ``map`` is also influenced by the ``block`` and ``targets`` keyword arguments and attributes.
526
526
527 How to do parallel list comprehensions
527 How to do parallel list comprehensions
528 --------------------------------------
528 --------------------------------------
529
529
530 In many cases list comprehensions are nicer than using the map function. While we don't have fully parallel list comprehensions, it is simple to get the basic effect using ``scatter`` and ``gather``::
530 In many cases list comprehensions are nicer than using the map function. While we don't have fully parallel list comprehensions, it is simple to get the basic effect using ``scatter`` and ``gather``::
531
531
532 In [66]: mec.scatter('x',range(64))
532 In [66]: mec.scatter('x',range(64))
533 Out[66]: [None, None, None, None]
533 Out[66]: [None, None, None, None]
534
534
535 In [67]: px y = [i**10 for i in x]
535 In [67]: px y = [i**10 for i in x]
536 Executing command on Controller
536 Executing command on Controller
537 Out[67]:
537 Out[67]:
538 <Results List>
538 <Results List>
539 [0] In [19]: y = [i**10 for i in x]
539 [0] In [19]: y = [i**10 for i in x]
540 [1] In [18]: y = [i**10 for i in x]
540 [1] In [18]: y = [i**10 for i in x]
541 [2] In [19]: y = [i**10 for i in x]
541 [2] In [19]: y = [i**10 for i in x]
542 [3] In [18]: y = [i**10 for i in x]
542 [3] In [18]: y = [i**10 for i in x]
543
543
544
544
545 In [68]: y = mec.gather('y')
545 In [68]: y = mec.gather('y')
546
546
547 In [69]: print y
547 In [69]: print y
548 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
548 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
549
549
550 Parallel Exceptions
550 Parallel Exceptions
551 -------------------
551 -------------------
552
552
553 In the MultiEngine interface, parallel commands can raise Python exceptions, just like serial commands. But, it is a little subtle, because a single parallel command can actually raise multiple exceptions (one for each engine the command was run on). To express this idea, the MultiEngine interface has a ``CompositeError`` exception class that will be raised in most cases. The ``CompositeError`` class is a special type of exception that wraps one or more other types of exceptions. Here is how it works::
553 In the MultiEngine interface, parallel commands can raise Python exceptions, just like serial commands. But, it is a little subtle, because a single parallel command can actually raise multiple exceptions (one for each engine the command was run on). To express this idea, the MultiEngine interface has a ``CompositeError`` exception class that will be raised in most cases. The ``CompositeError`` class is a special type of exception that wraps one or more other types of exceptions. Here is how it works::
554
554
555 In [76]: mec.block=True
555 In [76]: mec.block=True
556
556
557 In [77]: mec.execute('1/0')
557 In [77]: mec.execute('1/0')
558 ---------------------------------------------------------------------------
558 ---------------------------------------------------------------------------
559 CompositeError Traceback (most recent call last)
559 CompositeError Traceback (most recent call last)
560
560
561 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
561 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
562
562
563 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
563 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
564 432 targets, block = self._findTargetsAndBlock(targets, block)
564 432 targets, block = self._findTargetsAndBlock(targets, block)
565 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
565 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
566 --> 434 targets=targets, block=block)
566 --> 434 targets=targets, block=block)
567 435 if block:
567 435 if block:
568 436 result = ResultList(result)
568 436 result = ResultList(result)
569
569
570 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
570 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
571 72 result.raiseException()
571 72 result.raiseException()
572 73 except Exception, e:
572 73 except Exception, e:
573 ---> 74 raise e
573 ---> 74 raise e
574 75 return result
574 75 return result
575 76
575 76
576
576
577 CompositeError: one or more exceptions from call to method: execute
577 CompositeError: one or more exceptions from call to method: execute
578 [0:execute]: ZeroDivisionError: integer division or modulo by zero
578 [0:execute]: ZeroDivisionError: integer division or modulo by zero
579 [1:execute]: ZeroDivisionError: integer division or modulo by zero
579 [1:execute]: ZeroDivisionError: integer division or modulo by zero
580 [2:execute]: ZeroDivisionError: integer division or modulo by zero
580 [2:execute]: ZeroDivisionError: integer division or modulo by zero
581 [3:execute]: ZeroDivisionError: integer division or modulo by zero
581 [3:execute]: ZeroDivisionError: integer division or modulo by zero
582
582
583 Notice how the error message printed when ``CompositeError`` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions::
583 Notice how the error message printed when ``CompositeError`` is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions::
584
584
585 In [80]: try:
585 In [80]: try:
586 ....: mec.execute('1/0')
586 ....: mec.execute('1/0')
587 ....: except client.CompositeError, e:
587 ....: except client.CompositeError, e:
588 ....: e.raise_exception()
588 ....: e.raise_exception()
589 ....:
589 ....:
590 ....:
590 ....:
591 ---------------------------------------------------------------------------
591 ---------------------------------------------------------------------------
592 ZeroDivisionError Traceback (most recent call last)
592 ZeroDivisionError Traceback (most recent call last)
593
593
594 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
594 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
595
595
596 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
596 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
597 156 raise IndexError("an exception with index %i does not exist"%excid)
597 156 raise IndexError("an exception with index %i does not exist"%excid)
598 157 else:
598 157 else:
599 --> 158 raise et, ev, etb
599 --> 158 raise et, ev, etb
600 159
600 159
601 160 def collect_exceptions(rlist, method):
601 160 def collect_exceptions(rlist, method):
602
602
603 ZeroDivisionError: integer division or modulo by zero
603 ZeroDivisionError: integer division or modulo by zero
604
604
605 If you are working in IPython, you can simply type ``%debug`` after one of these ``CompositeError`` exceptions is raised, and inspect the exception instance::
605 If you are working in IPython, you can simply type ``%debug`` after one of these ``CompositeError`` exceptions is raised, and inspect the exception instance::
606
606
607 In [81]: mec.execute('1/0')
607 In [81]: mec.execute('1/0')
608 ---------------------------------------------------------------------------
608 ---------------------------------------------------------------------------
609 CompositeError Traceback (most recent call last)
609 CompositeError Traceback (most recent call last)
610
610
611 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
611 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
612
612
613 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
613 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
614 432 targets, block = self._findTargetsAndBlock(targets, block)
614 432 targets, block = self._findTargetsAndBlock(targets, block)
615 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
615 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
616 --> 434 targets=targets, block=block)
616 --> 434 targets=targets, block=block)
617 435 if block:
617 435 if block:
618 436 result = ResultList(result)
618 436 result = ResultList(result)
619
619
620 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
620 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
621 72 result.raiseException()
621 72 result.raiseException()
622 73 except Exception, e:
622 73 except Exception, e:
623 ---> 74 raise e
623 ---> 74 raise e
624 75 return result
624 75 return result
625 76
625 76
626
626
627 CompositeError: one or more exceptions from call to method: execute
627 CompositeError: one or more exceptions from call to method: execute
628 [0:execute]: ZeroDivisionError: integer division or modulo by zero
628 [0:execute]: ZeroDivisionError: integer division or modulo by zero
629 [1:execute]: ZeroDivisionError: integer division or modulo by zero
629 [1:execute]: ZeroDivisionError: integer division or modulo by zero
630 [2:execute]: ZeroDivisionError: integer division or modulo by zero
630 [2:execute]: ZeroDivisionError: integer division or modulo by zero
631 [3:execute]: ZeroDivisionError: integer division or modulo by zero
631 [3:execute]: ZeroDivisionError: integer division or modulo by zero
632
632
633 In [82]: %debug
633 In [82]: %debug
634 >
634 >
635
635
636 /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
636 /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
637 73 except Exception, e:
637 73 except Exception, e:
638 ---> 74 raise e
638 ---> 74 raise e
639 75 return result
639 75 return result
640
640
641 # With the debugger running, e is the exception instance. We can tab-complete
641 # With the debugger running, e is the exception instance. We can tab-complete
642 # on it and see the extra methods that are available.
642 # on it and see the extra methods that are available.
643 ipdb> e.
643 ipdb> e.
644 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
644 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
645 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
645 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
646 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
646 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
647 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
647 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
648 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
648 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
649 ipdb> e.print_tracebacks()
649 ipdb> e.print_tracebacks()
650 [0:execute]:
650 [0:execute]:
651 ---------------------------------------------------------------------------
651 ---------------------------------------------------------------------------
652 ZeroDivisionError Traceback (most recent call last)
652 ZeroDivisionError Traceback (most recent call last)
653
653
654 /ipython1-client-r3021/docs/examples/<string> in <module>()
654 /ipython1-client-r3021/docs/examples/<string> in <module>()
655
655
656 ZeroDivisionError: integer division or modulo by zero
656 ZeroDivisionError: integer division or modulo by zero
657
657
658 [1:execute]:
658 [1:execute]:
659 ---------------------------------------------------------------------------
659 ---------------------------------------------------------------------------
660 ZeroDivisionError Traceback (most recent call last)
660 ZeroDivisionError Traceback (most recent call last)
661
661
662 /ipython1-client-r3021/docs/examples/<string> in <module>()
662 /ipython1-client-r3021/docs/examples/<string> in <module>()
663
663
664 ZeroDivisionError: integer division or modulo by zero
664 ZeroDivisionError: integer division or modulo by zero
665
665
666 [2:execute]:
666 [2:execute]:
667 ---------------------------------------------------------------------------
667 ---------------------------------------------------------------------------
668 ZeroDivisionError Traceback (most recent call last)
668 ZeroDivisionError Traceback (most recent call last)
669
669
670 /ipython1-client-r3021/docs/examples/<string> in <module>()
670 /ipython1-client-r3021/docs/examples/<string> in <module>()
671
671
672 ZeroDivisionError: integer division or modulo by zero
672 ZeroDivisionError: integer division or modulo by zero
673
673
674 [3:execute]:
674 [3:execute]:
675 ---------------------------------------------------------------------------
675 ---------------------------------------------------------------------------
676 ZeroDivisionError Traceback (most recent call last)
676 ZeroDivisionError Traceback (most recent call last)
677
677
678 /ipython1-client-r3021/docs/examples/<string> in <module>()
678 /ipython1-client-r3021/docs/examples/<string> in <module>()
679
679
680 ZeroDivisionError: integer division or modulo by zero
680 ZeroDivisionError: integer division or modulo by zero
681
681
682 All of this same error handling magic even works in non-blocking mode::
682 All of this same error handling magic even works in non-blocking mode::
683
683
684 In [83]: mec.block=False
684 In [83]: mec.block=False
685
685
686 In [84]: pr = mec.execute('1/0')
686 In [84]: pr = mec.execute('1/0')
687
687
688 In [85]: pr.r
688 In [85]: pr.r
689 ---------------------------------------------------------------------------
689 ---------------------------------------------------------------------------
690 CompositeError Traceback (most recent call last)
690 CompositeError Traceback (most recent call last)
691
691
692 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
692 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
693
693
694 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
694 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
695 170
695 170
696 171 def _get_r(self):
696 171 def _get_r(self):
697 --> 172 return self.get_result(block=True)
697 --> 172 return self.get_result(block=True)
698 173
698 173
699 174 r = property(_get_r)
699 174 r = property(_get_r)
700
700
701 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
701 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
702 131 return self.result
702 131 return self.result
703 132 try:
703 132 try:
704 --> 133 result = self.client.get_pending_deferred(self.result_id, block)
704 --> 133 result = self.client.get_pending_deferred(self.result_id, block)
705 134 except error.ResultNotCompleted:
705 134 except error.ResultNotCompleted:
706 135 return default
706 135 return default
707
707
708 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
708 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
709 385
709 385
710 386 def get_pending_deferred(self, deferredID, block):
710 386 def get_pending_deferred(self, deferredID, block):
711 --> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
711 --> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
712 388
712 388
713 389 def barrier(self, pendingResults):
713 389 def barrier(self, pendingResults):
714
714
715 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
715 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
716 72 result.raiseException()
716 72 result.raiseException()
717 73 except Exception, e:
717 73 except Exception, e:
718 ---> 74 raise e
718 ---> 74 raise e
719 75 return result
719 75 return result
720 76
720 76
721
721
722 CompositeError: one or more exceptions from call to method: execute
722 CompositeError: one or more exceptions from call to method: execute
723 [0:execute]: ZeroDivisionError: integer division or modulo by zero
723 [0:execute]: ZeroDivisionError: integer division or modulo by zero
724 [1:execute]: ZeroDivisionError: integer division or modulo by zero
724 [1:execute]: ZeroDivisionError: integer division or modulo by zero
725 [2:execute]: ZeroDivisionError: integer division or modulo by zero
725 [2:execute]: ZeroDivisionError: integer division or modulo by zero
726 [3:execute]: ZeroDivisionError: integer division or modulo by zero
726 [3:execute]: ZeroDivisionError: integer division or modulo by zero
727
727
728
728
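A short sketch of handling these errors programmatically, reusing the connected ``mec`` client from the session above and only the ``CompositeError`` methods already demonstrated (``print_tracebacks`` and ``raise_exception``)::

    from IPython.kernel import client

    mec.block = True
    try:
        mec.execute('1/0')
    except client.CompositeError, e:
        # Print the remote traceback collected from each engine...
        e.print_tracebacks()
        # ...or uncomment to re-raise one of the wrapped exceptions locally.
        # e.raise_exception()
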
@@ -1,240 +1,240 b''
1 .. _paralleltask:
1 .. _paralleltask:
2
2
3 =================================
3 =================================
4 The IPython Task interface
4 The IPython Task interface
5 =================================
5 =================================
6
6
7 .. contents::
7 .. contents::
8
8
9 The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamically load-balanced system of workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface the user has no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all, the user can use both of these interfaces at the same time to take advantage of both of their strengths. When the user can break their work up into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs without having to assign Tasks to engines explicitly.
9 The ``Task`` interface to the controller presents the engines as a fault tolerant, dynamically load-balanced system of workers. Unlike the ``MultiEngine`` interface, in the ``Task`` interface the user has no direct access to individual engines. In some ways, this interface is simpler, but in other ways it is more powerful. Best of all, the user can use both of these interfaces at the same time to take advantage of both of their strengths. When the user can break their work up into segments that do not depend on previous execution, the ``Task`` interface is ideal. But it also has more power and flexibility, allowing the user to guide the distribution of jobs without having to assign Tasks to engines explicitly.
10
10
11 Starting the IPython controller and engines
11 Starting the IPython controller and engines
12 ===========================================
12 ===========================================
13
13
14 To follow along with this tutorial, the user will need to start the IPython
14 To follow along with this tutorial, the user will need to start the IPython
15 controller and four IPython engines. The simplest way of doing this is to
15 controller and four IPython engines. The simplest way of doing this is to
16 use the ``ipcluster`` command::
16 use the ``ipcluster`` command::
17
17
18 $ ipcluster -n 4
18 $ ipcluster -n 4
19
19
20 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
20 For more detailed information about starting the controller and engines, see our :ref:`introduction <ip1par>` to using IPython for parallel computing.
21
21
22 The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously.
22 The magic here is that this single controller and set of engines is running both the MultiEngine and ``Task`` interfaces simultaneously.
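
In practice this just means both kinds of client can be created against the same cluster. A minimal sketch (the no-argument ``MultiEngineClient`` constructor is assumed to use defaults, just like the ``TaskClient`` constructor shown below)::

    from IPython.kernel import client

    mec = client.MultiEngineClient()   # multiplexing interface
    tc = client.TaskClient()           # load-balanced Task interface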
23
23
24 QuickStart Task Farming
24 QuickStart Task Farming
25 =======================
25 =======================
26
26
27 First, a quick example of how to start running the most basic Tasks.
27 First, a quick example of how to start running the most basic Tasks.
28 The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance::
28 The first step is to import the IPython ``client`` module and then create a ``TaskClient`` instance::
29
29
30 In [1]: from ipython1.kernel import client
30 In [1]: from IPython.kernel import client
31
31
32 In [2]: tc = client.TaskClient()
32 In [2]: tc = client.TaskClient()
33
33
34 Then the user wraps the commands they want to run in Tasks::
34 Then the user wraps the commands they want to run in Tasks::
35
35
36 In [3]: tasklist = []
36 In [3]: tasklist = []
37 In [4]: for n in range(1000):
37 In [4]: for n in range(1000):
38 ... tasklist.append(client.Task("a = %i"%n, pull="a"))
38 ... tasklist.append(client.Task("a = %i"%n, pull="a"))
39
39
40 The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``.
40 The first argument of the ``Task`` constructor is a string, the command to be executed. The most important optional keyword argument is ``pull``, which can be a string or list of strings, and it specifies the variable names to be saved as results of the ``Task``.
41
41
42 Next, the user needs to submit the Tasks to the ``TaskController`` with the ``TaskClient``::
42 Next, the user needs to submit the Tasks to the ``TaskController`` with the ``TaskClient``::
43
43
44 In [5]: taskids = [ tc.run(t) for t in tasklist ]
44 In [5]: taskids = [ tc.run(t) for t in tasklist ]
45
45
46 This will give the user a list of the task IDs used by the controller to keep track of the Tasks and their results. Now at some point the user is going to want to get those results back. The ``barrier`` method allows the user to wait for the Tasks to finish running::
46 This will give the user a list of the task IDs used by the controller to keep track of the Tasks and their results. Now at some point the user is going to want to get those results back. The ``barrier`` method allows the user to wait for the Tasks to finish running::
47
47
48 In [6]: tc.barrier(taskids)
48 In [6]: tc.barrier(taskids)
49
49
50 This command will block until all the Tasks in ``taskids`` have finished. Now, the user probably wants to look at their results::
50 This command will block until all the Tasks in ``taskids`` have finished. Now, the user probably wants to look at their results::
51
51
52 In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ]
52 In [7]: task_results = [ tc.get_task_result(taskid) for taskid in taskids ]
53
53
54 Now the user has a list of ``TaskResult`` objects, which have the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``::
54 Now the user has a list of ``TaskResult`` objects, which have the actual result as a dictionary, but also keep track of some useful metadata about the ``Task``::
55
55
56 In [8]: tr = task_results[73]
56 In [8]: tr = task_results[73]
57
57
58 In [9]: tr
58 In [9]: tr
59 Out[9]: TaskResult[ID:73]:{'a':73}
59 Out[9]: TaskResult[ID:73]:{'a':73}
60
60
61 In [10]: tr.engineid
61 In [10]: tr.engineid
62 Out[10]: 1
62 Out[10]: 1
63
63
64 In [11]: tr.submitted, tr.completed, tr.duration
64 In [11]: tr.submitted, tr.completed, tr.duration
65 Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345)
65 Out[11]: ("2008/03/08 03:41:42", "2008/03/08 03:41:44", 2.12345)
66
66
67 The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute::
67 The actual results are stored in a dictionary, ``tr.results``, and a namespace object ``tr.ns`` which accesses the result keys by attribute::
68
68
69 In [12]: tr.results['a']
69 In [12]: tr.results['a']
70 Out[12]: 73
70 Out[12]: 73
71
71
72 In [13]: tr.ns.a
72 In [13]: tr.ns.a
73 Out[13]: 73
73 Out[13]: 73
74
74
75 That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks, which are covered later. The most useful is probably using a ``MultiEngineClient`` interface to initialize all the engines with the import dependencies necessary to run their Tasks.
75 That should cover the basics of running simple Tasks. There are several more powerful things the user can do with Tasks, which are covered later. The most useful is probably using a ``MultiEngineClient`` interface to initialize all the engines with the import dependencies necessary to run their Tasks.
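
A sketch of that pattern (assuming NumPy is installed on the engines, and that the default ``MultiEngineClient`` constructor connects like the ``TaskClient`` one used above)::

    from IPython.kernel import client

    mec = client.MultiEngineClient()
    tc = client.TaskClient()

    # Initialize every engine with the import dependencies once...
    mec.execute('import numpy')

    # ...then farm out Tasks that rely on them.
    t = client.Task("a = numpy.arange(10).sum()", pull="a")
    tid = tc.run(t)
    tr = tc.get_task_result(tid, block=True)
    print tr.ns.a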
76
76
77 There are many options for running and managing Tasks. The best way to learn more about the ``Task`` interface is to study the examples in ``docs/examples``. If the user does so and learns a lot about this interface, we encourage them to expand this documentation of the ``Task`` system.
77 There are many options for running and managing Tasks. The best way to learn more about the ``Task`` interface is to study the examples in ``docs/examples``. If the user does so and learns a lot about this interface, we encourage them to expand this documentation of the ``Task`` system.
78
78
79 Overview of the Task System
79 Overview of the Task System
80 ===========================
80 ===========================
81
81
82 The user's view of the ``Task`` system has three basic objects: the ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects indicate their roles well.
82 The user's view of the ``Task`` system has three basic objects: the ``TaskClient``, the ``Task``, and the ``TaskResult``. The names of these three objects indicate their roles well.
83
83
84 The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskController`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines; it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few necessary methods for the user to manage.
84 The ``TaskClient`` is the user's ``Task`` farming connection to the IPython cluster. Unlike the ``MultiEngineClient``, the ``TaskController`` handles all the scheduling and distribution of work, so the ``TaskClient`` has no notion of engines; it just submits Tasks and requests their results. The Tasks are described as ``Task`` objects, and their results are wrapped in ``TaskResult`` objects. Thus, there are very few necessary methods for the user to manage.
85
85
86 Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy; it only requires implementing your own priority system.
86 Inside the task system is a Scheduler object, which assigns tasks to workers. The default scheduler is a simple FIFO queue. Subclassing the Scheduler should be easy; it only requires implementing your own priority system.
87
87
88 The TaskClient
88 The TaskClient
89 ==============
89 ==============
90
90
91 The ``TaskClient`` is the object the user uses to connect to the ``Controller`` that is managing their Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython ``client`` module::
91 The ``TaskClient`` is the object the user uses to connect to the ``Controller`` that is managing their Tasks. It is the analog of the ``MultiEngineClient`` for the standard IPython multiplexing interface. As with all client interfaces, the first step is to import the IPython ``client`` module::
92
92
93 In [1]: from ipython1.kernel import client
93 In [1]: from IPython.kernel import client
94
94
95 Just as with the ``MultiEngineClient``, the user creates the ``TaskClient`` with a tuple containing the IP address and port of the ``Controller``. The ``client`` module conveniently provides the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object is done like this::
95 Just as with the ``MultiEngineClient``, the user creates the ``TaskClient`` with a tuple containing the IP address and port of the ``Controller``. The ``client`` module conveniently provides the default address of the ``Task`` interface of the controller. Creating a default ``TaskClient`` object is done like this::
96
96
97 In [2]: tc = client.TaskClient(client.default_task_address)
97 In [2]: tc = client.TaskClient(client.default_task_address)
98
98
99 or, if the user wants to specify a non-default location of the ``Controller``, they can do so explicitly::
99 or, if the user wants to specify a non-default location of the ``Controller``, they can do so explicitly::
100
100
101 In [3]: tc = client.TaskClient(("192.168.1.1", 10113))
101 In [3]: tc = client.TaskClient(("192.168.1.1", 10113))
102
102
103 As discussed earlier, the ``TaskClient`` only has a few basic methods.
103 As discussed earlier, the ``TaskClient`` only has a few basic methods.
104
104
105 * ``tc.run(task)``
105 * ``tc.run(task)``
106 ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the task ID by which the ``Task`` and its results can be tracked and retrieved::
106 ``run`` is the method by which the user submits Tasks. It takes exactly one argument, a ``Task`` object. All the advanced control of ``Task`` behavior is handled by properties of the ``Task`` object, rather than the submission command, so they will be discussed later in the `Task`_ section. ``run`` returns an integer, the task ID by which the ``Task`` and its results can be tracked and retrieved::
107
107
108 In [4]: taskid = tc.run(task)
108 In [4]: taskid = tc.run(task)
109
109
110 * ``tc.get_task_result(taskid, block=False)``
110 * ``tc.get_task_result(taskid, block=False)``
111 ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the task ID of the result the user wishes to retrieve. ``get_task_result`` also takes a keyword argument ``block``. ``block`` specifies whether the user actually wants to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately. If the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``. But if the ``Task`` has not completed, it will return ``None``. If the user specifies ``block=True``, then ``get_task_result`` will wait for the ``Task`` to complete, and will always return the ``TaskResult`` for the requested ``Task``.
111 ``get_task_result`` is the method by which results are retrieved. It takes a single integer argument, the task ID of the result the user wishes to retrieve. ``get_task_result`` also takes a keyword argument ``block``. ``block`` specifies whether the user actually wants to wait for the result. If ``block`` is false, as it is by default, ``get_task_result`` will return immediately. If the ``Task`` has completed, it will return the ``TaskResult`` object for that ``Task``. But if the ``Task`` has not completed, it will return ``None``. If the user specifies ``block=True``, then ``get_task_result`` will wait for the ``Task`` to complete, and will always return the ``TaskResult`` for the requested ``Task``.
112 * ``tc.barrier(taskid(s))``
112 * ``tc.barrier(taskid(s))``
113 ``barrier`` is a synchronization method. It takes exactly one argument, a task ID or list of task IDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section::
113 ``barrier`` is a synchronization method. It takes exactly one argument, a task ID or list of task IDs. ``barrier`` will block until all the specified Tasks have completed. In practice, a barrier is often called between the ``Task`` submission section of the code and the result gathering section::
114
114
115 In [5]: taskIDs = [ tc.run(task) for task in myTasks ]
115 In [5]: taskIDs = [ tc.run(task) for task in myTasks ]
116
116
117 In [6]: tc.get_task_result(taskIDs[-1]) is None
117 In [6]: tc.get_task_result(taskIDs[-1]) is None
118 Out[6]: True
118 Out[6]: True
119
119
120 In [7]: tc.barrier(taskIDs)
120 In [7]: tc.barrier(taskIDs)
121
121
122 In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ]
122 In [8]: results = [ tc.get_task_result(tid) for tid in taskIDs ]
123
123
124 * ``tc.queue_status(verbose=False)``
124 * ``tc.queue_status(verbose=False)``
125 ``queue_status`` is a method for querying the state of the ``TaskController``. ``queue_status`` returns a dict of the form::
125 ``queue_status`` is a method for querying the state of the ``TaskController``. ``queue_status`` returns a dict of the form::
126
126
127 {'scheduled': Tasks that have been submitted but not yet run
127 {'scheduled': Tasks that have been submitted but not yet run
128 'pending' : Tasks that are currently running
128 'pending' : Tasks that are currently running
129 'succeeded': Tasks that have completed successfully
129 'succeeded': Tasks that have completed successfully
130 'failed' : Tasks that have finished with a failure
130 'failed' : Tasks that have finished with a failure
131 }
131 }
132
132
133 If ``verbose`` is not specified (or is ``False``), then the values of the dict are integers, giving the number of Tasks in each state. If ``verbose`` is ``True``, then each element in the dict is a list of the task IDs in that state::
133 If ``verbose`` is not specified (or is ``False``), then the values of the dict are integers, giving the number of Tasks in each state. If ``verbose`` is ``True``, then each element in the dict is a list of the task IDs in that state::
134
134
135 In [8]: tc.queue_status()
135 In [8]: tc.queue_status()
136 Out[8]: {'scheduled': 4,
136 Out[8]: {'scheduled': 4,
137 'pending' : 2,
137 'pending' : 2,
138 'succeeded': 5,
138 'succeeded': 5,
139 'failed' : 1
139 'failed' : 1
140 }
140 }
141
141
142 In [9]: tc.queue_status(verbose=True)
142 In [9]: tc.queue_status(verbose=True)
143 Out[9]: {'scheduled': [8,9,10,11],
143 Out[9]: {'scheduled': [8,9,10,11],
144 'pending' : [6,7],
144 'pending' : [6,7],
145 'succeeded': [0,1,2,4,5],
145 'succeeded': [0,1,2,4,5],
146 'failed' : [3]
146 'failed' : [3]
147 }
147 }
148
148
149 * ``tc.abort(taskid)``
149 * ``tc.abort(taskid)``
150 ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately. If the ``Task`` has already completed, ``abort`` will raise an ``IndexError`` ("Task Already Completed"). An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see the `Task`_ section for how to specify retries) in an interactive session, but realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution, so once the user finds the solution, they would want to abort all remaining Tasks to prevent wasted work.
150 ``abort`` allows the user to abort Tasks that have already been submitted. ``abort`` will always return immediately. If the ``Task`` has already completed, ``abort`` will raise an ``IndexError`` ("Task Already Completed"). An obvious case for ``abort`` would be where the user submits a long-running ``Task`` with a number of retries (see the `Task`_ section for how to specify retries) in an interactive session, but realizes there has been a typo. The user can then abort the ``Task``, preventing certain failures from cluttering up the queue. It can also be used for parallel search-type problems, where only one ``Task`` will give the solution, so once the user finds the solution, they would want to abort all remaining Tasks to prevent wasted work.
151 * ``tc.spin()``
151 * ``tc.spin()``
152 ``spin`` simply triggers the scheduler in the ``TaskController``. Under most normal circumstances, this will do nothing. The primary known usage case involves the ``Task`` dependency (see `Dependencies`_). The dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MultiEngineClient`` does not trigger a reschedule event. The main example case for this requires the following event sequence:
152 ``spin`` simply triggers the scheduler in the ``TaskController``. Under most normal circumstances, this will do nothing. The primary known usage case involves the ``Task`` dependency (see `Dependencies`_). The dependency is a function of an Engine's ``properties``, but changing the ``properties`` via the ``MultiEngineClient`` does not trigger a reschedule event. The main example case for this requires the following event sequence:
153 * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies.
153 * ``engine`` is available, ``Task`` is submitted, but ``engine`` does not have ``Task``'s dependencies.
154 * ``engine`` gets necessary dependencies while no new Tasks are submitted or completed.
154 * ``engine`` gets necessary dependencies while no new Tasks are submitted or completed.
155 * now ``engine`` can run ``Task``, but a ``Task`` event is required for the ``TaskController`` to try scheduling ``Task`` again.
155 * now ``engine`` can run ``Task``, but a ``Task`` event is required for the ``TaskController`` to try scheduling ``Task`` again.
156
156
157 ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks, and should not be needed under most normal circumstances.
157 ``spin`` is just an empty ping method to ensure that the Controller has scheduled all available Tasks, and should not be needed under most normal circumstances.
158
158
159 That covers the ``TaskClient``, a simple interface to the cluster. With it, the user can submit jobs (and abort them if necessary), request their results, and synchronize on arbitrary subsets of jobs.
159 That covers the ``TaskClient``, a simple interface to the cluster. With it, the user can submit jobs (and abort them if necessary), request their results, and synchronize on arbitrary subsets of jobs.
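
Putting those methods together, a typical submit/monitor/collect cycle might look like the following sketch (``tc`` is assumed to be a connected ``TaskClient``)::

    tasks = [client.Task("b = %i**2" % i, pull="b") for i in range(100)]
    taskids = [tc.run(t) for t in tasks]

    # A coarse look at how far along the queue is.
    print tc.queue_status()

    # Block until every submitted Task has finished, then collect results.
    tc.barrier(taskids)
    results = [tc.get_task_result(tid) for tid in taskids]
    squares = [tr.ns.b for tr in results]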
160
160
161 .. _task: The Task Object
161 .. _task: The Task Object
162
162
163 The Task Object
163 The Task Object
164 ===============
164 ===============
165
165
166 The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as the ``execute`` method of the ``MultiEngine`` (in fact, ``execute`` is called to run the code)::
166 The ``Task`` is the basic object for describing a job. It can be used in a very simple manner, where the user just specifies a command string to be executed as the ``Task``. The usage of this first argument is exactly the same as the ``execute`` method of the ``MultiEngine`` (in fact, ``execute`` is called to run the code)::
167
167
168 In [1]: t = client.Task("a = str(id)")
168 In [1]: t = client.Task("a = str(id)")
169
169
170 This ``Task`` would run, and store the string representation of the ``id`` element in ``a`` in each worker's namespace, but it is fairly useless because the user does not know anything about the state of the ``worker`` on which it ran at the time of retrieving results. It is important that each ``Task`` not expect the state of the ``worker`` to persist after the ``Task`` is completed.
170 This ``Task`` would run, and store the string representation of the ``id`` element in ``a`` in each worker's namespace, but it is fairly useless because the user does not know anything about the state of the ``worker`` on which it ran at the time of retrieving results. It is important that each ``Task`` not expect the state of the ``worker`` to persist after the ``Task`` is completed.
171 There are many different situations for using ``Task`` Farming, and the ``Task`` object has many attributes for use in customizing the ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after ``Task`` construction through attribute assignment.
171 There are many different situations for using ``Task`` Farming, and the ``Task`` object has many attributes for use in customizing the ``Task`` behavior. All of a ``Task``'s attributes may be specified in the constructor, through keyword arguments, or after ``Task`` construction through attribute assignment.
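
For instance (a small sketch; ``f`` and ``x`` are placeholders), these two ways of building the same ``Task`` are equivalent::

    t1 = client.Task("a = f(x)", push=dict(x=10), pull="a", retries=1)

    t2 = client.Task("a = f(x)")
    t2.push = dict(x=10)
    t2.pull = "a"
    t2.retries = 1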
172
172
173 Data Attributes
173 Data Attributes
174 ***************
174 ***************
175 It is likely that the user will want to move data around before or after executing the ``Task``. We provide methods for sending data to initialize the worker's namespace, and for specifying what data to bring back as the ``Task``'s results.
175 It is likely that the user will want to move data around before or after executing the ``Task``. We provide methods for sending data to initialize the worker's namespace, and for specifying what data to bring back as the ``Task``'s results.
176
176
177 * pull = []
177 * pull = []
178 The obvious case is as above, where ``t`` would execute and store a result in ``a``; it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object retrieved by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it::
178 The obvious case is as above, where ``t`` would execute and store a result in ``a``; it is likely that the user would want to bring ``a`` back to their namespace. This is done through the ``pull`` attribute. ``pull`` can be a string or list of strings, and it specifies the names of variables to be retrieved. The ``TaskResult`` object retrieved by ``get_task_result`` will have a dictionary of keys and values, and the ``Task``'s ``pull`` attribute determines what goes into it::
179
179
180 In [2]: t = client.Task("a = str(id)", pull = "a")
180 In [2]: t = client.Task("a = str(id)", pull = "a")
181
181
182 In [3]: t = client.Task("a = str(id)", pull = ["a", "id"])
182 In [3]: t = client.Task("a = str(id)", pull = ["a", "id"])
183
183
184 * push = {}
184 * push = {}
185 A user might also want to initialize some data into the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's namespace immediately before execution::
185 A user might also want to initialize some data into the namespace before the code part of the ``Task`` is run. Enter ``push``. ``push`` is a dictionary of key/value pairs to be loaded from the user's namespace into the worker's namespace immediately before execution::
186
186
187 In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a")
187 In [4]: t = client.Task("a = f(submitted)", push=dict(submitted=time.time()), pull="a")
188
188
189 ``push`` and ``pull`` result directly in calling an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution respectively, and thus their API is the same.
189 ``push`` and ``pull`` result directly in calling an ``engine``'s ``push`` and ``pull`` methods before and after ``Task`` execution respectively, and thus their API is the same.
190
190
191 Namespace Cleaning
191 Namespace Cleaning
192 ******************
192 ******************
193 When a user is running a large number of Tasks, it is likely that the workers' namespaces could become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace.
193 When a user is running a large number of Tasks, it is likely that the workers' namespaces could become cluttered. Some Tasks might be sensitive to clutter, while others might be known to cause namespace pollution. For these reasons, Tasks have two boolean attributes for cleaning up the namespace.
194
194
195 * ``clear_after``
195 * ``clear_after``
196 If ``clear_after`` is set to ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful both for Tasks that produce clutter and for Tasks whose intermediate data one might wish to keep private::
196 If ``clear_after`` is set to ``True``, the worker on which the ``Task`` was run will be reset (via ``engine.reset``) upon completion of the ``Task``. This can be useful both for Tasks that produce clutter and for Tasks whose intermediate data one might wish to keep private::
197
197
198 In [5]: t = client.Task("a = range(1e10)", pull="a", clear_after=True)
198 In [5]: t = client.Task("a = range(1e10)", pull="a", clear_after=True)
199
199
200
200
201 * ``clear_before``
201 * ``clear_before``
202 As one might guess, ``clear_before`` is identical to ``clear_after``, but it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker::
202 As one might guess, ``clear_before`` is identical to ``clear_after``, but it takes place before the ``Task`` is run. This ensures that the ``Task`` runs on a fresh worker::
203
203
204 In [6]: t = client.Task("a = globals()", pull="a", clear_before=True)
204 In [6]: t = client.Task("a = globals()", pull="a", clear_before=True)
205
205
206 Of course, a user can use both at the same time, ensuring that all workers are clear except when they are currently running a job. Both of these default to ``False``.
206 Of course, a user can use both at the same time, ensuring that all workers are clear except when they are currently running a job. Both of these default to ``False``.
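
A sketch of combining the two, so the worker is wiped both before and after the job::

    t = client.Task("import os; a = os.getpid()", pull="a",
                    clear_before=True, clear_after=True)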
207
207
208 Fault Tolerance
208 Fault Tolerance
209 ***************
209 ***************
210 It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker a ``Task`` was running on disconnected, even though there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds.
210 It is possible that Tasks might fail, and there are a variety of reasons this could happen. One might be that the worker a ``Task`` was running on disconnected, even though there was nothing wrong with the ``Task`` itself. With the fault tolerance attributes of the ``Task``, the user can specify how many times to resubmit the ``Task``, and what to do if it never succeeds.
211
211
212 * ``retries``
212 * ``retries``
213 ``retries`` is an integer, specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` from disconnecting engines, but not a large number. If a ``Task`` is failing 100 times, there is probably something wrong with the ``Task`` itself. The canonical bad example::
213 ``retries`` is an integer, specifying the number of times a ``Task`` is to be retried. It defaults to zero. It is often a good idea for this number to be 1 or 2, to protect the ``Task`` from disconnecting engines, but not a large number. If a ``Task`` is failing 100 times, there is probably something wrong with the ``Task`` itself. The canonical bad example::
214
214
215 In [7]: t = client.Task("os.kill(os.getpid(), 9)", retries=99)
215 In [7]: t = client.Task("os.kill(os.getpid(), 9)", retries=99)
216
216
217 This would actually take down 100 workers.
217 This would actually take down 100 workers.
218
218
219 * ``recovery_task``
219 * ``recovery_task``
220 ``recovery_task`` is another ``Task`` object, to be run in the event that the original ``Task`` still fails after running out of retries. Since ``recovery_task`` is another ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks is limitless, except loops are not allowed (that would be bad!).
220 ``recovery_task`` is another ``Task`` object, to be run in the event that the original ``Task`` still fails after running out of retries. Since ``recovery_task`` is another ``Task`` object, it can have its own ``recovery_task``. The chain of Tasks is limitless, except loops are not allowed (that would be bad!).
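
For example, a sketch of chaining a fallback computation onto a flaky one (``flaky_calc`` and ``safe_calc`` are hypothetical functions assumed to already exist on the engines)::

    fallback = client.Task("x = safe_calc()", pull="x")
    t = client.Task("x = flaky_calc()", pull="x",
                    retries=2, recovery_task=fallback)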
221
221
222 Dependencies
222 Dependencies
223 ************
223 ************
224 Dependencies are the most powerful part of the ``Task`` farming system, because they allow the user to do some classification of the workers and to guide the ``Task`` distribution without meddling with the controller directly. They make use of two objects: the ``Task``'s ``depend`` attribute and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties API exists for extending IPython, allowing conditional execution and new controllers that make decisions based on the properties of their engines. Currently the ``Task`` dependency is the only internal use of the properties API.
224 Dependencies are the most powerful part of the ``Task`` farming system, because they allow the user to do some classification of the workers and to guide the ``Task`` distribution without meddling with the controller directly. They make use of two objects: the ``Task``'s ``depend`` attribute and the engine's ``properties``. See the `MultiEngine`_ reference for how to use engine properties. The engine properties API exists for extending IPython, allowing conditional execution and new controllers that make decisions based on the properties of their engines. Currently the ``Task`` dependency is the only internal use of the properties API.
225
225
226 .. _MultiEngine: ./parallel_multiengine
226 .. _MultiEngine: ./parallel_multiengine
227
227
228 The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and are functionally equivalent to always returning ``False``. Tasks with invalid ``depend`` functions will never be assigned to a worker::
228 The ``depend`` attribute of a ``Task`` must be a function of exactly one argument, the worker's properties dictionary, and it should return ``True`` if the ``Task`` should be allowed to run on the worker and ``False`` if not. The usage in the controller is fault tolerant, so exceptions raised by ``Task.depend`` will be ignored and are functionally equivalent to always returning ``False``. Tasks with invalid ``depend`` functions will never be assigned to a worker::
229
229
230 In [8]: def dep(properties):
230 In [8]: def dep(properties):
231 ... return properties["RAM"] > 2**32 # have at least 4GB
231 ... return properties["RAM"] > 2**32 # have at least 4GB
232 In [9]: t = client.Task("a = bigfunc()", depend=dep)
232 In [9]: t = client.Task("a = bigfunc()", depend=dep)
233
233
234 It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine) using the EngineAPI, or remotely, through the ``MultiEngineClient``'s get/set_properties methods.
234 It is important to note that assignment of values to the properties dict is done entirely by the user, either locally (in the engine) using the EngineAPI, or remotely, through the ``MultiEngineClient``'s get/set_properties methods.
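
A sketch of the remote route (``mec`` is a connected ``MultiEngineClient``, and the exact ``set_properties`` signature used here is an assumption; see the `MultiEngine`_ reference for the authoritative API)::

    # Hypothetical: tag engines 0 and 1 as having plenty of memory, then
    # submit a Task whose ``depend`` function checks that property.
    mec.set_properties(dict(RAM=2**34), targets=[0, 1])

    def dep(properties):
        return properties.get("RAM", 0) > 2**32

    t = client.Task("a = bigfunc()", depend=dep)
    tc.run(t)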
235
235
236
236
237
237
238
238
239
239
240
240