Show More
@@ -0,0 +1,69 b'' | |||
|
1 | """Tests for asyncresult.py""" | |
|
2 | ||
|
3 | #------------------------------------------------------------------------------- | |
|
4 | # Copyright (C) 2011 The IPython Development Team | |
|
5 | # | |
|
6 | # Distributed under the terms of the BSD License. The full license is in | |
|
7 | # the file COPYING, distributed as part of this software. | |
|
8 | #------------------------------------------------------------------------------- | |
|
9 | ||
|
10 | #------------------------------------------------------------------------------- | |
|
11 | # Imports | |
|
12 | #------------------------------------------------------------------------------- | |
|
13 | ||
|
14 | ||
|
15 | from IPython.zmq.parallel.error import TimeoutError | |
|
16 | ||
|
17 | from IPython.zmq.parallel.tests import add_engines | |
|
18 | from .clienttest import ClusterTestCase | |
|
19 | ||
|
20 | def setup(): | |
|
21 | add_engines(2) | |
|
22 | ||
|
23 | def wait(n): | |
|
24 | import time | |
|
25 | time.sleep(n) | |
|
26 | return n | |
|
27 | ||
|
28 | class AsyncResultTest(ClusterTestCase): | |
|
29 | ||
|
30 | def test_single_result(self): | |
|
31 | eid = self.client.ids[-1] | |
|
32 | ar = self.client[eid].apply_async(lambda : 42) | |
|
33 | self.assertEquals(ar.get(), 42) | |
|
34 | ar = self.client[[eid]].apply_async(lambda : 42) | |
|
35 | self.assertEquals(ar.get(), [42]) | |
|
36 | ar = self.client[-1:].apply_async(lambda : 42) | |
|
37 | self.assertEquals(ar.get(), [42]) | |
|
38 | ||
|
39 | def test_get_after_done(self): | |
|
40 | ar = self.client[-1].apply_async(lambda : 42) | |
|
41 | self.assertFalse(ar.ready()) | |
|
42 | ar.wait() | |
|
43 | self.assertTrue(ar.ready()) | |
|
44 | self.assertEquals(ar.get(), 42) | |
|
45 | self.assertEquals(ar.get(), 42) | |
|
46 | ||
|
47 | def test_get_before_done(self): | |
|
48 | ar = self.client[-1].apply_async(wait, 0.1) | |
|
49 | self.assertRaises(TimeoutError, ar.get, 0) | |
|
50 | ar.wait(0) | |
|
51 | self.assertFalse(ar.ready()) | |
|
52 | self.assertEquals(ar.get(), 0.1) | |
|
53 | ||
|
54 | def test_get_after_error(self): | |
|
55 | ar = self.client[-1].apply_async(lambda : 1/0) | |
|
56 | ar.wait() | |
|
57 | self.assertRaisesRemote(ZeroDivisionError, ar.get) | |
|
58 | self.assertRaisesRemote(ZeroDivisionError, ar.get) | |
|
59 | self.assertRaisesRemote(ZeroDivisionError, ar.get_dict) | |
|
60 | ||
|
61 | def test_get_dict(self): | |
|
62 | n = len(self.client) | |
|
63 | ar = self.client[:].apply_async(lambda : 5) | |
|
64 | self.assertEquals(ar.get(), [5]*n) | |
|
65 | d = ar.get_dict() | |
|
66 | self.assertEquals(sorted(d.keys()), sorted(self.client.ids)) | |
|
67 | for eid,r in d.iteritems(): | |
|
68 | self.assertEquals(r, 5) | |
|
69 |
@@ -0,0 +1,101 b'' | |||
|
1 | """Tests for dependency.py""" | |
|
2 | ||
|
3 | __docformat__ = "restructuredtext en" | |
|
4 | ||
|
5 | #------------------------------------------------------------------------------- | |
|
6 | # Copyright (C) 2011 The IPython Development Team | |
|
7 | # | |
|
8 | # Distributed under the terms of the BSD License. The full license is in | |
|
9 | # the file COPYING, distributed as part of this software. | |
|
10 | #------------------------------------------------------------------------------- | |
|
11 | ||
|
12 | #------------------------------------------------------------------------------- | |
|
13 | # Imports | |
|
14 | #------------------------------------------------------------------------------- | |
|
15 | ||
|
16 | # import | |
|
17 | import os | |
|
18 | ||
|
19 | from IPython.utils.pickleutil import can, uncan | |
|
20 | ||
|
21 | from IPython.zmq.parallel import dependency as dmod | |
|
22 | from IPython.zmq.parallel.util import interactive | |
|
23 | ||
|
24 | from IPython.zmq.parallel.tests import add_engines | |
|
25 | from .clienttest import ClusterTestCase | |
|
26 | ||
|
27 | def setup(): | |
|
28 | add_engines(1) | |
|
29 | ||
|
30 | @dmod.require('time') | |
|
31 | def wait(n): | |
|
32 | time.sleep(n) | |
|
33 | return n | |
|
34 | ||
|
35 | mixed = map(str, range(10)) | |
|
36 | completed = map(str, range(0,10,2)) | |
|
37 | failed = map(str, range(1,10,2)) | |
|
38 | ||
|
39 | class DependencyTest(ClusterTestCase): | |
|
40 | ||
|
41 | def setUp(self): | |
|
42 | ClusterTestCase.setUp(self) | |
|
43 | self.user_ns = {'__builtins__' : __builtins__} | |
|
44 | self.view = self.client.load_balanced_view() | |
|
45 | self.dview = self.client[-1] | |
|
46 | self.succeeded = set(map(str, range(0,25,2))) | |
|
47 | self.failed = set(map(str, range(1,25,2))) | |
|
48 | ||
|
49 | def assertMet(self, dep): | |
|
50 | self.assertTrue(dep.check(self.succeeded, self.failed), "Dependency should be met") | |
|
51 | ||
|
52 | def assertUnmet(self, dep): | |
|
53 | self.assertFalse(dep.check(self.succeeded, self.failed), "Dependency should not be met") | |
|
54 | ||
|
55 | def assertUnreachable(self, dep): | |
|
56 | self.assertTrue(dep.unreachable(self.succeeded, self.failed), "Dependency should be unreachable") | |
|
57 | ||
|
58 | def assertReachable(self, dep): | |
|
59 | self.assertFalse(dep.unreachable(self.succeeded, self.failed), "Dependency should be reachable") | |
|
60 | ||
|
61 | def cancan(self, f): | |
|
62 | """decorator to pass through canning into self.user_ns""" | |
|
63 | return uncan(can(f), self.user_ns) | |
|
64 | ||
|
65 | def test_require_imports(self): | |
|
66 | """test that @require imports names""" | |
|
67 | @self.cancan | |
|
68 | @dmod.require('urllib') | |
|
69 | @interactive | |
|
70 | def encode(dikt): | |
|
71 | return urllib.urlencode(dikt) | |
|
72 | # must pass through canning to properly connect namespaces | |
|
73 | self.assertEquals(encode(dict(a=5)), 'a=5') | |
|
74 | ||
|
75 | def test_success_only(self): | |
|
76 | dep = dmod.Dependency(mixed, success=True, failure=False) | |
|
77 | self.assertUnmet(dep) | |
|
78 | self.assertUnreachable(dep) | |
|
79 | dep.all=False | |
|
80 | self.assertMet(dep) | |
|
81 | self.assertReachable(dep) | |
|
82 | dep = dmod.Dependency(completed, success=True, failure=False) | |
|
83 | self.assertMet(dep) | |
|
84 | self.assertReachable(dep) | |
|
85 | dep.all=False | |
|
86 | self.assertMet(dep) | |
|
87 | self.assertReachable(dep) | |
|
88 | ||
|
89 | def test_failure_only(self): | |
|
90 | dep = dmod.Dependency(mixed, success=False, failure=True) | |
|
91 | self.assertUnmet(dep) | |
|
92 | self.assertUnreachable(dep) | |
|
93 | dep.all=False | |
|
94 | self.assertMet(dep) | |
|
95 | self.assertReachable(dep) | |
|
96 | dep = dmod.Dependency(completed, success=False, failure=True) | |
|
97 | self.assertUnmet(dep) | |
|
98 | self.assertUnreachable(dep) | |
|
99 | dep.all=False | |
|
100 | self.assertUnmet(dep) | |
|
101 | self.assertUnreachable(dep) |
@@ -0,0 +1,287 b'' | |||
|
1 | """test View objects""" | |
|
2 | #------------------------------------------------------------------------------- | |
|
3 | # Copyright (C) 2011 The IPython Development Team | |
|
4 | # | |
|
5 | # Distributed under the terms of the BSD License. The full license is in | |
|
6 | # the file COPYING, distributed as part of this software. | |
|
7 | #------------------------------------------------------------------------------- | |
|
8 | ||
|
9 | #------------------------------------------------------------------------------- | |
|
10 | # Imports | |
|
11 | #------------------------------------------------------------------------------- | |
|
12 | ||
|
13 | import time | |
|
14 | from tempfile import mktemp | |
|
15 | ||
|
16 | import zmq | |
|
17 | ||
|
18 | from IPython.zmq.parallel import client as clientmod | |
|
19 | from IPython.zmq.parallel import error | |
|
20 | from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult, AsyncMapResult | |
|
21 | from IPython.zmq.parallel.view import LoadBalancedView, DirectView | |
|
22 | from IPython.zmq.parallel.util import interactive | |
|
23 | ||
|
24 | from IPython.zmq.parallel.tests import add_engines | |
|
25 | ||
|
26 | from .clienttest import ClusterTestCase, segfault, wait, skip_without | |
|
27 | ||
|
28 | def setup(): | |
|
29 | add_engines(3) | |
|
30 | ||
|
31 | class TestView(ClusterTestCase): | |
|
32 | ||
|
33 | def test_segfault_task(self): | |
|
34 | """test graceful handling of engine death (balanced)""" | |
|
35 | # self.add_engines(1) | |
|
36 | ar = self.client[-1].apply_async(segfault) | |
|
37 | self.assertRaisesRemote(error.EngineError, ar.get) | |
|
38 | eid = ar.engine_id | |
|
39 | while eid in self.client.ids: | |
|
40 | time.sleep(.01) | |
|
41 | self.client.spin() | |
|
42 | ||
|
43 | def test_segfault_mux(self): | |
|
44 | """test graceful handling of engine death (direct)""" | |
|
45 | # self.add_engines(1) | |
|
46 | eid = self.client.ids[-1] | |
|
47 | ar = self.client[eid].apply_async(segfault) | |
|
48 | self.assertRaisesRemote(error.EngineError, ar.get) | |
|
49 | eid = ar.engine_id | |
|
50 | while eid in self.client.ids: | |
|
51 | time.sleep(.01) | |
|
52 | self.client.spin() | |
|
53 | ||
|
54 | def test_push_pull(self): | |
|
55 | """test pushing and pulling""" | |
|
56 | data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) | |
|
57 | t = self.client.ids[-1] | |
|
58 | v = self.client[t] | |
|
59 | push = v.push | |
|
60 | pull = v.pull | |
|
61 | v.block=True | |
|
62 | nengines = len(self.client) | |
|
63 | push({'data':data}) | |
|
64 | d = pull('data') | |
|
65 | self.assertEquals(d, data) | |
|
66 | self.client[:].push({'data':data}) | |
|
67 | d = self.client[:].pull('data', block=True) | |
|
68 | self.assertEquals(d, nengines*[data]) | |
|
69 | ar = push({'data':data}, block=False) | |
|
70 | self.assertTrue(isinstance(ar, AsyncResult)) | |
|
71 | r = ar.get() | |
|
72 | ar = self.client[:].pull('data', block=False) | |
|
73 | self.assertTrue(isinstance(ar, AsyncResult)) | |
|
74 | r = ar.get() | |
|
75 | self.assertEquals(r, nengines*[data]) | |
|
76 | self.client[:].push(dict(a=10,b=20)) | |
|
77 | r = self.client[:].pull(('a','b')) | |
|
78 | self.assertEquals(r, nengines*[[10,20]]) | |
|
79 | ||
|
80 | def test_push_pull_function(self): | |
|
81 | "test pushing and pulling functions" | |
|
82 | def testf(x): | |
|
83 | return 2.0*x | |
|
84 | ||
|
85 | t = self.client.ids[-1] | |
|
86 | self.client[t].block=True | |
|
87 | push = self.client[t].push | |
|
88 | pull = self.client[t].pull | |
|
89 | execute = self.client[t].execute | |
|
90 | push({'testf':testf}) | |
|
91 | r = pull('testf') | |
|
92 | self.assertEqual(r(1.0), testf(1.0)) | |
|
93 | execute('r = testf(10)') | |
|
94 | r = pull('r') | |
|
95 | self.assertEquals(r, testf(10)) | |
|
96 | ar = self.client[:].push({'testf':testf}, block=False) | |
|
97 | ar.get() | |
|
98 | ar = self.client[:].pull('testf', block=False) | |
|
99 | rlist = ar.get() | |
|
100 | for r in rlist: | |
|
101 | self.assertEqual(r(1.0), testf(1.0)) | |
|
102 | execute("def g(x): return x*x") | |
|
103 | r = pull(('testf','g')) | |
|
104 | self.assertEquals((r[0](10),r[1](10)), (testf(10), 100)) | |
|
105 | ||
|
106 | def test_push_function_globals(self): | |
|
107 | """test that pushed functions have access to globals""" | |
|
108 | @interactive | |
|
109 | def geta(): | |
|
110 | return a | |
|
111 | # self.add_engines(1) | |
|
112 | v = self.client[-1] | |
|
113 | v.block=True | |
|
114 | v['f'] = geta | |
|
115 | self.assertRaisesRemote(NameError, v.execute, 'b=f()') | |
|
116 | v.execute('a=5') | |
|
117 | v.execute('b=f()') | |
|
118 | self.assertEquals(v['b'], 5) | |
|
119 | ||
|
120 | def test_push_function_defaults(self): | |
|
121 | """test that pushed functions preserve default args""" | |
|
122 | def echo(a=10): | |
|
123 | return a | |
|
124 | v = self.client[-1] | |
|
125 | v.block=True | |
|
126 | v['f'] = echo | |
|
127 | v.execute('b=f()') | |
|
128 | self.assertEquals(v['b'], 10) | |
|
129 | ||
|
130 | def test_get_result(self): | |
|
131 | """test getting results from the Hub.""" | |
|
132 | c = clientmod.Client(profile='iptest') | |
|
133 | # self.add_engines(1) | |
|
134 | t = c.ids[-1] | |
|
135 | v = c[t] | |
|
136 | v2 = self.client[t] | |
|
137 | ar = v.apply_async(wait, 1) | |
|
138 | # give the monitor time to notice the message | |
|
139 | time.sleep(.25) | |
|
140 | ahr = v2.get_result(ar.msg_ids) | |
|
141 | self.assertTrue(isinstance(ahr, AsyncHubResult)) | |
|
142 | self.assertEquals(ahr.get(), ar.get()) | |
|
143 | ar2 = v2.get_result(ar.msg_ids) | |
|
144 | self.assertFalse(isinstance(ar2, AsyncHubResult)) | |
|
145 | c.spin() | |
|
146 | c.close() | |
|
147 | ||
|
148 | def test_run_newline(self): | |
|
149 | """test that run appends newline to files""" | |
|
150 | tmpfile = mktemp() | |
|
151 | with open(tmpfile, 'w') as f: | |
|
152 | f.write("""def g(): | |
|
153 | return 5 | |
|
154 | """) | |
|
155 | v = self.client[-1] | |
|
156 | v.run(tmpfile, block=True) | |
|
157 | self.assertEquals(v.apply_sync(lambda f: f(), clientmod.Reference('g')), 5) | |
|
158 | ||
|
159 | def test_apply_tracked(self): | |
|
160 | """test tracking for apply""" | |
|
161 | # self.add_engines(1) | |
|
162 | t = self.client.ids[-1] | |
|
163 | v = self.client[t] | |
|
164 | v.block=False | |
|
165 | def echo(n=1024*1024, **kwargs): | |
|
166 | with v.temp_flags(**kwargs): | |
|
167 | return v.apply(lambda x: x, 'x'*n) | |
|
168 | ar = echo(1, track=False) | |
|
169 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
170 | self.assertTrue(ar.sent) | |
|
171 | ar = echo(track=True) | |
|
172 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
173 | self.assertEquals(ar.sent, ar._tracker.done) | |
|
174 | ar._tracker.wait() | |
|
175 | self.assertTrue(ar.sent) | |
|
176 | ||
|
177 | def test_push_tracked(self): | |
|
178 | t = self.client.ids[-1] | |
|
179 | ns = dict(x='x'*1024*1024) | |
|
180 | v = self.client[t] | |
|
181 | ar = v.push(ns, block=False, track=False) | |
|
182 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
183 | self.assertTrue(ar.sent) | |
|
184 | ||
|
185 | ar = v.push(ns, block=False, track=True) | |
|
186 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
187 | self.assertEquals(ar.sent, ar._tracker.done) | |
|
188 | ar._tracker.wait() | |
|
189 | self.assertTrue(ar.sent) | |
|
190 | ar.get() | |
|
191 | ||
|
192 | def test_scatter_tracked(self): | |
|
193 | t = self.client.ids | |
|
194 | x='x'*1024*1024 | |
|
195 | ar = self.client[t].scatter('x', x, block=False, track=False) | |
|
196 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
197 | self.assertTrue(ar.sent) | |
|
198 | ||
|
199 | ar = self.client[t].scatter('x', x, block=False, track=True) | |
|
200 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
201 | self.assertEquals(ar.sent, ar._tracker.done) | |
|
202 | ar._tracker.wait() | |
|
203 | self.assertTrue(ar.sent) | |
|
204 | ar.get() | |
|
205 | ||
|
206 | def test_remote_reference(self): | |
|
207 | v = self.client[-1] | |
|
208 | v['a'] = 123 | |
|
209 | ra = clientmod.Reference('a') | |
|
210 | b = v.apply_sync(lambda x: x, ra) | |
|
211 | self.assertEquals(b, 123) | |
|
212 | ||
|
213 | ||
|
214 | def test_scatter_gather(self): | |
|
215 | view = self.client[:] | |
|
216 | seq1 = range(16) | |
|
217 | view.scatter('a', seq1) | |
|
218 | seq2 = view.gather('a', block=True) | |
|
219 | self.assertEquals(seq2, seq1) | |
|
220 | self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True) | |
|
221 | ||
|
222 | @skip_without('numpy') | |
|
223 | def test_scatter_gather_numpy(self): | |
|
224 | import numpy | |
|
225 | from numpy.testing.utils import assert_array_equal, assert_array_almost_equal | |
|
226 | view = self.client[:] | |
|
227 | a = numpy.arange(64) | |
|
228 | view.scatter('a', a) | |
|
229 | b = view.gather('a', block=True) | |
|
230 | assert_array_equal(b, a) | |
|
231 | ||
|
232 | def test_map(self): | |
|
233 | view = self.client[:] | |
|
234 | def f(x): | |
|
235 | return x**2 | |
|
236 | data = range(16) | |
|
237 | r = view.map_sync(f, data) | |
|
238 | self.assertEquals(r, map(f, data)) | |
|
239 | ||
|
240 | def test_scatterGatherNonblocking(self): | |
|
241 | data = range(16) | |
|
242 | view = self.client[:] | |
|
243 | view.scatter('a', data, block=False) | |
|
244 | ar = view.gather('a', block=False) | |
|
245 | self.assertEquals(ar.get(), data) | |
|
246 | ||
|
247 | @skip_without('numpy') | |
|
248 | def test_scatter_gather_numpy_nonblocking(self): | |
|
249 | import numpy | |
|
250 | from numpy.testing.utils import assert_array_equal, assert_array_almost_equal | |
|
251 | a = numpy.arange(64) | |
|
252 | view = self.client[:] | |
|
253 | ar = view.scatter('a', a, block=False) | |
|
254 | self.assertTrue(isinstance(ar, AsyncResult)) | |
|
255 | amr = view.gather('a', block=False) | |
|
256 | self.assertTrue(isinstance(amr, AsyncMapResult)) | |
|
257 | assert_array_equal(amr.get(), a) | |
|
258 | ||
|
259 | def test_execute(self): | |
|
260 | view = self.client[:] | |
|
261 | # self.client.debug=True | |
|
262 | execute = view.execute | |
|
263 | ar = execute('c=30', block=False) | |
|
264 | self.assertTrue(isinstance(ar, AsyncResult)) | |
|
265 | ar = execute('d=[0,1,2]', block=False) | |
|
266 | self.client.wait(ar, 1) | |
|
267 | self.assertEquals(len(ar.get()), len(self.client)) | |
|
268 | for c in view['c']: | |
|
269 | self.assertEquals(c, 30) | |
|
270 | ||
|
271 | def test_abort(self): | |
|
272 | view = self.client[-1] | |
|
273 | ar = view.execute('import time; time.sleep(0.25)', block=False) | |
|
274 | ar2 = view.apply_async(lambda : 2) | |
|
275 | ar3 = view.apply_async(lambda : 3) | |
|
276 | view.abort(ar2) | |
|
277 | view.abort(ar3.msg_ids) | |
|
278 | self.assertRaises(error.TaskAborted, ar2.get) | |
|
279 | self.assertRaises(error.TaskAborted, ar3.get) | |
|
280 | ||
|
281 | def test_temp_flags(self): | |
|
282 | view = self.client[-1] | |
|
283 | view.block=True | |
|
284 | with view.temp_flags(block=False): | |
|
285 | self.assertFalse(view.block) | |
|
286 | self.assertTrue(view.block) | |
|
287 |
@@ -0,0 +1,97 b'' | |||
|
1 | """ | |
|
2 | An exceptionally lousy site spider | |
|
3 | Ken Kinder <ken@kenkinder.com> | |
|
4 | ||
|
5 | Updated for newparallel by Min Ragan-Kelley <benjaminrk@gmail.com> | |
|
6 | ||
|
7 | This module gives an example of how the task interface to the | |
|
8 | IPython controller works. Before running this script start the IPython controller | |
|
9 | and some engines using something like:: | |
|
10 | ||
|
11 | ipclusterz start -n 4 | |
|
12 | """ | |
|
13 | import sys | |
|
14 | from IPython.zmq.parallel import client, error | |
|
15 | import time | |
|
16 | import BeautifulSoup # this isn't necessary, but it helps throw the dependency error earlier | |
|
17 | ||
|
18 | def fetchAndParse(url, data=None): | |
|
19 | import urllib2 | |
|
20 | import urlparse | |
|
21 | import BeautifulSoup | |
|
22 | links = [] | |
|
23 | try: | |
|
24 | page = urllib2.urlopen(url, data=data) | |
|
25 | except Exception: | |
|
26 | return links | |
|
27 | else: | |
|
28 | if page.headers.type == 'text/html': | |
|
29 | doc = BeautifulSoup.BeautifulSoup(page.read()) | |
|
30 | for node in doc.findAll('a'): | |
|
31 | href = node.get('href', None) | |
|
32 | if href: | |
|
33 | links.append(urlparse.urljoin(url, href)) | |
|
34 | return links | |
|
35 | ||
|
36 | class DistributedSpider(object): | |
|
37 | ||
|
38 | # Time to wait between polling for task results. | |
|
39 | pollingDelay = 0.5 | |
|
40 | ||
|
41 | def __init__(self, site): | |
|
42 | self.client = client.Client() | |
|
43 | self.view = self.client.load_balanced_view() | |
|
44 | self.mux = self.client[:] | |
|
45 | ||
|
46 | self.allLinks = [] | |
|
47 | self.linksWorking = {} | |
|
48 | self.linksDone = {} | |
|
49 | ||
|
50 | self.site = site | |
|
51 | ||
|
52 | def visitLink(self, url): | |
|
53 | if url not in self.allLinks: | |
|
54 | self.allLinks.append(url) | |
|
55 | if url.startswith(self.site): | |
|
56 | print ' ', url | |
|
57 | self.linksWorking[url] = self.view.apply(fetchAndParse, url) | |
|
58 | ||
|
59 | def onVisitDone(self, links, url): | |
|
60 | print url, ':' | |
|
61 | self.linksDone[url] = None | |
|
62 | del self.linksWorking[url] | |
|
63 | for link in links: | |
|
64 | self.visitLink(link) | |
|
65 | ||
|
66 | def run(self): | |
|
67 | self.visitLink(self.site) | |
|
68 | while self.linksWorking: | |
|
69 | print len(self.linksWorking), 'pending...' | |
|
70 | self.synchronize() | |
|
71 | time.sleep(self.pollingDelay) | |
|
72 | ||
|
73 | def synchronize(self): | |
|
74 | for url, ar in self.linksWorking.items(): | |
|
75 | # Calling get_task_result with block=False will return None if the | |
|
76 | # task is not done yet. This provides a simple way of polling. | |
|
77 | try: | |
|
78 | links = ar.get(0) | |
|
79 | except error.TimeoutError: | |
|
80 | continue | |
|
81 | except Exception as e: | |
|
82 | self.linksDone[url] = None | |
|
83 | del self.linksWorking[url] | |
|
84 | print url, ':', e.traceback | |
|
85 | else: | |
|
86 | self.onVisitDone(links, url) | |
|
87 | ||
|
88 | def main(): | |
|
89 | if len(sys.argv) > 1: | |
|
90 | site = sys.argv[1] | |
|
91 | else: | |
|
92 | site = raw_input('Enter site to crawl: ') | |
|
93 | distributedSpider = DistributedSpider(site) | |
|
94 | distributedSpider.run() | |
|
95 | ||
|
96 | if __name__ == '__main__': | |
|
97 | main() |
@@ -0,0 +1,19 b'' | |||
|
1 | """ | |
|
2 | A Distributed Hello world | |
|
3 | Ken Kinder <ken@kenkinder.com> | |
|
4 | """ | |
|
5 | from IPython.zmq.parallel import client | |
|
6 | ||
|
7 | rc = client.Client() | |
|
8 | ||
|
9 | def sleep_and_echo(t, msg): | |
|
10 | import time | |
|
11 | time.sleep(t) | |
|
12 | return msg | |
|
13 | ||
|
14 | view = rc.load_balanced_view() | |
|
15 | ||
|
16 | world = view.apply_async(sleep_and_echo, 3, 'World!') | |
|
17 | hello = view.apply_async(sleep_and_echo, 2, 'Hello') | |
|
18 | print "Submitted tasks:", hello.msg_ids, world.msg_ids | |
|
19 | print hello.get(), world.get() |
@@ -15,10 +15,9 b' __docformat__ = "restructuredtext en"' | |||
|
15 | 15 | # Imports |
|
16 | 16 | #------------------------------------------------------------------------------- |
|
17 | 17 | |
|
18 | from types import FunctionType | |
|
19 | 18 | import copy |
|
20 | ||
|
21 | from IPython.zmq.parallel.dependency import dependent | |
|
19 | import sys | |
|
20 | from types import FunctionType | |
|
22 | 21 | |
|
23 | 22 | import codeutil |
|
24 | 23 | |
@@ -67,12 +66,22 b' class CannedFunction(CannedObject):' | |||
|
67 | 66 | self._checkType(f) |
|
68 | 67 | self.code = f.func_code |
|
69 | 68 | self.defaults = f.func_defaults |
|
69 | self.module = f.__module__ or '__main__' | |
|
70 | 70 | self.__name__ = f.__name__ |
|
71 | 71 | |
|
72 | 72 | def _checkType(self, obj): |
|
73 | 73 | assert isinstance(obj, FunctionType), "Not a function type" |
|
74 | 74 | |
|
75 | 75 | def getObject(self, g=None): |
|
76 | # try to load function back into its module: | |
|
77 | if not self.module.startswith('__'): | |
|
78 | try: | |
|
79 | __import__(self.module) | |
|
80 | except ImportError: | |
|
81 | pass | |
|
82 | else: | |
|
83 | g = sys.modules[self.module].__dict__ | |
|
84 | ||
|
76 | 85 | if g is None: |
|
77 | 86 | g = globals() |
|
78 | 87 | newFunc = FunctionType(self.code, g, self.__name__, self.defaults) |
@@ -82,8 +91,9 b' class CannedFunction(CannedObject):' | |||
|
82 | 91 | # Functions |
|
83 | 92 | #------------------------------------------------------------------------------- |
|
84 | 93 | |
|
85 | ||
|
86 | 94 | def can(obj): |
|
95 | # import here to prevent module-level circular imports | |
|
96 | from IPython.zmq.parallel.dependency import dependent | |
|
87 | 97 | if isinstance(obj, dependent): |
|
88 | 98 | keys = ('f','df') |
|
89 | 99 | return CannedObject(obj, keys=keys) |
@@ -12,6 +12,8 b'' | |||
|
12 | 12 | |
|
13 | 13 | import time |
|
14 | 14 | |
|
15 | from zmq import MessageTracker | |
|
16 | ||
|
15 | 17 | from IPython.external.decorator import decorator |
|
16 | 18 | from . import error |
|
17 | 19 | |
@@ -19,6 +21,9 b' from . import error' | |||
|
19 | 21 | # Classes |
|
20 | 22 | #----------------------------------------------------------------------------- |
|
21 | 23 | |
|
24 | # global empty tracker that's always done: | |
|
25 | finished_tracker = MessageTracker() | |
|
26 | ||
|
22 | 27 | @decorator |
|
23 | 28 | def check_ready(f, self, *args, **kwargs): |
|
24 | 29 | """Call spin() to sync state prior to calling the method.""" |
@@ -36,18 +41,26 b' class AsyncResult(object):' | |||
|
36 | 41 | msg_ids = None |
|
37 | 42 | _targets = None |
|
38 | 43 | _tracker = None |
|
44 | _single_result = False | |
|
39 | 45 | |
|
40 | 46 | def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None): |
|
41 | self._client = client | |
|
42 | 47 | if isinstance(msg_ids, basestring): |
|
48 | # always a list | |
|
43 | 49 | msg_ids = [msg_ids] |
|
50 | if tracker is None: | |
|
51 | # default to always done | |
|
52 | tracker = finished_tracker | |
|
53 | self._client = client | |
|
44 | 54 | self.msg_ids = msg_ids |
|
45 | 55 | self._fname=fname |
|
46 | 56 | self._targets = targets |
|
47 | 57 | self._tracker = tracker |
|
48 | 58 | self._ready = False |
|
49 | 59 | self._success = None |
|
50 |
|
|
|
60 | if len(msg_ids) == 1: | |
|
61 | self._single_result = not isinstance(targets, (list, tuple)) | |
|
62 | else: | |
|
63 | self._single_result = False | |
|
51 | 64 | |
|
52 | 65 | def __repr__(self): |
|
53 | 66 | if self._ready: |
@@ -99,7 +112,7 b' class AsyncResult(object):' | |||
|
99 | 112 | """ |
|
100 | 113 | if self._ready: |
|
101 | 114 | return |
|
102 |
self._ready = self._client. |
|
|
115 | self._ready = self._client.wait(self.msg_ids, timeout) | |
|
103 | 116 | if self._ready: |
|
104 | 117 | try: |
|
105 | 118 | results = map(self._client.results.get, self.msg_ids) |
@@ -149,10 +162,9 b' class AsyncResult(object):' | |||
|
149 | 162 | return dict(zip(engine_ids,results)) |
|
150 | 163 | |
|
151 | 164 | @property |
|
152 | @check_ready | |
|
153 | 165 | def result(self): |
|
154 | 166 | """result property wrapper for `get(timeout=0)`.""" |
|
155 |
return self. |
|
|
167 | return self.get() | |
|
156 | 168 | |
|
157 | 169 | # abbreviated alias: |
|
158 | 170 | r = result |
@@ -169,7 +181,7 b' class AsyncResult(object):' | |||
|
169 | 181 | @property |
|
170 | 182 | def result_dict(self): |
|
171 | 183 | """result property as a dict.""" |
|
172 |
return self.get_dict( |
|
|
184 | return self.get_dict() | |
|
173 | 185 | |
|
174 | 186 | def __dict__(self): |
|
175 | 187 | return self.get_dict(0) |
@@ -181,11 +193,17 b' class AsyncResult(object):' | |||
|
181 | 193 | |
|
182 | 194 | @property |
|
183 | 195 | def sent(self): |
|
184 | """check whether my messages have been sent""" | |
|
185 |
|
|
|
186 | return True | |
|
187 | else: | |
|
188 | return self._tracker.done | |
|
196 | """check whether my messages have been sent.""" | |
|
197 | return self._tracker.done | |
|
198 | ||
|
199 | def wait_for_send(self, timeout=-1): | |
|
200 | """wait for pyzmq send to complete. | |
|
201 | ||
|
202 | This is necessary when sending arrays that you intend to edit in-place. | |
|
203 | `timeout` is in seconds, and will raise TimeoutError if it is reached | |
|
204 | before the send completes. | |
|
205 | """ | |
|
206 | return self._tracker.wait(timeout) | |
|
189 | 207 | |
|
190 | 208 | #------------------------------------- |
|
191 | 209 | # dict-access |
@@ -285,7 +303,7 b' class AsyncHubResult(AsyncResult):' | |||
|
285 | 303 | if self._ready: |
|
286 | 304 | return |
|
287 | 305 | local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids) |
|
288 |
local_ready = self._client. |
|
|
306 | local_ready = self._client.wait(local_ids, timeout) | |
|
289 | 307 | if local_ready: |
|
290 | 308 | remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids) |
|
291 | 309 | if not remote_ids: |
This diff has been collapsed as it changes many lines, (638 lines changed) Show them Hide them | |||
@@ -1,4 +1,4 b'' | |||
|
1 |
"""A semi-synchronous Client for the ZMQ c |
|
|
1 | """A semi-synchronous Client for the ZMQ cluster""" | |
|
2 | 2 | #----------------------------------------------------------------------------- |
|
3 | 3 | # Copyright (C) 2010 The IPython Development Team |
|
4 | 4 | # |
@@ -31,57 +31,26 b' from IPython.external.decorator import decorator' | |||
|
31 | 31 | from IPython.external.ssh import tunnel |
|
32 | 32 | |
|
33 | 33 | from . import error |
|
34 | from . import map as Map | |
|
35 | 34 | from . import util |
|
36 | 35 | from . import streamsession as ss |
|
37 | 36 | from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult |
|
38 | 37 | from .clusterdir import ClusterDir, ClusterDirError |
|
39 | 38 | from .dependency import Dependency, depend, require, dependent |
|
40 | 39 | from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction |
|
41 | from .util import ReverseDict, validate_url, disambiguate_url | |
|
42 | 40 | from .view import DirectView, LoadBalancedView |
|
43 | 41 | |
|
44 | 42 | #-------------------------------------------------------------------------- |
|
45 | # helpers for implementing old MEC API via client.apply | |
|
46 | #-------------------------------------------------------------------------- | |
|
47 | ||
|
48 | def _push(user_ns, **ns): | |
|
49 | """helper method for implementing `client.push` via `client.apply`""" | |
|
50 | user_ns.update(ns) | |
|
51 | ||
|
52 | def _pull(user_ns, keys): | |
|
53 | """helper method for implementing `client.pull` via `client.apply`""" | |
|
54 | if isinstance(keys, (list,tuple, set)): | |
|
55 | for key in keys: | |
|
56 | if not user_ns.has_key(key): | |
|
57 | raise NameError("name '%s' is not defined"%key) | |
|
58 | return map(user_ns.get, keys) | |
|
59 | else: | |
|
60 | if not user_ns.has_key(keys): | |
|
61 | raise NameError("name '%s' is not defined"%keys) | |
|
62 | return user_ns.get(keys) | |
|
63 | ||
|
64 | def _clear(user_ns): | |
|
65 | """helper method for implementing `client.clear` via `client.apply`""" | |
|
66 | user_ns.clear() | |
|
67 | ||
|
68 | def _execute(user_ns, code): | |
|
69 | """helper method for implementing `client.execute` via `client.apply`""" | |
|
70 | exec code in user_ns | |
|
71 | ||
|
72 | ||
|
73 | #-------------------------------------------------------------------------- | |
|
74 | 43 | # Decorators for Client methods |
|
75 | 44 | #-------------------------------------------------------------------------- |
|
76 | 45 | |
|
77 | 46 | @decorator |
|
78 | def spinfirst(f, self, *args, **kwargs): | |
|
47 | def spin_first(f, self, *args, **kwargs): | |
|
79 | 48 | """Call spin() to sync state prior to calling the method.""" |
|
80 | 49 | self.spin() |
|
81 | 50 | return f(self, *args, **kwargs) |
|
82 | 51 | |
|
83 | 52 | @decorator |
|
84 | def defaultblock(f, self, *args, **kwargs): | |
|
53 | def default_block(f, self, *args, **kwargs): | |
|
85 | 54 | """Default to self.block; preserve self.block.""" |
|
86 | 55 | block = kwargs.get('block',None) |
|
87 | 56 | block = self.block if block is None else block |
@@ -151,7 +120,7 b' class Metadata(dict):' | |||
|
151 | 120 | |
|
152 | 121 | |
|
153 | 122 | class Client(HasTraits): |
|
154 |
"""A semi-synchronous client to the IPython ZMQ c |
|
|
123 | """A semi-synchronous client to the IPython ZMQ cluster | |
|
155 | 124 | |
|
156 | 125 | Parameters |
|
157 | 126 | ---------- |
@@ -193,11 +162,11 b' class Client(HasTraits):' | |||
|
193 | 162 | flag for whether to use paramiko instead of shell ssh for tunneling. |
|
194 | 163 | [default: True on win32, False else] |
|
195 | 164 | |
|
196 |
|
|
|
197 |
|
|
|
198 |
|
|
|
199 |
|
|
|
200 |
|
|
|
165 | ------- exec authentication args ------- | |
|
166 | If even localhost is untrusted, you can have some protection against | |
|
167 | unauthorized execution by using a key. Messages are still sent | |
|
168 | as cleartext, so if someone can snoop your loopback traffic this will | |
|
169 | not help against malicious attacks. | |
|
201 | 170 | |
|
202 | 171 | exec_key : str |
|
203 | 172 | an authentication key or file containing a key |
@@ -207,7 +176,7 b' class Client(HasTraits):' | |||
|
207 | 176 | Attributes |
|
208 | 177 | ---------- |
|
209 | 178 | |
|
210 |
ids : s |
|
|
179 | ids : list of int engine IDs | |
|
211 | 180 | requesting the ids attribute always synchronizes |
|
212 | 181 | the registration state. To request ids without synchronization, |
|
213 | 182 | use semi-private _ids attributes. |
@@ -234,15 +203,18 b' class Client(HasTraits):' | |||
|
234 | 203 | flushes incoming results and registration state changes |
|
235 | 204 | control methods spin, and requesting `ids` also ensures up to date |
|
236 | 205 | |
|
237 |
|
|
|
206 | wait | |
|
238 | 207 | wait on one or more msg_ids |
|
239 | 208 | |
|
240 | 209 | execution methods |
|
241 | 210 | apply |
|
242 | 211 | legacy: execute, run |
|
243 | 212 | |
|
213 | data movement | |
|
214 | push, pull, scatter, gather | |
|
215 | ||
|
244 | 216 | query methods |
|
245 | queue_status, get_result, purge | |
|
217 | queue_status, get_result, purge, result_status | |
|
246 | 218 | |
|
247 | 219 | control methods |
|
248 | 220 | abort, shutdown |
@@ -264,23 +236,25 b' class Client(HasTraits):' | |||
|
264 | 236 | _ssh=Bool(False) |
|
265 | 237 | _context = Instance('zmq.Context') |
|
266 | 238 | _config = Dict() |
|
267 | _engines=Instance(ReverseDict, (), {}) | |
|
239 | _engines=Instance(util.ReverseDict, (), {}) | |
|
268 | 240 | # _hub_socket=Instance('zmq.Socket') |
|
269 | 241 | _query_socket=Instance('zmq.Socket') |
|
270 | 242 | _control_socket=Instance('zmq.Socket') |
|
271 | 243 | _iopub_socket=Instance('zmq.Socket') |
|
272 | 244 | _notification_socket=Instance('zmq.Socket') |
|
273 |
_ |
|
|
274 | _mux_ident=Str() | |
|
275 | _task_ident=Str() | |
|
245 | _mux_socket=Instance('zmq.Socket') | |
|
246 | _task_socket=Instance('zmq.Socket') | |
|
276 | 247 | _task_scheme=Str() |
|
277 | 248 | _balanced_views=Dict() |
|
278 | 249 | _direct_views=Dict() |
|
279 | 250 | _closed = False |
|
251 | _ignored_control_replies=Int(0) | |
|
252 | _ignored_hub_replies=Int(0) | |
|
280 | 253 | |
|
281 | 254 | def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None, |
|
282 | 255 | context=None, username=None, debug=False, exec_key=None, |
|
283 | 256 | sshserver=None, sshkey=None, password=None, paramiko=None, |
|
257 | timeout=10 | |
|
284 | 258 | ): |
|
285 | 259 | super(Client, self).__init__(debug=debug, profile=profile) |
|
286 | 260 | if context is None: |
@@ -292,11 +266,11 b' class Client(HasTraits):' | |||
|
292 | 266 | if self._cd is not None: |
|
293 | 267 | if url_or_file is None: |
|
294 | 268 | url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') |
|
295 |
assert url_or_file is not None, "I can't find enough information to connect to a |
|
|
269 | assert url_or_file is not None, "I can't find enough information to connect to a hub!"\ | |
|
296 | 270 | " Please specify at least one of url_or_file or profile." |
|
297 | 271 | |
|
298 | 272 | try: |
|
299 | validate_url(url_or_file) | |
|
273 | util.validate_url(url_or_file) | |
|
300 | 274 | except AssertionError: |
|
301 | 275 | if not os.path.exists(url_or_file): |
|
302 | 276 | if self._cd: |
@@ -316,7 +290,7 b' class Client(HasTraits):' | |||
|
316 | 290 | sshserver=cfg['ssh'] |
|
317 | 291 | url = cfg['url'] |
|
318 | 292 | location = cfg.setdefault('location', None) |
|
319 | cfg['url'] = disambiguate_url(cfg['url'], location) | |
|
293 | cfg['url'] = util.disambiguate_url(cfg['url'], location) | |
|
320 | 294 | url = cfg['url'] |
|
321 | 295 | |
|
322 | 296 | self._config = cfg |
@@ -351,10 +325,11 b' class Client(HasTraits):' | |||
|
351 | 325 | |
|
352 | 326 | self._notification_handlers = {'registration_notification' : self._register_engine, |
|
353 | 327 | 'unregistration_notification' : self._unregister_engine, |
|
328 | 'shutdown_notification' : lambda msg: self.close(), | |
|
354 | 329 | } |
|
355 | 330 | self._queue_handlers = {'execute_reply' : self._handle_execute_reply, |
|
356 | 331 | 'apply_reply' : self._handle_apply_reply} |
|
357 | self._connect(sshserver, ssh_kwargs) | |
|
332 | self._connect(sshserver, ssh_kwargs, timeout) | |
|
358 | 333 | |
|
359 | 334 | def __del__(self): |
|
360 | 335 | """cleanup sockets, but _not_ context.""" |
@@ -378,22 +353,6 b' class Client(HasTraits):' | |||
|
378 | 353 | pass |
|
379 | 354 | self._cd = None |
|
380 | 355 | |
|
381 | @property | |
|
382 | def ids(self): | |
|
383 | """Always up-to-date ids property.""" | |
|
384 | self._flush_notifications() | |
|
385 | # always copy: | |
|
386 | return list(self._ids) | |
|
387 | ||
|
388 | def close(self): | |
|
389 | if self._closed: | |
|
390 | return | |
|
391 | snames = filter(lambda n: n.endswith('socket'), dir(self)) | |
|
392 | for socket in map(lambda name: getattr(self, name), snames): | |
|
393 | if isinstance(socket, zmq.Socket) and not socket.closed: | |
|
394 | socket.close() | |
|
395 | self._closed = True | |
|
396 | ||
|
397 | 356 | def _update_engines(self, engines): |
|
398 | 357 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" |
|
399 | 358 | for k,v in engines.iteritems(): |
@@ -402,16 +361,15 b' class Client(HasTraits):' | |||
|
402 | 361 | self._ids.append(eid) |
|
403 | 362 | self._ids = sorted(self._ids) |
|
404 | 363 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
405 |
self._task_scheme == 'pure' and self._task_ |
|
|
364 | self._task_scheme == 'pure' and self._task_socket: | |
|
406 | 365 | self._stop_scheduling_tasks() |
|
407 | 366 | |
|
408 | 367 | def _stop_scheduling_tasks(self): |
|
409 | 368 | """Stop scheduling tasks because an engine has been unregistered |
|
410 | 369 | from a pure ZMQ scheduler. |
|
411 | 370 | """ |
|
412 |
self._task_ |
|
|
413 |
|
|
|
414 | # self._task_socket = None | |
|
371 | self._task_socket.close() | |
|
372 | self._task_socket = None | |
|
415 | 373 | msg = "An engine has been unregistered, and we are using pure " +\ |
|
416 | 374 | "ZMQ task scheduling. Task farming will be disabled." |
|
417 | 375 | if self.outstanding: |
@@ -434,8 +392,8 b' class Client(HasTraits):' | |||
|
434 | 392 | targets = [targets] |
|
435 | 393 | return [self._engines[t] for t in targets], list(targets) |
|
436 | 394 | |
|
437 | def _connect(self, sshserver, ssh_kwargs): | |
|
438 |
"""setup all our socket connections to the c |
|
|
395 | def _connect(self, sshserver, ssh_kwargs, timeout): | |
|
396 | """setup all our socket connections to the cluster. This is called from | |
|
439 | 397 | __init__.""" |
|
440 | 398 | |
|
441 | 399 | # Maybe allow reconnecting? |
@@ -444,13 +402,16 b' class Client(HasTraits):' | |||
|
444 | 402 | self._connected=True |
|
445 | 403 | |
|
446 | 404 | def connect_socket(s, url): |
|
447 | url = disambiguate_url(url, self._config['location']) | |
|
405 | url = util.disambiguate_url(url, self._config['location']) | |
|
448 | 406 | if self._ssh: |
|
449 | 407 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) |
|
450 | 408 | else: |
|
451 | 409 | return s.connect(url) |
|
452 | 410 | |
|
453 | 411 | self.session.send(self._query_socket, 'connection_request') |
|
412 | r,w,x = zmq.select([self._query_socket],[],[], timeout) | |
|
413 | if not r: | |
|
414 | raise error.TimeoutError("Hub connection request timed out") | |
|
454 | 415 | idents,msg = self.session.recv(self._query_socket,mode=0) |
|
455 | 416 | if self.debug: |
|
456 | 417 | pprint(msg) |
@@ -458,18 +419,15 b' class Client(HasTraits):' | |||
|
458 | 419 | content = msg.content |
|
459 | 420 | self._config['registration'] = dict(content) |
|
460 | 421 | if content.status == 'ok': |
|
461 | self._apply_socket = self._context.socket(zmq.XREP) | |
|
462 | self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
463 | 422 | if content.mux: |
|
464 |
|
|
|
465 | self._mux_ident = 'mux' | |
|
466 |
connect_socket(self._ |
|
|
423 | self._mux_socket = self._context.socket(zmq.XREQ) | |
|
424 | self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
425 | connect_socket(self._mux_socket, content.mux) | |
|
467 | 426 | if content.task: |
|
468 | 427 | self._task_scheme, task_addr = content.task |
|
469 |
|
|
|
470 |
|
|
|
471 |
connect_socket(self._ |
|
|
472 | self._task_ident = 'task' | |
|
428 | self._task_socket = self._context.socket(zmq.XREQ) | |
|
429 | self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
430 | connect_socket(self._task_socket, task_addr) | |
|
473 | 431 | if content.notification: |
|
474 | 432 | self._notification_socket = self._context.socket(zmq.SUB) |
|
475 | 433 | connect_socket(self._notification_socket, content.notification) |
@@ -488,8 +446,6 b' class Client(HasTraits):' | |||
|
488 | 446 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
489 | 447 | connect_socket(self._iopub_socket, content.iopub) |
|
490 | 448 | self._update_engines(dict(content.engines)) |
|
491 | # give XREP apply_socket some time to connect | |
|
492 | time.sleep(0.25) | |
|
493 | 449 | else: |
|
494 | 450 | self._connected = False |
|
495 | 451 | raise Exception("Failed to connect!") |
@@ -499,7 +455,7 b' class Client(HasTraits):' | |||
|
499 | 455 | #-------------------------------------------------------------------------- |
|
500 | 456 | |
|
501 | 457 | def _unwrap_exception(self, content): |
|
502 | """unwrap exception, and remap engineid to int.""" | |
|
458 | """unwrap exception, and remap engine_id to int.""" | |
|
503 | 459 | e = error.unwrap_exception(content) |
|
504 | 460 | # print e.traceback |
|
505 | 461 | if e.engine_info: |
@@ -545,7 +501,7 b' class Client(HasTraits):' | |||
|
545 | 501 | |
|
546 | 502 | self._handle_stranded_msgs(eid, uuid) |
|
547 | 503 | |
|
548 |
if self._task_ |
|
|
504 | if self._task_socket and self._task_scheme == 'pure': | |
|
549 | 505 | self._stop_scheduling_tasks() |
|
550 | 506 | |
|
551 | 507 | def _handle_stranded_msgs(self, eid, uuid): |
@@ -622,7 +578,7 b' class Client(HasTraits):' | |||
|
622 | 578 | if content['status'] == 'ok': |
|
623 | 579 | self.results[msg_id] = util.unserialize_object(msg['buffers'])[0] |
|
624 | 580 | elif content['status'] == 'aborted': |
|
625 |
self.results[msg_id] = error.Aborted |
|
|
581 | self.results[msg_id] = error.TaskAborted(msg_id) | |
|
626 | 582 | elif content['status'] == 'resubmitted': |
|
627 | 583 | # TODO: handle resubmission |
|
628 | 584 | pass |
@@ -665,12 +621,26 b' class Client(HasTraits):' | |||
|
665 | 621 | in the ZMQ queue. |
|
666 | 622 | |
|
667 | 623 | Currently: ignore them.""" |
|
624 | if self._ignored_control_replies <= 0: | |
|
625 | return | |
|
668 | 626 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
669 | 627 | while msg is not None: |
|
628 | self._ignored_control_replies -= 1 | |
|
670 | 629 | if self.debug: |
|
671 | 630 | pprint(msg) |
|
672 | 631 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
673 | 632 | |
|
633 | def _flush_ignored_control(self): | |
|
634 | """flush ignored control replies""" | |
|
635 | while self._ignored_control_replies > 0: | |
|
636 | self.session.recv(self._control_socket) | |
|
637 | self._ignored_control_replies -= 1 | |
|
638 | ||
|
639 | def _flush_ignored_hub_replies(self): | |
|
640 | msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK) | |
|
641 | while msg is not None: | |
|
642 | msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK) | |
|
643 | ||
|
674 | 644 | def _flush_iopub(self, sock): |
|
675 | 645 | """Flush replies from the iopub channel waiting |
|
676 | 646 | in the ZMQ queue. |
@@ -718,26 +688,46 b' class Client(HasTraits):' | |||
|
718 | 688 | if not isinstance(key, (int, slice, tuple, list, xrange)): |
|
719 | 689 | raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key))) |
|
720 | 690 | else: |
|
721 | return self.view(key, balanced=False) | |
|
691 | return self._get_view(key, balanced=False) | |
|
722 | 692 | |
|
723 | 693 | #-------------------------------------------------------------------------- |
|
724 | 694 | # Begin public methods |
|
725 | 695 | #-------------------------------------------------------------------------- |
|
726 | 696 | |
|
697 | @property | |
|
698 | def ids(self): | |
|
699 | """Always up-to-date ids property.""" | |
|
700 | self._flush_notifications() | |
|
701 | # always copy: | |
|
702 | return list(self._ids) | |
|
703 | ||
|
704 | def close(self): | |
|
705 | if self._closed: | |
|
706 | return | |
|
707 | snames = filter(lambda n: n.endswith('socket'), dir(self)) | |
|
708 | for socket in map(lambda name: getattr(self, name), snames): | |
|
709 | if isinstance(socket, zmq.Socket) and not socket.closed: | |
|
710 | socket.close() | |
|
711 | self._closed = True | |
|
712 | ||
|
727 | 713 | def spin(self): |
|
728 | 714 | """Flush any registration notifications and execution results |
|
729 | 715 | waiting in the ZMQ queue. |
|
730 | 716 | """ |
|
731 | 717 | if self._notification_socket: |
|
732 | 718 | self._flush_notifications() |
|
733 |
if self._ |
|
|
734 |
self._flush_results(self._ |
|
|
719 | if self._mux_socket: | |
|
720 | self._flush_results(self._mux_socket) | |
|
721 | if self._task_socket: | |
|
722 | self._flush_results(self._task_socket) | |
|
735 | 723 | if self._control_socket: |
|
736 | 724 | self._flush_control(self._control_socket) |
|
737 | 725 | if self._iopub_socket: |
|
738 | 726 | self._flush_iopub(self._iopub_socket) |
|
727 | if self._query_socket: | |
|
728 | self._flush_ignored_hub_replies() | |
|
739 | 729 | |
|
740 |
def |
|
|
730 | def wait(self, jobs=None, timeout=-1): | |
|
741 | 731 | """waits on one or more `jobs`, for up to `timeout` seconds. |
|
742 | 732 | |
|
743 | 733 | Parameters |
@@ -786,8 +776,8 b' class Client(HasTraits):' | |||
|
786 | 776 | # Control methods |
|
787 | 777 | #-------------------------------------------------------------------------- |
|
788 | 778 | |
|
789 | @spinfirst | |
|
790 | @defaultblock | |
|
779 | @spin_first | |
|
780 | @default_block | |
|
791 | 781 | def clear(self, targets=None, block=None): |
|
792 | 782 | """Clear the namespace in target(s).""" |
|
793 | 783 | targets = self._build_targets(targets)[0] |
@@ -795,18 +785,21 b' class Client(HasTraits):' | |||
|
795 | 785 | self.session.send(self._control_socket, 'clear_request', content={}, ident=t) |
|
796 | 786 | error = False |
|
797 | 787 | if self.block: |
|
788 | self._flush_ignored_control() | |
|
798 | 789 | for i in range(len(targets)): |
|
799 | 790 | idents,msg = self.session.recv(self._control_socket,0) |
|
800 | 791 | if self.debug: |
|
801 | 792 | pprint(msg) |
|
802 | 793 | if msg['content']['status'] != 'ok': |
|
803 | 794 | error = self._unwrap_exception(msg['content']) |
|
795 | else: | |
|
796 | self._ignored_control_replies += len(targets) | |
|
804 | 797 | if error: |
|
805 | 798 | raise error |
|
806 | 799 | |
|
807 | 800 | |
|
808 | @spinfirst | |
|
809 | @defaultblock | |
|
801 | @spin_first | |
|
802 | @default_block | |
|
810 | 803 | def abort(self, jobs=None, targets=None, block=None): |
|
811 | 804 | """Abort specific jobs from the execution queues of target(s). |
|
812 | 805 | |
@@ -839,35 +832,41 b' class Client(HasTraits):' | |||
|
839 | 832 | content=content, ident=t) |
|
840 | 833 | error = False |
|
841 | 834 | if self.block: |
|
835 | self._flush_ignored_control() | |
|
842 | 836 | for i in range(len(targets)): |
|
843 | 837 | idents,msg = self.session.recv(self._control_socket,0) |
|
844 | 838 | if self.debug: |
|
845 | 839 | pprint(msg) |
|
846 | 840 | if msg['content']['status'] != 'ok': |
|
847 | 841 | error = self._unwrap_exception(msg['content']) |
|
842 | else: | |
|
843 | self._ignored_control_replies += len(targets) | |
|
848 | 844 | if error: |
|
849 | 845 | raise error |
|
850 | 846 | |
|
851 | @spinfirst | |
|
852 | @defaultblock | |
|
853 |
def shutdown(self, targets=None, restart=False, |
|
|
854 |
"""Terminates one or more engine processes, optionally including the |
|
|
855 |
if |
|
|
847 | @spin_first | |
|
848 | @default_block | |
|
849 | def shutdown(self, targets=None, restart=False, hub=False, block=None): | |
|
850 | """Terminates one or more engine processes, optionally including the hub.""" | |
|
851 | if hub: | |
|
856 | 852 | targets = 'all' |
|
857 | 853 | targets = self._build_targets(targets)[0] |
|
858 | 854 | for t in targets: |
|
859 | 855 | self.session.send(self._control_socket, 'shutdown_request', |
|
860 | 856 | content={'restart':restart},ident=t) |
|
861 | 857 | error = False |
|
862 |
if block or |
|
|
858 | if block or hub: | |
|
859 | self._flush_ignored_control() | |
|
863 | 860 | for i in range(len(targets)): |
|
864 | idents,msg = self.session.recv(self._control_socket,0) | |
|
861 | idents,msg = self.session.recv(self._control_socket, 0) | |
|
865 | 862 | if self.debug: |
|
866 | 863 | pprint(msg) |
|
867 | 864 | if msg['content']['status'] != 'ok': |
|
868 | 865 | error = self._unwrap_exception(msg['content']) |
|
866 | else: | |
|
867 | self._ignored_control_replies += len(targets) | |
|
869 | 868 | |
|
870 |
if |
|
|
869 | if hub: | |
|
871 | 870 | time.sleep(0.25) |
|
872 | 871 | self.session.send(self._query_socket, 'shutdown_request') |
|
873 | 872 | idents,msg = self.session.recv(self._query_socket, 0) |
@@ -883,8 +882,8 b' class Client(HasTraits):' | |||
|
883 | 882 | # Execution methods |
|
884 | 883 | #-------------------------------------------------------------------------- |
|
885 | 884 | |
|
886 | @defaultblock | |
|
887 | def execute(self, code, targets='all', block=None): | |
|
885 | @default_block | |
|
886 | def _execute(self, code, targets='all', block=None): | |
|
888 | 887 | """Executes `code` on `targets` in blocking or nonblocking manner. |
|
889 | 888 | |
|
890 | 889 | ``execute`` is always `bound` (affects engine namespace) |
@@ -901,33 +900,7 b' class Client(HasTraits):' | |||
|
901 | 900 | whether or not to wait until done to return |
|
902 | 901 | default: self.block |
|
903 | 902 | """ |
|
904 | result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False) | |
|
905 | if not block: | |
|
906 | return result | |
|
907 | ||
|
908 | def run(self, filename, targets='all', block=None): | |
|
909 | """Execute contents of `filename` on engine(s). | |
|
910 | ||
|
911 | This simply reads the contents of the file and calls `execute`. | |
|
912 | ||
|
913 | Parameters | |
|
914 | ---------- | |
|
915 | ||
|
916 | filename : str | |
|
917 | The path to the file | |
|
918 | targets : int/str/list of ints/strs | |
|
919 | the engines on which to execute | |
|
920 | default : all | |
|
921 | block : bool | |
|
922 | whether or not to wait until done | |
|
923 | default: self.block | |
|
924 | ||
|
925 | """ | |
|
926 | with open(filename, 'r') as f: | |
|
927 | # add newline in case of trailing indented whitespace | |
|
928 | # which will cause SyntaxError | |
|
929 | code = f.read()+'\n' | |
|
930 | return self.execute(code, targets=targets, block=block) | |
|
903 | return self[targets].execute(code, block=block) | |
|
931 | 904 | |
|
932 | 905 | def _maybe_raise(self, result): |
|
933 | 906 | """wrapper for maybe raising an exception if apply failed.""" |
@@ -936,287 +909,113 b' class Client(HasTraits):' | |||
|
936 | 909 | |
|
937 | 910 | return result |
|
938 | 911 | |
|
939 | def _build_dependency(self, dep): | |
|
940 | """helper for building jsonable dependencies from various input forms""" | |
|
941 | if isinstance(dep, Dependency): | |
|
942 | return dep.as_dict() | |
|
943 | elif isinstance(dep, AsyncResult): | |
|
944 | return dep.msg_ids | |
|
945 | elif dep is None: | |
|
946 | return [] | |
|
947 | else: | |
|
948 | # pass to Dependency constructor | |
|
949 | return list(Dependency(dep)) | |
|
950 | ||
|
951 | @defaultblock | |
|
952 | def apply(self, f, args=None, kwargs=None, bound=False, block=None, | |
|
953 | targets=None, balanced=None, | |
|
954 | after=None, follow=None, timeout=None, | |
|
955 | track=False): | |
|
956 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. | |
|
912 | def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False, | |
|
913 | ident=None): | |
|
914 | """construct and send an apply message via a socket. | |
|
957 | 915 | |
|
958 | This is the central execution command for the client. | |
|
959 | ||
|
960 | Parameters | |
|
961 | ---------- | |
|
962 | ||
|
963 | f : function | |
|
964 | The fuction to be called remotely | |
|
965 | args : tuple/list | |
|
966 | The positional arguments passed to `f` | |
|
967 | kwargs : dict | |
|
968 | The keyword arguments passed to `f` | |
|
969 | bound : bool (default: False) | |
|
970 | Whether to pass the Engine(s) Namespace as the first argument to `f`. | |
|
971 | block : bool (default: self.block) | |
|
972 | Whether to wait for the result, or return immediately. | |
|
973 | False: | |
|
974 | returns AsyncResult | |
|
975 | True: | |
|
976 | returns actual result(s) of f(*args, **kwargs) | |
|
977 | if multiple targets: | |
|
978 | list of results, matching `targets` | |
|
979 | track : bool | |
|
980 | whether to track non-copying sends. | |
|
981 | [default False] | |
|
982 | ||
|
983 | targets : int,list of ints, 'all', None | |
|
984 | Specify the destination of the job. | |
|
985 | if None: | |
|
986 | Submit via Task queue for load-balancing. | |
|
987 | if 'all': | |
|
988 | Run on all active engines | |
|
989 | if list: | |
|
990 | Run on each specified engine | |
|
991 | if int: | |
|
992 | Run on single engine | |
|
993 | Note: | |
|
994 | that if `balanced=True`, and `targets` is specified, | |
|
995 | then the load-balancing will be limited to balancing | |
|
996 | among `targets`. | |
|
997 | ||
|
998 | balanced : bool, default None | |
|
999 | whether to load-balance. This will default to True | |
|
1000 | if targets is unspecified, or False if targets is specified. | |
|
1001 | ||
|
1002 | If `balanced` and `targets` are both specified, the task will | |
|
1003 | be assigne to *one* of the targets by the scheduler. | |
|
1004 | ||
|
1005 | The following arguments are only used when balanced is True: | |
|
1006 | ||
|
1007 | after : Dependency or collection of msg_ids | |
|
1008 | Only for load-balanced execution (targets=None) | |
|
1009 | Specify a list of msg_ids as a time-based dependency. | |
|
1010 | This job will only be run *after* the dependencies | |
|
1011 | have been met. | |
|
1012 | ||
|
1013 | follow : Dependency or collection of msg_ids | |
|
1014 | Only for load-balanced execution (targets=None) | |
|
1015 | Specify a list of msg_ids as a location-based dependency. | |
|
1016 | This job will only be run on an engine where this dependency | |
|
1017 | is met. | |
|
1018 | ||
|
1019 | timeout : float/int or None | |
|
1020 | Only for load-balanced execution (targets=None) | |
|
1021 | Specify an amount of time (in seconds) for the scheduler to | |
|
1022 | wait for dependencies to be met before failing with a | |
|
1023 | DependencyTimeout. | |
|
1024 | ||
|
1025 | Returns | |
|
1026 | ------- | |
|
1027 | ||
|
1028 | if block is False: | |
|
1029 | return AsyncResult wrapping msg_ids | |
|
1030 | output of AsyncResult.get() is identical to that of `apply(...block=True)` | |
|
1031 | else: | |
|
1032 | if single target (or balanced): | |
|
1033 | return result of `f(*args, **kwargs)` | |
|
1034 | else: | |
|
1035 | return list of results, matching `targets` | |
|
916 | This is the principal method with which all engine execution is performed by views. | |
|
1036 | 917 | """ |
|
918 | ||
|
1037 | 919 | assert not self._closed, "cannot use me anymore, I'm closed!" |
|
1038 | 920 | # defaults: |
|
1039 | block = block if block is not None else self.block | |
|
1040 | 921 | args = args if args is not None else [] |
|
1041 | 922 | kwargs = kwargs if kwargs is not None else {} |
|
923 | subheader = subheader if subheader is not None else {} | |
|
1042 | 924 | |
|
1043 | if not self._ids: | |
|
1044 | # flush notification socket if no engines yet | |
|
1045 | any_ids = self.ids | |
|
1046 | if not any_ids: | |
|
1047 | raise error.NoEnginesRegistered("Can't execute without any connected engines.") | |
|
1048 | ||
|
1049 | if balanced is None: | |
|
1050 | if targets is None: | |
|
1051 | # default to balanced if targets unspecified | |
|
1052 | balanced = True | |
|
1053 | else: | |
|
1054 | # otherwise default to multiplexing | |
|
1055 | balanced = False | |
|
1056 | ||
|
1057 | if targets is None and balanced is False: | |
|
1058 | # default to all if *not* balanced, and targets is unspecified | |
|
1059 | targets = 'all' | |
|
1060 | ||
|
1061 | # enforce types of f,args,kwrags | |
|
925 | # validate arguments | |
|
1062 | 926 | if not callable(f): |
|
1063 | 927 | raise TypeError("f must be callable, not %s"%type(f)) |
|
1064 | 928 | if not isinstance(args, (tuple, list)): |
|
1065 | 929 | raise TypeError("args must be tuple or list, not %s"%type(args)) |
|
1066 | 930 | if not isinstance(kwargs, dict): |
|
1067 | 931 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
932 | if not isinstance(subheader, dict): | |
|
933 | raise TypeError("subheader must be dict, not %s"%type(subheader)) | |
|
1068 | 934 | |
|
1069 | options = dict(bound=bound, block=block, targets=targets, track=track) | |
|
1070 | ||
|
1071 | if balanced: | |
|
1072 | return self._apply_balanced(f, args, kwargs, timeout=timeout, | |
|
1073 | after=after, follow=follow, **options) | |
|
1074 | elif follow or after or timeout: | |
|
1075 | msg = "follow, after, and timeout args are only used for" | |
|
1076 | msg += " load-balanced execution." | |
|
1077 | raise ValueError(msg) | |
|
1078 | else: | |
|
1079 | return self._apply_direct(f, args, kwargs, **options) | |
|
1080 | ||
|
1081 | def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None, | |
|
1082 | after=None, follow=None, timeout=None, track=None): | |
|
1083 | """call f(*args, **kwargs) remotely in a load-balanced manner. | |
|
1084 | ||
|
1085 | This is a private method, see `apply` for details. | |
|
1086 | Not to be called directly! | |
|
1087 | """ | |
|
1088 | ||
|
1089 | loc = locals() | |
|
1090 | for name in ('bound', 'block', 'track'): | |
|
1091 | assert loc[name] is not None, "kwarg %r must be specified!"%name | |
|
1092 | ||
|
1093 | if not self._task_ident: | |
|
1094 | msg = "Task farming is disabled" | |
|
1095 | if self._task_scheme == 'pure': | |
|
1096 | msg += " because the pure ZMQ scheduler cannot handle" | |
|
1097 | msg += " disappearing engines." | |
|
1098 | raise RuntimeError(msg) | |
|
1099 | ||
|
1100 | if self._task_scheme == 'pure': | |
|
1101 | # pure zmq scheme doesn't support dependencies | |
|
1102 | msg = "Pure ZMQ scheduler doesn't support dependencies" | |
|
1103 | if (follow or after): | |
|
1104 | # hard fail on DAG dependencies | |
|
1105 | raise RuntimeError(msg) | |
|
1106 | if isinstance(f, dependent): | |
|
1107 | # soft warn on functional dependencies | |
|
1108 | warnings.warn(msg, RuntimeWarning) | |
|
1109 | ||
|
1110 | # defaults: | |
|
1111 | args = args if args is not None else [] | |
|
1112 | kwargs = kwargs if kwargs is not None else {} | |
|
1113 | ||
|
1114 | if targets: | |
|
1115 | idents,_ = self._build_targets(targets) | |
|
1116 | else: | |
|
1117 | idents = [] | |
|
935 | if not self._ids: | |
|
936 | # flush notification socket if no engines yet | |
|
937 | any_ids = self.ids | |
|
938 | if not any_ids: | |
|
939 | raise error.NoEnginesRegistered("Can't execute without any connected engines.") | |
|
940 | # enforce types of f,args,kwargs | |
|
1118 | 941 | |
|
1119 | after = self._build_dependency(after) | |
|
1120 | follow = self._build_dependency(follow) | |
|
1121 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) | |
|
1122 | 942 | bufs = util.pack_apply_message(f,args,kwargs) |
|
1123 | content = dict(bound=bound) | |
|
1124 | 943 | |
|
1125 |
msg = self.session.send( |
|
|
1126 |
|
|
|
944 | msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, | |
|
945 | subheader=subheader, track=track) | |
|
946 | ||
|
1127 | 947 | msg_id = msg['msg_id'] |
|
1128 | 948 | self.outstanding.add(msg_id) |
|
949 | if ident: | |
|
950 | # possibly routed to a specific engine | |
|
951 | if isinstance(ident, list): | |
|
952 | ident = ident[-1] | |
|
953 | if ident in self._engines.values(): | |
|
954 | # save for later, in case of engine death | |
|
955 | self._outstanding_dict[ident].add(msg_id) | |
|
1129 | 956 | self.history.append(msg_id) |
|
1130 | 957 | self.metadata[msg_id]['submitted'] = datetime.now() |
|
1131 | tracker = None if track is False else msg['tracker'] | |
|
1132 | ar = AsyncResult(self, [msg_id], fname=f.__name__, targets=targets, tracker=tracker) | |
|
1133 | if block: | |
|
1134 | try: | |
|
1135 | return ar.get() | |
|
1136 | except KeyboardInterrupt: | |
|
1137 | return ar | |
|
1138 | else: | |
|
1139 | return ar | |
|
1140 | ||
|
1141 | def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None, | |
|
1142 | track=None): | |
|
1143 | """Then underlying method for applying functions to specific engines | |
|
1144 | via the MUX queue. | |
|
1145 | ||
|
1146 | This is a private method, see `apply` for details. | |
|
1147 | Not to be called directly! | |
|
1148 | """ | |
|
1149 | ||
|
1150 | if not self._mux_ident: | |
|
1151 | msg = "Multiplexing is disabled" | |
|
1152 | raise RuntimeError(msg) | |
|
1153 | 958 | |
|
1154 | loc = locals() | |
|
1155 | for name in ('bound', 'block', 'targets', 'track'): | |
|
1156 | assert loc[name] is not None, "kwarg %r must be specified!"%name | |
|
1157 | ||
|
1158 | idents,targets = self._build_targets(targets) | |
|
1159 | ||
|
1160 | subheader = {} | |
|
1161 | content = dict(bound=bound) | |
|
1162 | bufs = util.pack_apply_message(f,args,kwargs) | |
|
1163 | ||
|
1164 | msg_ids = [] | |
|
1165 | trackers = [] | |
|
1166 | for ident in idents: | |
|
1167 | msg = self.session.send(self._apply_socket, "apply_request", | |
|
1168 | content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader, | |
|
1169 | track=track) | |
|
1170 | if track: | |
|
1171 | trackers.append(msg['tracker']) | |
|
1172 | msg_id = msg['msg_id'] | |
|
1173 | self.outstanding.add(msg_id) | |
|
1174 | self._outstanding_dict[ident].add(msg_id) | |
|
1175 | self.history.append(msg_id) | |
|
1176 | msg_ids.append(msg_id) | |
|
1177 | ||
|
1178 | tracker = None if track is False else zmq.MessageTracker(*trackers) | |
|
1179 | ar = AsyncResult(self, msg_ids, fname=f.__name__, targets=targets, tracker=tracker) | |
|
1180 | ||
|
1181 | if block: | |
|
1182 | try: | |
|
1183 | return ar.get() | |
|
1184 | except KeyboardInterrupt: | |
|
1185 | return ar | |
|
1186 | else: | |
|
1187 | return ar | |
|
1188 | ||
|
959 | return msg | |
|
960 | ||
|
1189 | 961 | #-------------------------------------------------------------------------- |
|
1190 | 962 | # construct a View object |
|
1191 | 963 | #-------------------------------------------------------------------------- |
|
1192 | 964 | |
|
1193 | @defaultblock | |
|
1194 | def remote(self, bound=False, block=None, targets=None, balanced=None): | |
|
1195 | """Decorator for making a RemoteFunction""" | |
|
1196 | return remote(self, bound=bound, targets=targets, block=block, balanced=balanced) | |
|
1197 | ||
|
1198 | @defaultblock | |
|
1199 | def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None): | |
|
1200 | """Decorator for making a ParallelFunction""" | |
|
1201 | return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced) | |
|
1202 | ||
|
1203 | 965 | def _cache_view(self, targets, balanced): |
|
1204 | 966 | """save views, so subsequent requests don't create new objects.""" |
|
1205 | 967 | if balanced: |
|
968 | # validate whether we can run | |
|
969 | if not self._task_socket: | |
|
970 | msg = "Task farming is disabled" | |
|
971 | if self._task_scheme == 'pure': | |
|
972 | msg += " because the pure ZMQ scheduler cannot handle" | |
|
973 | msg += " disappearing engines." | |
|
974 | raise RuntimeError(msg) | |
|
975 | socket = self._task_socket | |
|
1206 | 976 | view_class = LoadBalancedView |
|
1207 | 977 | view_cache = self._balanced_views |
|
1208 | 978 | else: |
|
979 | socket = self._mux_socket | |
|
1209 | 980 | view_class = DirectView |
|
1210 | 981 | view_cache = self._direct_views |
|
1211 | 982 | |
|
1212 | 983 | # use str, since often targets will be a list |
|
1213 | 984 | key = str(targets) |
|
1214 | 985 | if key not in view_cache: |
|
1215 | view_cache[key] = view_class(client=self, targets=targets) | |
|
986 | view_cache[key] = view_class(client=self, socket=socket, targets=targets) | |
|
1216 | 987 | |
|
1217 | 988 | return view_cache[key] |
|
1218 | 989 | |
|
1219 |
def view(self, targets=None |
|
|
990 | def load_balanced_view(self, targets=None): | |
|
991 | """construct a DirectView object. | |
|
992 | ||
|
993 | If no arguments are specified, create a LoadBalancedView | |
|
994 | using all engines. | |
|
995 | ||
|
996 | Parameters | |
|
997 | ---------- | |
|
998 | ||
|
999 | targets: list,slice,int,etc. [default: use all engines] | |
|
1000 | The subset of engines across which to load-balance | |
|
1001 | """ | |
|
1002 | return self._get_view(targets, balanced=True) | |
|
1003 | ||
|
1004 | def direct_view(self, targets='all'): | |
|
1005 | """construct a DirectView object. | |
|
1006 | ||
|
1007 | If no targets are specified, create a DirectView | |
|
1008 | using all engines. | |
|
1009 | ||
|
1010 | Parameters | |
|
1011 | ---------- | |
|
1012 | ||
|
1013 | targets: list,slice,int,etc. [default: use all engines] | |
|
1014 | The engines to use for the View | |
|
1015 | """ | |
|
1016 | return self._get_view(targets, balanced=False) | |
|
1017 | ||
|
1018 | def _get_view(self, targets, balanced): | |
|
1220 | 1019 | """Method for constructing View objects. |
|
1221 | 1020 | |
|
1222 | 1021 | If no arguments are specified, create a LoadBalancedView |
@@ -1234,9 +1033,7 b' class Client(HasTraits):' | |||
|
1234 | 1033 | |
|
1235 | 1034 | """ |
|
1236 | 1035 | |
|
1237 | balanced = (targets is None) if balanced is None else balanced | |
|
1238 | ||
|
1239 | if targets is None: | |
|
1036 | if targets in (None,'all'): | |
|
1240 | 1037 | if balanced: |
|
1241 | 1038 | return self._cache_view(None,True) |
|
1242 | 1039 | else: |
@@ -1261,20 +1058,20 b' class Client(HasTraits):' | |||
|
1261 | 1058 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) |
|
1262 | 1059 | |
|
1263 | 1060 | #-------------------------------------------------------------------------- |
|
1264 | # Data movement | |
|
1061 | # Data movement (TO BE REMOVED) | |
|
1265 | 1062 | #-------------------------------------------------------------------------- |
|
1266 | 1063 | |
|
1267 | @defaultblock | |
|
1268 | def push(self, ns, targets='all', block=None, track=False): | |
|
1064 | @default_block | |
|
1065 | def _push(self, ns, targets='all', block=None, track=False): | |
|
1269 | 1066 | """Push the contents of `ns` into the namespace on `target`""" |
|
1270 | 1067 | if not isinstance(ns, dict): |
|
1271 | 1068 | raise TypeError("Must be a dict, not %s"%type(ns)) |
|
1272 | result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track) | |
|
1069 | result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track) | |
|
1273 | 1070 | if not block: |
|
1274 | 1071 | return result |
|
1275 | 1072 | |
|
1276 | @defaultblock | |
|
1277 | def pull(self, keys, targets='all', block=None): | |
|
1073 | @default_block | |
|
1074 | def _pull(self, keys, targets='all', block=None): | |
|
1278 | 1075 | """Pull objects from `target`'s namespace by `keys`""" |
|
1279 | 1076 | if isinstance(keys, basestring): |
|
1280 | 1077 | pass |
@@ -1284,64 +1081,15 b' class Client(HasTraits):' | |||
|
1284 | 1081 | raise TypeError("keys must be str, not type %r"%type(key)) |
|
1285 | 1082 | else: |
|
1286 | 1083 | raise TypeError("keys must be strs, not %r"%keys) |
|
1287 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False) | |
|
1084 | result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False) | |
|
1288 | 1085 | return result |
|
1289 | 1086 | |
|
1290 | @defaultblock | |
|
1291 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None, track=False): | |
|
1292 | """ | |
|
1293 | Partition a Python sequence and send the partitions to a set of engines. | |
|
1294 | """ | |
|
1295 | targets = self._build_targets(targets)[-1] | |
|
1296 | mapObject = Map.dists[dist]() | |
|
1297 | nparts = len(targets) | |
|
1298 | msg_ids = [] | |
|
1299 | trackers = [] | |
|
1300 | for index, engineid in enumerate(targets): | |
|
1301 | partition = mapObject.getPartition(seq, index, nparts) | |
|
1302 | if flatten and len(partition) == 1: | |
|
1303 | r = self.push({key: partition[0]}, targets=engineid, block=False, track=track) | |
|
1304 | else: | |
|
1305 | r = self.push({key: partition}, targets=engineid, block=False, track=track) | |
|
1306 | msg_ids.extend(r.msg_ids) | |
|
1307 | if track: | |
|
1308 | trackers.append(r._tracker) | |
|
1309 | ||
|
1310 | if track: | |
|
1311 | tracker = zmq.MessageTracker(*trackers) | |
|
1312 | else: | |
|
1313 | tracker = None | |
|
1314 | ||
|
1315 | r = AsyncResult(self, msg_ids, fname='scatter', targets=targets, tracker=tracker) | |
|
1316 | if block: | |
|
1317 | r.wait() | |
|
1318 | else: | |
|
1319 | return r | |
|
1320 | ||
|
1321 | @defaultblock | |
|
1322 | def gather(self, key, dist='b', targets='all', block=None): | |
|
1323 | """ | |
|
1324 | Gather a partitioned sequence on a set of engines as a single local seq. | |
|
1325 | """ | |
|
1326 | ||
|
1327 | targets = self._build_targets(targets)[-1] | |
|
1328 | mapObject = Map.dists[dist]() | |
|
1329 | msg_ids = [] | |
|
1330 | for index, engineid in enumerate(targets): | |
|
1331 | msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids) | |
|
1332 | ||
|
1333 | r = AsyncMapResult(self, msg_ids, mapObject, fname='gather') | |
|
1334 | if block: | |
|
1335 | return r.get() | |
|
1336 | else: | |
|
1337 | return r | |
|
1338 | ||
|
1339 | 1087 | #-------------------------------------------------------------------------- |
|
1340 | 1088 | # Query methods |
|
1341 | 1089 | #-------------------------------------------------------------------------- |
|
1342 | 1090 | |
|
1343 | @spinfirst | |
|
1344 | @defaultblock | |
|
1091 | @spin_first | |
|
1092 | @default_block | |
|
1345 | 1093 | def get_result(self, indices_or_msg_ids=None, block=None): |
|
1346 | 1094 | """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object. |
|
1347 | 1095 | |
@@ -1406,7 +1154,7 b' class Client(HasTraits):' | |||
|
1406 | 1154 | |
|
1407 | 1155 | return ar |
|
1408 | 1156 | |
|
1409 | @spinfirst | |
|
1157 | @spin_first | |
|
1410 | 1158 | def result_status(self, msg_ids, status_only=True): |
|
1411 | 1159 | """Check on the status of the result(s) of the apply request with `msg_ids`. |
|
1412 | 1160 | |
@@ -1505,7 +1253,7 b' class Client(HasTraits):' | |||
|
1505 | 1253 | error.collect_exceptions(failures, "result_status") |
|
1506 | 1254 | return content |
|
1507 | 1255 | |
|
1508 | @spinfirst | |
|
1256 | @spin_first | |
|
1509 | 1257 | def queue_status(self, targets='all', verbose=False): |
|
1510 | 1258 | """Fetch the status of engine queues. |
|
1511 | 1259 | |
@@ -1518,8 +1266,8 b' class Client(HasTraits):' | |||
|
1518 | 1266 | verbose : bool |
|
1519 | 1267 | Whether to return lengths only, or lists of ids for each element |
|
1520 | 1268 | """ |
|
1521 |
|
|
|
1522 |
content = dict(targets= |
|
|
1269 | engine_ids = self._build_targets(targets)[1] | |
|
1270 | content = dict(targets=engine_ids, verbose=verbose) | |
|
1523 | 1271 | self.session.send(self._query_socket, "queue_request", content=content) |
|
1524 | 1272 | idents,msg = self.session.recv(self._query_socket, 0) |
|
1525 | 1273 | if self.debug: |
@@ -1528,11 +1276,15 b' class Client(HasTraits):' | |||
|
1528 | 1276 | status = content.pop('status') |
|
1529 | 1277 | if status != 'ok': |
|
1530 | 1278 | raise self._unwrap_exception(content) |
|
1531 |
|
|
|
1279 | content = util.rekey(content) | |
|
1280 | if isinstance(targets, int): | |
|
1281 | return content[targets] | |
|
1282 | else: | |
|
1283 | return content | |
|
1532 | 1284 | |
|
1533 | @spinfirst | |
|
1285 | @spin_first | |
|
1534 | 1286 | def purge_results(self, jobs=[], targets=[]): |
|
1535 |
"""Tell the |
|
|
1287 | """Tell the Hub to forget results. | |
|
1536 | 1288 | |
|
1537 | 1289 | Individual results can be purged by msg_id, or the entire |
|
1538 | 1290 | history of specific targets can be purged. |
@@ -1540,11 +1292,11 b' class Client(HasTraits):' | |||
|
1540 | 1292 | Parameters |
|
1541 | 1293 | ---------- |
|
1542 | 1294 | |
|
1543 |
jobs : str or list of str |
|
|
1295 | jobs : str or list of str or AsyncResult objects | |
|
1544 | 1296 | the msg_ids whose results should be forgotten. |
|
1545 | 1297 | targets : int/str/list of ints/strs |
|
1546 | 1298 | The targets, by uuid or int_id, whose entire history is to be purged. |
|
1547 |
Use `targets='all'` to scrub everything from the |
|
|
1299 | Use `targets='all'` to scrub everything from the Hub's memory. | |
|
1548 | 1300 | |
|
1549 | 1301 | default : None |
|
1550 | 1302 | """ |
@@ -6,11 +6,9 b'' | |||
|
6 | 6 | # the file COPYING, distributed as part of this software. |
|
7 | 7 | #----------------------------------------------------------------------------- |
|
8 | 8 | |
|
9 | from IPython.external.decorator import decorator | |
|
10 | ||
|
11 | 9 | from .asyncresult import AsyncResult |
|
12 | 10 | from .error import UnmetDependency |
|
13 | ||
|
11 | from .util import interactive | |
|
14 | 12 | |
|
15 | 13 | class depend(object): |
|
16 | 14 | """Dependency decorator, for use with tasks. |
@@ -54,6 +52,8 b' class dependent(object):' | |||
|
54 | 52 | self.dkwargs = dkwargs |
|
55 | 53 | |
|
56 | 54 | def __call__(self, *args, **kwargs): |
|
55 | # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'): | |
|
56 | # self.df.func_globals = self.f.func_globals | |
|
57 | 57 | if self.df(*self.dargs, **self.dkwargs) is False: |
|
58 | 58 | raise UnmetDependency() |
|
59 | 59 | return self.f(*args, **kwargs) |
@@ -62,13 +62,18 b' class dependent(object):' | |||
|
62 | 62 | def __name__(self): |
|
63 | 63 | return self.func_name |
|
64 | 64 | |
|
65 | @interactive | |
|
65 | 66 | def _require(*names): |
|
66 | 67 | """Helper for @require decorator.""" |
|
68 | from IPython.zmq.parallel.error import UnmetDependency | |
|
69 | user_ns = globals() | |
|
67 | 70 | for name in names: |
|
71 | if name in user_ns: | |
|
72 | continue | |
|
68 | 73 | try: |
|
69 |
|
|
|
74 | exec 'import %s'%name in user_ns | |
|
70 | 75 | except ImportError: |
|
71 | return False | |
|
76 | raise UnmetDependency(name) | |
|
72 | 77 | return True |
|
73 | 78 | |
|
74 | 79 | def require(*names): |
@@ -96,54 +101,73 b' class Dependency(set):' | |||
|
96 | 101 | all : bool [default True] |
|
97 | 102 | Whether the dependency should be considered met when *all* depending tasks have completed |
|
98 | 103 | or only when *any* have been completed. |
|
99 |
success |
|
|
100 |
Whether to consider |
|
|
101 | If `all=success_only=True`, then this task will fail with an ImpossibleDependency | |
|
104 | success : bool [default True] | |
|
105 | Whether to consider successes as fulfilling dependencies. | |
|
106 | failure : bool [default False] | |
|
107 | Whether to consider failures as fulfilling dependencies. | |
|
108 | ||
|
109 | If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency | |
|
102 | 110 | as soon as the first depended-upon task fails. |
|
103 | 111 | """ |
|
104 | 112 | |
|
105 | 113 | all=True |
|
106 |
success |
|
|
114 | success=True | |
|
115 | failure=True | |
|
107 | 116 | |
|
108 |
def __init__(self, dependencies=[], all=True, success |
|
|
117 | def __init__(self, dependencies=[], all=True, success=True, failure=False): | |
|
109 | 118 | if isinstance(dependencies, dict): |
|
110 | 119 | # load from dict |
|
111 | 120 | all = dependencies.get('all', True) |
|
112 |
success |
|
|
121 | success = dependencies.get('success', success) | |
|
122 | failure = dependencies.get('failure', failure) | |
|
113 | 123 | dependencies = dependencies.get('dependencies', []) |
|
114 | 124 | ids = [] |
|
115 | if isinstance(dependencies, AsyncResult): | |
|
116 | ids.extend(AsyncResult.msg_ids) | |
|
117 | else: | |
|
118 |
|
|
|
119 | if isinstance(d, basestring): | |
|
120 | ids.append(d) | |
|
121 | elif isinstance(d, AsyncResult): | |
|
122 | ids.extend(d.msg_ids) | |
|
123 |
|
|
|
124 | raise TypeError("invalid dependency type: %r"%type(d)) | |
|
125 | ||
|
126 | # extract ids from various sources: | |
|
127 | if isinstance(dependencies, (basestring, AsyncResult)): | |
|
128 | dependencies = [dependencies] | |
|
129 | for d in dependencies: | |
|
130 | if isinstance(d, basestring): | |
|
131 | ids.append(d) | |
|
132 | elif isinstance(d, AsyncResult): | |
|
133 | ids.extend(d.msg_ids) | |
|
134 | else: | |
|
135 | raise TypeError("invalid dependency type: %r"%type(d)) | |
|
136 | ||
|
125 | 137 | set.__init__(self, ids) |
|
126 | 138 | self.all = all |
|
127 | self.success_only=success_only | |
|
139 | if not (success or failure): | |
|
140 | raise ValueError("Must depend on at least one of successes or failures!") | |
|
141 | self.success=success | |
|
142 | self.failure = failure | |
|
128 | 143 | |
|
129 | 144 | def check(self, completed, failed=None): |
|
130 | if failed is not None and not self.success_only: | |
|
131 | completed = completed.union(failed) | |
|
145 | """check whether our dependencies have been met.""" | |
|
132 | 146 | if len(self) == 0: |
|
133 | 147 | return True |
|
148 | against = set() | |
|
149 | if self.success: | |
|
150 | against = completed | |
|
151 | if failed is not None and self.failure: | |
|
152 | against = against.union(failed) | |
|
134 | 153 | if self.all: |
|
135 |
return self.issubset( |
|
|
154 | return self.issubset(against) | |
|
136 | 155 | else: |
|
137 |
return not self.isdisjoint( |
|
|
156 | return not self.isdisjoint(against) | |
|
138 | 157 | |
|
139 | def unreachable(self, failed): | |
|
140 | if len(self) == 0 or len(failed) == 0 or not self.success_only: | |
|
158 | def unreachable(self, completed, failed=None): | |
|
159 | """return whether this dependency has become impossible.""" | |
|
160 | if len(self) == 0: | |
|
141 | 161 | return False |
|
142 | # print self, self.success_only, self.all, failed | |
|
162 | against = set() | |
|
163 | if not self.success: | |
|
164 | against = completed | |
|
165 | if failed is not None and not self.failure: | |
|
166 | against = against.union(failed) | |
|
143 | 167 | if self.all: |
|
144 |
return not self.isdisjoint( |
|
|
168 | return not self.isdisjoint(against) | |
|
145 | 169 | else: |
|
146 |
return self.issubset( |
|
|
170 | return self.issubset(against) | |
|
147 | 171 | |
|
148 | 172 | |
|
149 | 173 | def as_dict(self): |
@@ -151,9 +175,10 b' class Dependency(set):' | |||
|
151 | 175 | return dict( |
|
152 | 176 | dependencies=list(self), |
|
153 | 177 | all=self.all, |
|
154 |
success |
|
|
178 | success=self.success, | |
|
179 | failure=self.failure | |
|
155 | 180 | ) |
|
156 | ||
|
181 | ||
|
157 | 182 | |
|
158 | 183 | __all__ = ['depend', 'require', 'dependent', 'Dependency'] |
|
159 | 184 |
@@ -890,13 +890,9 b' class Hub(LoggingFactory):' | |||
|
890 | 890 | |
|
891 | 891 | def shutdown_request(self, client_id, msg): |
|
892 | 892 | """handle shutdown request.""" |
|
893 | # s = self.context.socket(zmq.XREQ) | |
|
894 | # s.connect(self.client_connections['mux']) | |
|
895 | # time.sleep(0.1) | |
|
896 | # for eid,ec in self.engines.iteritems(): | |
|
897 | # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue) | |
|
898 | # time.sleep(1) | |
|
899 | 893 | self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) |
|
894 | # also notify other clients of shutdown | |
|
895 | self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'}) | |
|
900 | 896 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) |
|
901 | 897 | dc.start() |
|
902 | 898 |
@@ -1,4 +1,4 b'' | |||
|
1 |
"""Remote Functions and decorators for |
|
|
1 | """Remote Functions and decorators for Views.""" | |
|
2 | 2 | #----------------------------------------------------------------------------- |
|
3 | 3 | # Copyright (C) 2010 The IPython Development Team |
|
4 | 4 | # |
@@ -22,33 +22,33 b' from .asyncresult import AsyncMapResult' | |||
|
22 | 22 | #----------------------------------------------------------------------------- |
|
23 | 23 | |
|
24 | 24 | @testdec.skip_doctest |
|
25 | def remote(client, bound=False, block=None, targets=None, balanced=None): | |
|
25 | def remote(view, block=None, **flags): | |
|
26 | 26 | """Turn a function into a remote function. |
|
27 | 27 | |
|
28 | 28 | This method can be used for map: |
|
29 | 29 | |
|
30 |
In [1]: @remote( |
|
|
30 | In [1]: @remote(view,block=True) | |
|
31 | 31 | ...: def func(a): |
|
32 | 32 | ...: pass |
|
33 | 33 | """ |
|
34 | 34 | |
|
35 | 35 | def remote_function(f): |
|
36 |
return RemoteFunction( |
|
|
36 | return RemoteFunction(view, f, block=block, **flags) | |
|
37 | 37 | return remote_function |
|
38 | 38 | |
|
39 | 39 | @testdec.skip_doctest |
|
40 |
def parallel( |
|
|
40 | def parallel(view, dist='b', block=None, **flags): | |
|
41 | 41 | """Turn a function into a parallel remote function. |
|
42 | 42 | |
|
43 | 43 | This method can be used for map: |
|
44 | 44 | |
|
45 |
In [1]: @parallel( |
|
|
45 | In [1]: @parallel(view, block=True) | |
|
46 | 46 | ...: def func(a): |
|
47 | 47 | ...: pass |
|
48 | 48 | """ |
|
49 | 49 | |
|
50 | 50 | def parallel_function(f): |
|
51 |
return ParallelFunction( |
|
|
51 | return ParallelFunction(view, f, dist=dist, block=block, **flags) | |
|
52 | 52 | return parallel_function |
|
53 | 53 | |
|
54 | 54 | #-------------------------------------------------------------------------- |
@@ -61,44 +61,32 b' class RemoteFunction(object):' | |||
|
61 | 61 | Parameters |
|
62 | 62 | ---------- |
|
63 | 63 | |
|
64 |
|
|
|
65 |
The |
|
|
64 | view : View instance | |
|
65 | The view to be used for execution | |
|
66 | 66 | f : callable |
|
67 | 67 | The function to be wrapped into a remote function |
|
68 | bound : bool [default: False] | |
|
69 | Whether the affect the remote namespace when called | |
|
70 | 68 | block : bool [default: None] |
|
71 | 69 | Whether to wait for results or not. The default behavior is |
|
72 |
to use the current `block` attribute of ` |
|
|
73 | targets : valid target list [default: all] | |
|
74 | The targets on which to execute. | |
|
75 | balanced : bool | |
|
76 | Whether to load-balance with the Task scheduler or not | |
|
70 | to use the current `block` attribute of `view` | |
|
71 | ||
|
72 | **flags : remaining kwargs are passed to View.temp_flags | |
|
77 | 73 | """ |
|
78 | 74 | |
|
79 |
|
|
|
75 | view = None # the remote connection | |
|
80 | 76 | func = None # the wrapped function |
|
81 | 77 | block = None # whether to block |
|
82 | bound = None # whether to affect the namespace | |
|
83 | targets = None # where to execute | |
|
84 | balanced = None # whether to load-balance | |
|
78 | flags = None # dict of extra kwargs for temp_flags | |
|
85 | 79 | |
|
86 |
def __init__(self, |
|
|
87 |
self. |
|
|
80 | def __init__(self, view, f, block=None, **flags): | |
|
81 | self.view = view | |
|
88 | 82 | self.func = f |
|
89 | 83 | self.block=block |
|
90 | self.bound=bound | |
|
91 | self.targets=targets | |
|
92 | if balanced is None: | |
|
93 | if targets is None: | |
|
94 | balanced = True | |
|
95 | else: | |
|
96 | balanced = False | |
|
97 | self.balanced = balanced | |
|
84 | self.flags=flags | |
|
98 | 85 | |
|
99 | 86 | def __call__(self, *args, **kwargs): |
|
100 | return self.client.apply(self.func, args=args, kwargs=kwargs, | |
|
101 | block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced) | |
|
87 | block = self.view.block if self.block is None else self.block | |
|
88 | with self.view.temp_flags(block=block, **self.flags): | |
|
89 | return self.view.apply(self.func, *args, **kwargs) | |
|
102 | 90 | |
|
103 | 91 | |
|
104 | 92 | class ParallelFunction(RemoteFunction): |
@@ -111,51 +99,57 b' class ParallelFunction(RemoteFunction):' | |||
|
111 | 99 | Parameters |
|
112 | 100 | ---------- |
|
113 | 101 | |
|
114 |
|
|
|
115 |
The |
|
|
102 | view : View instance | |
|
103 | The view to be used for execution | |
|
116 | 104 | f : callable |
|
117 | 105 | The function to be wrapped into a remote function |
|
118 |
|
|
|
119 | Whether the affect the remote namespace when called | |
|
106 | dist : str [default: 'b'] | |
|
107 | The key for which mapObject to use to distribute sequences | |
|
108 | options are: | |
|
109 | * 'b' : use contiguous chunks in order | |
|
110 | * 'r' : use round-robin striping | |
|
120 | 111 | block : bool [default: None] |
|
121 | 112 | Whether to wait for results or not. The default behavior is |
|
122 |
to use the current `block` attribute of ` |
|
|
123 | targets : valid target list [default: all] | |
|
124 | The targets on which to execute. | |
|
125 | balanced : bool | |
|
126 | Whether to load-balance with the Task scheduler or not | |
|
127 | chunk_size : int or None | |
|
113 | to use the current `block` attribute of `view` | |
|
114 | chunksize : int or None | |
|
128 | 115 | The size of chunk to use when breaking up sequences in a load-balanced manner |
|
116 | **flags : remaining kwargs are passed to View.temp_flags | |
|
129 | 117 | """ |
|
130 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None): | |
|
131 | super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) | |
|
132 | self.chunk_size = chunk_size | |
|
118 | ||
|
119 | chunksize=None | |
|
120 | mapObject=None | |
|
121 | ||
|
122 | def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags): | |
|
123 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) | |
|
124 | self.chunksize = chunksize | |
|
133 | 125 | |
|
134 | 126 | mapClass = Map.dists[dist] |
|
135 | 127 | self.mapObject = mapClass() |
|
136 | 128 | |
|
137 | 129 | def __call__(self, *sequences): |
|
130 | # check that the length of sequences match | |
|
138 | 131 | len_0 = len(sequences[0]) |
|
139 | 132 | for s in sequences: |
|
140 | 133 | if len(s)!=len_0: |
|
141 | 134 | msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) |
|
142 | 135 | raise ValueError(msg) |
|
143 | ||
|
144 |
if |
|
|
145 |
if self.chunk |
|
|
146 |
nparts = len_0/self.chunk |
|
|
136 | balanced = 'Balanced' in self.view.__class__.__name__ | |
|
137 | if balanced: | |
|
138 | if self.chunksize: | |
|
139 | nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0) | |
|
147 | 140 | else: |
|
148 | 141 | nparts = len_0 |
|
149 |
targets = [ |
|
|
142 | targets = [None]*nparts | |
|
150 | 143 | else: |
|
151 |
if self.chunk |
|
|
152 |
warnings.warn("`chunk |
|
|
144 | if self.chunksize: | |
|
145 | warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) | |
|
153 | 146 | # multiplexed: |
|
154 |
targets = self. |
|
|
147 | targets = self.view.targets | |
|
155 | 148 | nparts = len(targets) |
|
156 | 149 | |
|
157 | 150 | msg_ids = [] |
|
158 | 151 | # my_f = lambda *a: map(self.func, *a) |
|
152 | client = self.view.client | |
|
159 | 153 | for index, t in enumerate(targets): |
|
160 | 154 | args = [] |
|
161 | 155 | for seq in sequences: |
@@ -173,12 +167,15 b' class ParallelFunction(RemoteFunction):' | |||
|
173 | 167 | args = [self.func]+args |
|
174 | 168 | else: |
|
175 | 169 | f=self.func |
|
176 | ar = self.client.apply(f, args=args, block=False, bound=self.bound, | |
|
177 | targets=t, balanced=self.balanced) | |
|
170 | ||
|
171 | view = self.view if balanced else client[t] | |
|
172 | with view.temp_flags(block=False, **self.flags): | |
|
173 | ar = view.apply(f, *args) | |
|
178 | 174 | |
|
179 | 175 | msg_ids.append(ar.msg_ids[0]) |
|
180 | 176 | |
|
181 | r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) | |
|
177 | r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__) | |
|
178 | ||
|
182 | 179 | if self.block: |
|
183 | 180 | try: |
|
184 | 181 | return r.get() |
@@ -238,7 +238,7 b' class TaskScheduler(SessionFactory):' | |||
|
238 | 238 | msg = self.session.unpack_message(msg, copy=False, content=False) |
|
239 | 239 | parent = msg['header'] |
|
240 | 240 | idents = [idents[0],engine]+idents[1:] |
|
241 | print (idents) | |
|
241 | # print (idents) | |
|
242 | 242 | try: |
|
243 | 243 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) |
|
244 | 244 | except: |
@@ -277,8 +277,9 b' class TaskScheduler(SessionFactory):' | |||
|
277 | 277 | # time dependencies |
|
278 | 278 | after = Dependency(header.get('after', [])) |
|
279 | 279 | if after.all: |
|
280 | after.difference_update(self.all_completed) | |
|
281 | if not after.success_only: | |
|
280 | if after.success: | |
|
281 | after.difference_update(self.all_completed) | |
|
282 | if after.failure: | |
|
282 | 283 | after.difference_update(self.all_failed) |
|
283 | 284 | if after.check(self.all_completed, self.all_failed): |
|
284 | 285 | # recast as empty set, if `after` already met, |
@@ -302,7 +303,7 b' class TaskScheduler(SessionFactory):' | |||
|
302 | 303 | self.depending[msg_id] = args |
|
303 | 304 | return self.fail_unreachable(msg_id, error.InvalidDependency) |
|
304 | 305 | # check if unreachable: |
|
305 | if dep.unreachable(self.all_failed): | |
|
306 | if dep.unreachable(self.all_completed, self.all_failed): | |
|
306 | 307 | self.depending[msg_id] = args |
|
307 | 308 | return self.fail_unreachable(msg_id) |
|
308 | 309 | |
@@ -379,7 +380,11 b' class TaskScheduler(SessionFactory):' | |||
|
379 | 380 | if follow.all: |
|
380 | 381 | # check follow for impossibility |
|
381 | 382 | dests = set() |
|
382 | relevant = self.all_completed if follow.success_only else self.all_done | |
|
383 | relevant = set() | |
|
384 | if follow.success: | |
|
385 | relevant = self.all_completed | |
|
386 | if follow.failure: | |
|
387 | relevant = relevant.union(self.all_failed) | |
|
383 | 388 | for m in follow.intersection(relevant): |
|
384 | 389 | dests.add(self.destinations[m]) |
|
385 | 390 | if len(dests) > 1: |
@@ -514,11 +519,8 b' class TaskScheduler(SessionFactory):' | |||
|
514 | 519 | |
|
515 | 520 | for msg_id in jobs: |
|
516 | 521 | raw_msg, targets, after, follow, timeout = self.depending[msg_id] |
|
517 | # if dep_id in after: | |
|
518 | # if after.all and (success or not after.success_only): | |
|
519 | # after.remove(dep_id) | |
|
520 | 522 | |
|
521 | if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed): | |
|
523 | if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed): | |
|
522 | 524 | self.fail_unreachable(msg_id) |
|
523 | 525 | |
|
524 | 526 | elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run |
@@ -170,7 +170,7 b' class Kernel(SessionFactory):' | |||
|
170 | 170 | |
|
171 | 171 | content = dict(status='ok') |
|
172 | 172 | reply_msg = self.session.send(stream, 'abort_reply', content=content, |
|
173 |
parent=parent, ident=ident) |
|
|
173 | parent=parent, ident=ident) | |
|
174 | 174 | self.log.debug(str(reply_msg)) |
|
175 | 175 | |
|
176 | 176 | def shutdown_request(self, stream, ident, parent): |
@@ -184,10 +184,7 b' class Kernel(SessionFactory):' | |||
|
184 | 184 | content['status'] = 'ok' |
|
185 | 185 | msg = self.session.send(stream, 'shutdown_reply', |
|
186 | 186 | content=content, parent=parent, ident=ident) |
|
187 | # msg = self.session.send(self.pub_socket, 'shutdown_reply', | |
|
188 | # content, parent, ident) | |
|
189 | # print >> sys.__stdout__, msg | |
|
190 | # time.sleep(0.2) | |
|
187 | self.log.debug(str(msg)) | |
|
191 | 188 | dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop) |
|
192 | 189 | dc.start() |
|
193 | 190 | |
@@ -295,7 +292,7 b' class Kernel(SessionFactory):' | |||
|
295 | 292 | content = parent[u'content'] |
|
296 | 293 | bufs = parent[u'buffers'] |
|
297 | 294 | msg_id = parent['header']['msg_id'] |
|
298 |
bound = |
|
|
295 | # bound = parent['header'].get('bound', False) | |
|
299 | 296 | except: |
|
300 | 297 | self.log.error("Got bad msg: %s"%parent, exc_info=True) |
|
301 | 298 | return |
@@ -314,16 +311,12 b' class Kernel(SessionFactory):' | |||
|
314 | 311 | working = self.user_ns |
|
315 | 312 | # suffix = |
|
316 | 313 | prefix = "_"+str(msg_id).replace("-","")+"_" |
|
317 |
|
|
|
318 | # | |
|
319 | # else: | |
|
320 | # working = dict() | |
|
321 | # suffix = prefix = "_" # prevent keyword collisions with lambda | |
|
314 | ||
|
322 | 315 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
323 | if bound: | |
|
324 | bound_ns = Namespace(working) | |
|
325 | args = [bound_ns]+list(args) | |
|
326 | # if f.fun | |
|
316 | # if bound: | |
|
317 | # bound_ns = Namespace(working) | |
|
318 | # args = [bound_ns]+list(args) | |
|
319 | ||
|
327 | 320 | fname = getattr(f, '__name__', 'f') |
|
328 | 321 | |
|
329 | 322 | fname = prefix+"f" |
@@ -341,8 +334,8 b' class Kernel(SessionFactory):' | |||
|
341 | 334 | finally: |
|
342 | 335 | for key in ns.iterkeys(): |
|
343 | 336 | working.pop(key) |
|
344 | if bound: | |
|
345 | working.update(bound_ns) | |
|
337 | # if bound: | |
|
338 | # working.update(bound_ns) | |
|
346 | 339 | |
|
347 | 340 | packed_result,buf = serialize_object(result) |
|
348 | 341 | result_buf = [packed_result]+buf |
@@ -364,9 +357,12 b' class Kernel(SessionFactory):' | |||
|
364 | 357 | |
|
365 | 358 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
366 | 359 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) |
|
367 | ||
|
368 | # if reply_msg['content']['status'] == u'error': | |
|
369 | # self.abort_queues() | |
|
360 | ||
|
361 | # flush i/o | |
|
362 | # should this be before reply_msg is sent, like in the single-kernel code, | |
|
363 | # or should nothing get in the way of real results? | |
|
364 | sys.stdout.flush() | |
|
365 | sys.stderr.flush() | |
|
370 | 366 | |
|
371 | 367 | def dispatch_queue(self, stream, msg): |
|
372 | 368 | self.control_stream.flush() |
@@ -1,5 +1,16 b'' | |||
|
1 | 1 | """toplevel setup/teardown for parallel tests.""" |
|
2 | 2 | |
|
3 | #------------------------------------------------------------------------------- | |
|
4 | # Copyright (C) 2011 The IPython Development Team | |
|
5 | # | |
|
6 | # Distributed under the terms of the BSD License. The full license is in | |
|
7 | # the file COPYING, distributed as part of this software. | |
|
8 | #------------------------------------------------------------------------------- | |
|
9 | ||
|
10 | #------------------------------------------------------------------------------- | |
|
11 | # Imports | |
|
12 | #------------------------------------------------------------------------------- | |
|
13 | ||
|
3 | 14 | import tempfile |
|
4 | 15 | import time |
|
5 | 16 | from subprocess import Popen, PIPE, STDOUT |
@@ -15,17 +26,27 b' def setup():' | |||
|
15 | 26 | cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT) |
|
16 | 27 | processes.append(cp) |
|
17 | 28 | time.sleep(.5) |
|
18 | add_engine() | |
|
29 | add_engines(1) | |
|
19 | 30 | c = client.Client(profile='iptest') |
|
20 | 31 | while not c.ids: |
|
21 | 32 | time.sleep(.1) |
|
22 | 33 | c.spin() |
|
34 | c.close() | |
|
23 | 35 | |
|
24 | def add_engine(profile='iptest'): | |
|
25 | ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT) | |
|
26 | # ep.start() | |
|
27 | processes.append(ep) | |
|
28 | return ep | |
|
36 | def add_engines(n=1, profile='iptest'): | |
|
37 | rc = client.Client(profile=profile) | |
|
38 | base = len(rc) | |
|
39 | eps = [] | |
|
40 | for i in range(n): | |
|
41 | ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT) | |
|
42 | # ep.start() | |
|
43 | processes.append(ep) | |
|
44 | eps.append(ep) | |
|
45 | while len(rc) < base+n: | |
|
46 | time.sleep(.1) | |
|
47 | rc.spin() | |
|
48 | rc.close() | |
|
49 | return eps | |
|
29 | 50 | |
|
30 | 51 | def teardown(): |
|
31 | 52 | time.sleep(1) |
@@ -1,3 +1,12 b'' | |||
|
1 | """base class for parallel client tests""" | |
|
2 | ||
|
3 | #------------------------------------------------------------------------------- | |
|
4 | # Copyright (C) 2011 The IPython Development Team | |
|
5 | # | |
|
6 | # Distributed under the terms of the BSD License. The full license is in | |
|
7 | # the file COPYING, distributed as part of this software. | |
|
8 | #------------------------------------------------------------------------------- | |
|
9 | ||
|
1 | 10 | import sys |
|
2 | 11 | import tempfile |
|
3 | 12 | import time |
@@ -6,6 +15,7 b' from multiprocessing import Process' | |||
|
6 | 15 | |
|
7 | 16 | from nose import SkipTest |
|
8 | 17 | |
|
18 | import zmq | |
|
9 | 19 | from zmq.tests import BaseZMQTestCase |
|
10 | 20 | |
|
11 | 21 | from IPython.external.decorator import decorator |
@@ -14,7 +24,7 b' from IPython.zmq.parallel import error' | |||
|
14 | 24 | from IPython.zmq.parallel.client import Client |
|
15 | 25 | from IPython.zmq.parallel.ipcluster import launch_process |
|
16 | 26 | from IPython.zmq.parallel.entry_point import select_random_ports |
|
17 | from IPython.zmq.parallel.tests import processes,add_engine | |
|
27 | from IPython.zmq.parallel.tests import processes,add_engines | |
|
18 | 28 | |
|
19 | 29 | # simple tasks for use in apply tests |
|
20 | 30 | |
@@ -47,13 +57,11 b' def skip_without(*names):' | |||
|
47 | 57 | return f(*args, **kwargs) |
|
48 | 58 | return skip_without_names |
|
49 | 59 | |
|
50 | ||
|
51 | 60 | class ClusterTestCase(BaseZMQTestCase): |
|
52 | 61 | |
|
53 | 62 | def add_engines(self, n=1, block=True): |
|
54 | 63 | """add multiple engines to our cluster""" |
|
55 | for i in range(n): | |
|
56 | self.engines.append(add_engine()) | |
|
64 | self.engines.extend(add_engines(n)) | |
|
57 | 65 | if block: |
|
58 | 66 | self.wait_on_engines() |
|
59 | 67 | |
@@ -68,10 +76,11 b' class ClusterTestCase(BaseZMQTestCase):' | |||
|
68 | 76 | |
|
69 | 77 | def connect_client(self): |
|
70 | 78 | """connect a client with my Context, and track its sockets for cleanup""" |
|
71 | c = Client(profile='iptest',context=self.context) | |
|
72 | ||
|
73 | # for name in filter(lambda n:n.endswith('socket'), dir(c)): | |
|
74 | # self.sockets.append(getattr(c, name)) | |
|
79 | c = Client(profile='iptest', context=self.context) | |
|
80 | for name in filter(lambda n:n.endswith('socket'), dir(c)): | |
|
81 | s = getattr(c, name) | |
|
82 | s.setsockopt(zmq.LINGER, 0) | |
|
83 | self.sockets.append(s) | |
|
75 | 84 | return c |
|
76 | 85 | |
|
77 | 86 | def assertRaisesRemote(self, etype, f, *args, **kwargs): |
@@ -92,15 +101,19 b' class ClusterTestCase(BaseZMQTestCase):' | |||
|
92 | 101 | self.engines=[] |
|
93 | 102 | |
|
94 | 103 | def tearDown(self): |
|
95 | ||
|
104 | # self.client.clear(block=True) | |
|
96 | 105 | # close fds: |
|
97 | 106 | for e in filter(lambda e: e.poll() is not None, processes): |
|
98 | 107 | processes.remove(e) |
|
99 | 108 | |
|
109 | # allow flushing of incoming messages to prevent crash on socket close | |
|
110 | self.client.wait(timeout=2) | |
|
111 | # time.sleep(2) | |
|
112 | self.client.spin() | |
|
100 | 113 | self.client.close() |
|
101 | 114 | BaseZMQTestCase.tearDown(self) |
|
102 |
# this will be |
|
|
103 | self.context.term() | |
|
115 | # this will be redundant when pyzmq merges PR #88 | |
|
116 | # self.context.term() | |
|
104 | 117 | # print tempfile.TemporaryFile().fileno(), |
|
105 | 118 | # sys.stdout.flush() |
|
106 | 119 | No newline at end of file |
@@ -1,3 +1,16 b'' | |||
|
1 | """Tests for parallel client.py""" | |
|
2 | ||
|
3 | #------------------------------------------------------------------------------- | |
|
4 | # Copyright (C) 2011 The IPython Development Team | |
|
5 | # | |
|
6 | # Distributed under the terms of the BSD License. The full license is in | |
|
7 | # the file COPYING, distributed as part of this software. | |
|
8 | #------------------------------------------------------------------------------- | |
|
9 | ||
|
10 | #------------------------------------------------------------------------------- | |
|
11 | # Imports | |
|
12 | #------------------------------------------------------------------------------- | |
|
13 | ||
|
1 | 14 | import time |
|
2 | 15 | from tempfile import mktemp |
|
3 | 16 | |
@@ -8,7 +21,10 b' from IPython.zmq.parallel import error' | |||
|
8 | 21 | from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult |
|
9 | 22 | from IPython.zmq.parallel.view import LoadBalancedView, DirectView |
|
10 | 23 | |
|
11 | from clienttest import ClusterTestCase, segfault, wait | |
|
24 | from clienttest import ClusterTestCase, segfault, wait, add_engines | |
|
25 | ||
|
26 | def setup(): | |
|
27 | add_engines(4) | |
|
12 | 28 | |
|
13 | 29 | class TestClient(ClusterTestCase): |
|
14 | 30 | |
@@ -17,27 +33,6 b' class TestClient(ClusterTestCase):' | |||
|
17 | 33 | self.add_engines(3) |
|
18 | 34 | self.assertEquals(len(self.client.ids), n+3) |
|
19 | 35 | |
|
20 | def test_segfault_task(self): | |
|
21 | """test graceful handling of engine death (balanced)""" | |
|
22 | self.add_engines(1) | |
|
23 | ar = self.client.apply(segfault, block=False) | |
|
24 | self.assertRaisesRemote(error.EngineError, ar.get) | |
|
25 | eid = ar.engine_id | |
|
26 | while eid in self.client.ids: | |
|
27 | time.sleep(.01) | |
|
28 | self.client.spin() | |
|
29 | ||
|
30 | def test_segfault_mux(self): | |
|
31 | """test graceful handling of engine death (direct)""" | |
|
32 | self.add_engines(1) | |
|
33 | eid = self.client.ids[-1] | |
|
34 | ar = self.client[eid].apply_async(segfault) | |
|
35 | self.assertRaisesRemote(error.EngineError, ar.get) | |
|
36 | eid = ar.engine_id | |
|
37 | while eid in self.client.ids: | |
|
38 | time.sleep(.01) | |
|
39 | self.client.spin() | |
|
40 | ||
|
41 | 36 | def test_view_indexing(self): |
|
42 | 37 | """test index access for views""" |
|
43 | 38 | self.add_engines(2) |
@@ -71,8 +66,8 b' class TestClient(ClusterTestCase):' | |||
|
71 | 66 | v = self.client[:2] |
|
72 | 67 | v2 =self.client[:2] |
|
73 | 68 | self.assertTrue(v is v2) |
|
74 | v = self.client.view() | |
|
75 |
v2 = self.client.view( |
|
|
69 | v = self.client.load_balanced_view() | |
|
70 | v2 = self.client.load_balanced_view(targets=None) | |
|
76 | 71 | self.assertTrue(v is v2) |
|
77 | 72 | |
|
78 | 73 | def test_targets(self): |
@@ -84,102 +79,26 b' class TestClient(ClusterTestCase):' | |||
|
84 | 79 | |
|
85 | 80 | def test_clear(self): |
|
86 | 81 | """test clear behavior""" |
|
87 | self.add_engines(2) | |
|
88 |
self.client |
|
|
89 | self.client.push(dict(a=5)) | |
|
90 | self.client.pull('a') | |
|
82 | # self.add_engines(2) | |
|
83 | v = self.client[:] | |
|
84 | v.block=True | |
|
85 | v.push(dict(a=5)) | |
|
86 | v.pull('a') | |
|
91 | 87 | id0 = self.client.ids[-1] |
|
92 | 88 | self.client.clear(targets=id0) |
|
93 |
self.client.pull('a' |
|
|
94 |
self.assertRaisesRemote(NameError, self.client. |
|
|
95 | self.client.clear() | |
|
89 | self.client[:-1].pull('a') | |
|
90 | self.assertRaisesRemote(NameError, self.client[id0].get, 'a') | |
|
91 | self.client.clear(block=True) | |
|
96 | 92 | for i in self.client.ids: |
|
97 | self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i) | |
|
98 | ||
|
99 | ||
|
100 | def test_push_pull(self): | |
|
101 | """test pushing and pulling""" | |
|
102 | data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) | |
|
103 | t = self.client.ids[-1] | |
|
104 | self.add_engines(2) | |
|
105 | push = self.client.push | |
|
106 | pull = self.client.pull | |
|
107 | self.client.block=True | |
|
108 | nengines = len(self.client) | |
|
109 | push({'data':data}, targets=t) | |
|
110 | d = pull('data', targets=t) | |
|
111 | self.assertEquals(d, data) | |
|
112 | push({'data':data}) | |
|
113 | d = pull('data') | |
|
114 | self.assertEquals(d, nengines*[data]) | |
|
115 | ar = push({'data':data}, block=False) | |
|
116 | self.assertTrue(isinstance(ar, AsyncResult)) | |
|
117 | r = ar.get() | |
|
118 | ar = pull('data', block=False) | |
|
119 | self.assertTrue(isinstance(ar, AsyncResult)) | |
|
120 | r = ar.get() | |
|
121 | self.assertEquals(r, nengines*[data]) | |
|
122 | push(dict(a=10,b=20)) | |
|
123 | r = pull(('a','b')) | |
|
124 | self.assertEquals(r, nengines*[[10,20]]) | |
|
125 | ||
|
126 | def test_push_pull_function(self): | |
|
127 | "test pushing and pulling functions" | |
|
128 | def testf(x): | |
|
129 | return 2.0*x | |
|
130 | ||
|
131 | self.add_engines(4) | |
|
132 | t = self.client.ids[-1] | |
|
133 | self.client.block=True | |
|
134 | push = self.client.push | |
|
135 | pull = self.client.pull | |
|
136 | execute = self.client.execute | |
|
137 | push({'testf':testf}, targets=t) | |
|
138 | r = pull('testf', targets=t) | |
|
139 | self.assertEqual(r(1.0), testf(1.0)) | |
|
140 | execute('r = testf(10)', targets=t) | |
|
141 | r = pull('r', targets=t) | |
|
142 | self.assertEquals(r, testf(10)) | |
|
143 | ar = push({'testf':testf}, block=False) | |
|
144 | ar.get() | |
|
145 | ar = pull('testf', block=False) | |
|
146 | rlist = ar.get() | |
|
147 | for r in rlist: | |
|
148 | self.assertEqual(r(1.0), testf(1.0)) | |
|
149 | execute("def g(x): return x*x", targets=t) | |
|
150 | r = pull(('testf','g'),targets=t) | |
|
151 | self.assertEquals((r[0](10),r[1](10)), (testf(10), 100)) | |
|
152 | ||
|
153 | def test_push_function_globals(self): | |
|
154 | """test that pushed functions have access to globals""" | |
|
155 | def geta(): | |
|
156 | return a | |
|
157 | self.add_engines(1) | |
|
158 | v = self.client[-1] | |
|
159 | v.block=True | |
|
160 | v['f'] = geta | |
|
161 | self.assertRaisesRemote(NameError, v.execute, 'b=f()') | |
|
162 | v.execute('a=5') | |
|
163 | v.execute('b=f()') | |
|
164 | self.assertEquals(v['b'], 5) | |
|
93 | # print i | |
|
94 | self.assertRaisesRemote(NameError, self.client[i].get, 'a') | |
|
165 | 95 | |
|
166 | def test_push_function_defaults(self): | |
|
167 | """test that pushed functions preserve default args""" | |
|
168 | def echo(a=10): | |
|
169 | return a | |
|
170 | self.add_engines(1) | |
|
171 | v = self.client[-1] | |
|
172 | v.block=True | |
|
173 | v['f'] = echo | |
|
174 | v.execute('b=f()') | |
|
175 | self.assertEquals(v['b'], 10) | |
|
176 | ||
|
177 | 96 | def test_get_result(self): |
|
178 | 97 | """test getting results from the Hub.""" |
|
179 | 98 | c = clientmod.Client(profile='iptest') |
|
180 | self.add_engines(1) | |
|
99 | # self.add_engines(1) | |
|
181 | 100 | t = c.ids[-1] |
|
182 |
ar = c.apply(wait, |
|
|
101 | ar = c[t].apply_async(wait, 1) | |
|
183 | 102 | # give the monitor time to notice the message |
|
184 | 103 | time.sleep(.25) |
|
185 | 104 | ahr = self.client.get_result(ar.msg_ids) |
@@ -187,76 +106,42 b' class TestClient(ClusterTestCase):' | |||
|
187 | 106 | self.assertEquals(ahr.get(), ar.get()) |
|
188 | 107 | ar2 = self.client.get_result(ar.msg_ids) |
|
189 | 108 | self.assertFalse(isinstance(ar2, AsyncHubResult)) |
|
109 | c.close() | |
|
190 | 110 | |
|
191 | 111 | def test_ids_list(self): |
|
192 | 112 | """test client.ids""" |
|
193 | self.add_engines(2) | |
|
113 | # self.add_engines(2) | |
|
194 | 114 | ids = self.client.ids |
|
195 | 115 | self.assertEquals(ids, self.client._ids) |
|
196 | 116 | self.assertFalse(ids is self.client._ids) |
|
197 | 117 | ids.remove(ids[-1]) |
|
198 | 118 | self.assertNotEquals(ids, self.client._ids) |
|
199 | 119 | |
|
200 |
def test_ |
|
|
201 | """test that run appends newline to files""" | |
|
202 | tmpfile = mktemp() | |
|
203 | with open(tmpfile, 'w') as f: | |
|
204 | f.write("""def g(): | |
|
205 | return 5 | |
|
206 | """) | |
|
207 |
|
|
|
208 | v.run(tmpfile, block=True) | |
|
209 | self.assertEquals(v.apply_sync(lambda : g()), 5) | |
|
120 | def test_queue_status(self): | |
|
121 | # self.addEngine(4) | |
|
122 | ids = self.client.ids | |
|
123 | id0 = ids[0] | |
|
124 | qs = self.client.queue_status(targets=id0) | |
|
125 | self.assertTrue(isinstance(qs, dict)) | |
|
126 | self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) | |
|
127 | allqs = self.client.queue_status() | |
|
128 | self.assertTrue(isinstance(allqs, dict)) | |
|
129 | self.assertEquals(sorted(allqs.keys()), self.client.ids) | |
|
130 | for eid,qs in allqs.items(): | |
|
131 | self.assertTrue(isinstance(qs, dict)) | |
|
132 | self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) | |
|
210 | 133 | |
|
211 |
def test_ |
|
|
212 | """test tracking for apply""" | |
|
213 |
|
|
|
214 |
|
|
|
215 |
self.client.block= |
|
|
216 | def echo(n=1024*1024, **kwargs): | |
|
217 | return self.client.apply(lambda x: x, args=('x'*n,), targets=t, **kwargs) | |
|
218 | ar = echo(1) | |
|
219 | self.assertTrue(ar._tracker is None) | |
|
220 | self.assertTrue(ar.sent) | |
|
221 | ar = echo(track=True) | |
|
222 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
223 | self.assertEquals(ar.sent, ar._tracker.done) | |
|
224 | ar._tracker.wait() | |
|
225 | self.assertTrue(ar.sent) | |
|
226 | ||
|
227 | def test_push_tracked(self): | |
|
228 | t = self.client.ids[-1] | |
|
229 | ns = dict(x='x'*1024*1024) | |
|
230 | ar = self.client.push(ns, targets=t, block=False) | |
|
231 | self.assertTrue(ar._tracker is None) | |
|
232 | self.assertTrue(ar.sent) | |
|
233 | ||
|
234 | ar = self.client.push(ns, targets=t, block=False, track=True) | |
|
235 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
236 | self.assertEquals(ar.sent, ar._tracker.done) | |
|
237 | ar._tracker.wait() | |
|
238 | self.assertTrue(ar.sent) | |
|
239 | ar.get() | |
|
134 | def test_shutdown(self): | |
|
135 | # self.addEngine(4) | |
|
136 | ids = self.client.ids | |
|
137 | id0 = ids[0] | |
|
138 | self.client.shutdown(id0, block=True) | |
|
139 | while id0 in self.client.ids: | |
|
140 | time.sleep(0.1) | |
|
141 | self.client.spin() | |
|
240 | 142 | |
|
241 | def test_scatter_tracked(self): | |
|
242 | t = self.client.ids | |
|
243 | x='x'*1024*1024 | |
|
244 | ar = self.client.scatter('x', x, targets=t, block=False) | |
|
245 | self.assertTrue(ar._tracker is None) | |
|
246 | self.assertTrue(ar.sent) | |
|
143 | self.assertRaises(IndexError, lambda : self.client[id0]) | |
|
247 | 144 | |
|
248 | ar = self.client.scatter('x', x, targets=t, block=False, track=True) | |
|
249 | self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker)) | |
|
250 | self.assertEquals(ar.sent, ar._tracker.done) | |
|
251 | ar._tracker.wait() | |
|
252 | self.assertTrue(ar.sent) | |
|
253 | ar.get() | |
|
254 | ||
|
255 | def test_remote_reference(self): | |
|
256 | v = self.client[-1] | |
|
257 | v['a'] = 123 | |
|
258 | ra = clientmod.Reference('a') | |
|
259 | b = v.apply_sync(lambda x: x, ra) | |
|
260 | self.assertEquals(b, 123) | |
|
261 | ||
|
262 | ||
|
145 | def test_result_status(self): | |
|
146 | pass | |
|
147 | # to be written |
@@ -1,5 +1,16 b'' | |||
|
1 | 1 | """test serialization with newserialized""" |
|
2 | 2 | |
|
3 | #------------------------------------------------------------------------------- | |
|
4 | # Copyright (C) 2011 The IPython Development Team | |
|
5 | # | |
|
6 | # Distributed under the terms of the BSD License. The full license is in | |
|
7 | # the file COPYING, distributed as part of this software. | |
|
8 | #------------------------------------------------------------------------------- | |
|
9 | ||
|
10 | #------------------------------------------------------------------------------- | |
|
11 | # Imports | |
|
12 | #------------------------------------------------------------------------------- | |
|
13 | ||
|
3 | 14 | from unittest import TestCase |
|
4 | 15 | |
|
5 | 16 | from IPython.testing.parametric import parametric |
@@ -82,6 +93,16 b' class CanningTestCase(TestCase):' | |||
|
82 | 93 | # test non-copying: |
|
83 | 94 | a[2] = 1e9 |
|
84 | 95 | self.assertTrue((a==final).all()) |
|
85 |
|
|
|
96 | ||
|
97 | def test_uncan_function_globals(self): | |
|
98 | """test that uncanning a module function restores it into its module""" | |
|
99 | from re import search | |
|
100 | cf = can(search) | |
|
101 | csearch = uncan(cf) | |
|
102 | self.assertEqual(csearch.__module__, search.__module__) | |
|
103 | self.assertNotEqual(csearch('asd', 'asdf'), None) | |
|
104 | csearch = uncan(cf, dict(a=5)) | |
|
105 | self.assertEqual(csearch.__module__, search.__module__) | |
|
106 | self.assertNotEqual(csearch('asd', 'asdf'), None) | |
|
86 | 107 | |
|
87 | 108 | No newline at end of file |
@@ -1,3 +1,15 b'' | |||
|
1 | """test building messages with streamsession""" | |
|
2 | ||
|
3 | #------------------------------------------------------------------------------- | |
|
4 | # Copyright (C) 2011 The IPython Development Team | |
|
5 | # | |
|
6 | # Distributed under the terms of the BSD License. The full license is in | |
|
7 | # the file COPYING, distributed as part of this software. | |
|
8 | #------------------------------------------------------------------------------- | |
|
9 | ||
|
10 | #------------------------------------------------------------------------------- | |
|
11 | # Imports | |
|
12 | #------------------------------------------------------------------------------- | |
|
1 | 13 | |
|
2 | 14 | import os |
|
3 | 15 | import uuid |
@@ -316,3 +316,39 b' def unpack_apply_message(bufs, g=None, copy=True):' | |||
|
316 | 316 | |
|
317 | 317 | return f,args,kwargs |
|
318 | 318 | |
|
319 | #-------------------------------------------------------------------------- | |
|
320 | # helpers for implementing old MEC API via view.apply | |
|
321 | #-------------------------------------------------------------------------- | |
|
322 | ||
|
323 | def interactive(f): | |
|
324 | """decorator for making functions appear as interactively defined. | |
|
325 | This results in the function being linked to the user_ns as globals() | |
|
326 | instead of the module globals(). | |
|
327 | """ | |
|
328 | f.__module__ = '__main__' | |
|
329 | return f | |
|
330 | ||
|
331 | @interactive | |
|
332 | def _push(ns): | |
|
333 | """helper method for implementing `client.push` via `client.apply`""" | |
|
334 | globals().update(ns) | |
|
335 | ||
|
336 | @interactive | |
|
337 | def _pull(keys): | |
|
338 | """helper method for implementing `client.pull` via `client.apply`""" | |
|
339 | user_ns = globals() | |
|
340 | if isinstance(keys, (list,tuple, set)): | |
|
341 | for key in keys: | |
|
342 | if not user_ns.has_key(key): | |
|
343 | raise NameError("name '%s' is not defined"%key) | |
|
344 | return map(user_ns.get, keys) | |
|
345 | else: | |
|
346 | if not user_ns.has_key(keys): | |
|
347 | raise NameError("name '%s' is not defined"%keys) | |
|
348 | return user_ns.get(keys) | |
|
349 | ||
|
350 | @interactive | |
|
351 | def _execute(code): | |
|
352 | """helper method for implementing `client.execute` via `client.apply`""" | |
|
353 | exec code in globals() | |
|
354 |
This diff has been collapsed as it changes many lines, (610 lines changed) Show them Hide them | |||
@@ -10,13 +10,20 b'' | |||
|
10 | 10 | # Imports |
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | import warnings | |
|
14 | from contextlib import contextmanager | |
|
15 | ||
|
16 | import zmq | |
|
17 | ||
|
13 | 18 | from IPython.testing import decorators as testdec |
|
14 | 19 | from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance |
|
15 | 20 | |
|
16 | 21 | from IPython.external.decorator import decorator |
|
17 | 22 | |
|
18 | from .asyncresult import AsyncResult | |
|
19 | from .dependency import Dependency | |
|
23 | from . import map as Map | |
|
24 | from . import util | |
|
25 | from .asyncresult import AsyncResult, AsyncMapResult | |
|
26 | from .dependency import Dependency, dependent | |
|
20 | 27 | from .remotefunction import ParallelFunction, parallel, remote |
|
21 | 28 | |
|
22 | 29 | #----------------------------------------------------------------------------- |
@@ -24,25 +31,16 b' from .remotefunction import ParallelFunction, parallel, remote' | |||
|
24 | 31 | #----------------------------------------------------------------------------- |
|
25 | 32 | |
|
26 | 33 | @decorator |
|
27 | def myblock(f, self, *args, **kwargs): | |
|
28 | """override client.block with self.block during a call""" | |
|
29 | block = self.client.block | |
|
30 | self.client.block = self.block | |
|
31 | try: | |
|
32 | ret = f(self, *args, **kwargs) | |
|
33 | finally: | |
|
34 | self.client.block = block | |
|
35 | return ret | |
|
36 | ||
|
37 | @decorator | |
|
38 | 34 | def save_ids(f, self, *args, **kwargs): |
|
39 | 35 | """Keep our history and outstanding attributes up to date after a method call.""" |
|
40 | 36 | n_previous = len(self.client.history) |
|
41 | ret = f(self, *args, **kwargs) | |
|
42 | nmsgs = len(self.client.history) - n_previous | |
|
43 | msg_ids = self.client.history[-nmsgs:] | |
|
44 | self.history.extend(msg_ids) | |
|
45 | map(self.outstanding.add, msg_ids) | |
|
37 | try: | |
|
38 | ret = f(self, *args, **kwargs) | |
|
39 | finally: | |
|
40 | nmsgs = len(self.client.history) - n_previous | |
|
41 | msg_ids = self.client.history[-nmsgs:] | |
|
42 | self.history.extend(msg_ids) | |
|
43 | map(self.outstanding.add, msg_ids) | |
|
46 | 44 | return ret |
|
47 | 45 | |
|
48 | 46 | @decorator |
@@ -71,27 +69,54 b' class View(HasTraits):' | |||
|
71 | 69 | """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. |
|
72 | 70 | |
|
73 | 71 | Don't use this class, use subclasses. |
|
72 | ||
|
73 | Methods | |
|
74 | ------- | |
|
75 | ||
|
76 | spin | |
|
77 | flushes incoming results and registration state changes | |
|
78 | control methods spin, and requesting `ids` also ensures up to date | |
|
79 | ||
|
80 | wait | |
|
81 | wait on one or more msg_ids | |
|
82 | ||
|
83 | execution methods | |
|
84 | apply | |
|
85 | legacy: execute, run | |
|
86 | ||
|
87 | data movement | |
|
88 | push, pull, scatter, gather | |
|
89 | ||
|
90 | query methods | |
|
91 | get_result, queue_status, purge_results, result_status | |
|
92 | ||
|
93 | control methods | |
|
94 | abort, shutdown | |
|
95 | ||
|
74 | 96 | """ |
|
75 | 97 | block=Bool(False) |
|
76 | bound=Bool(False) | |
|
77 | track=Bool(False) | |
|
98 | track=Bool(True) | |
|
78 | 99 | history=List() |
|
79 | 100 | outstanding = Set() |
|
80 | 101 | results = Dict() |
|
81 | 102 | client = Instance('IPython.zmq.parallel.client.Client') |
|
82 | 103 | |
|
104 | _socket = Instance('zmq.Socket') | |
|
83 | 105 | _ntargets = Int(1) |
|
84 | _balanced = Bool(False) | |
|
85 | _default_names = List(['block', 'bound', 'track']) | |
|
106 | _flag_names = List(['block', 'track']) | |
|
86 | 107 | _targets = Any() |
|
108 | _idents = Any() | |
|
87 | 109 | |
|
88 | def __init__(self, client=None, targets=None): | |
|
89 | super(View, self).__init__(client=client) | |
|
90 | self._targets = targets | |
|
110 | def __init__(self, client=None, socket=None, targets=None): | |
|
111 | super(View, self).__init__(client=client, _socket=socket) | |
|
91 | 112 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) |
|
92 | 113 | self.block = client.block |
|
93 | 114 | |
|
94 | for name in self._default_names: | |
|
115 | self._idents, self._targets = self.client._build_targets(targets) | |
|
116 | if targets is None or isinstance(targets, int): | |
|
117 | self._targets = targets | |
|
118 | for name in self._flag_names: | |
|
119 | # set flags, if they haven't been set yet | |
|
95 | 120 | setattr(self, name, getattr(self, name, None)) |
|
96 | 121 | |
|
97 | 122 | assert not self.__class__ is View, "Don't use base View objects, use subclasses" |
@@ -111,134 +136,127 b' class View(HasTraits):' | |||
|
111 | 136 | def targets(self, value): |
|
112 | 137 | raise AttributeError("Cannot set View `targets` after construction!") |
|
113 | 138 | |
|
114 | @property | |
|
115 | def balanced(self): | |
|
116 | return self._balanced | |
|
117 | ||
|
118 | @balanced.setter | |
|
119 | def balanced(self, value): | |
|
120 | raise AttributeError("Cannot set View `balanced` after construction!") | |
|
121 | ||
|
122 | def _defaults(self, *excludes): | |
|
123 | """return dict of our default attributes, excluding names given.""" | |
|
124 | d = dict(balanced=self._balanced, targets=self._targets) | |
|
125 | for name in self._default_names: | |
|
126 | if name not in excludes: | |
|
127 | d[name] = getattr(self, name) | |
|
128 | return d | |
|
129 | ||
|
130 | 139 | def set_flags(self, **kwargs): |
|
131 | 140 | """set my attribute flags by keyword. |
|
132 | 141 | |
|
133 | A View is a wrapper for the Client's apply method, but | |
|
134 | with attributes that specify keyword arguments, those attributes | |
|
135 | can be set by keyword argument with this method. | |
|
142 | Views determine behavior with a few attributes (`block`, `track`, etc.). | |
|
143 | These attributes can be set all at once by name with this method. | |
|
136 | 144 | |
|
137 | 145 | Parameters |
|
138 | 146 | ---------- |
|
139 | 147 | |
|
140 | 148 | block : bool |
|
141 | 149 | whether to wait for results |
|
142 | bound : bool | |
|
143 | whether to pass the client's Namespace as the first argument | |
|
144 | to functions called via `apply`. | |
|
145 | 150 | track : bool |
|
146 | 151 | whether to create a MessageTracker to allow the user to |
|
147 | 152 | safely edit after arrays and buffers during non-copying |
|
148 | 153 | sends. |
|
149 | 154 | """ |
|
150 |
for |
|
|
151 |
if |
|
|
152 |
raise KeyError("Invalid name: %r"% |
|
|
153 | for name in ('block', 'bound'): | |
|
154 | if name in kwargs: | |
|
155 | setattr(self, name, kwargs[name]) | |
|
155 | for name, value in kwargs.iteritems(): | |
|
156 | if name not in self._flag_names: | |
|
157 | raise KeyError("Invalid name: %r"%name) | |
|
158 | else: | |
|
159 | setattr(self, name, value) | |
|
156 | 160 | |
|
161 | @contextmanager | |
|
162 | def temp_flags(self, **kwargs): | |
|
163 | """temporarily set flags, for use in `with` statements. | |
|
164 | ||
|
165 | See set_flags for permanent setting of flags | |
|
166 | ||
|
167 | Examples | |
|
168 | -------- | |
|
169 | ||
|
170 | >>> view.track=False | |
|
171 | ... | |
|
172 | >>> with view.temp_flags(track=True): | |
|
173 | ... ar = view.apply(dostuff, my_big_array) | |
|
174 | ... ar.tracker.wait() # wait for send to finish | |
|
175 | >>> view.track | |
|
176 | False | |
|
177 | ||
|
178 | """ | |
|
179 | # preflight: save flags, and set temporaries | |
|
180 | saved_flags = {} | |
|
181 | for f in self._flag_names: | |
|
182 | saved_flags[f] = getattr(self, f) | |
|
183 | self.set_flags(**kwargs) | |
|
184 | # yield to the with-statement block | |
|
185 | yield | |
|
186 | # postflight: restore saved flags | |
|
187 | self.set_flags(**saved_flags) | |
|
188 | ||
|
189 | ||
|
157 | 190 | #---------------------------------------------------------------- |
|
158 | # wrappers for client methods: | |
|
191 | # apply | |
|
159 | 192 | #---------------------------------------------------------------- |
|
160 | @sync_results | |
|
161 | def spin(self): | |
|
162 | """spin the client, and sync""" | |
|
163 | self.client.spin() | |
|
164 | 193 | |
|
165 | 194 | @sync_results |
|
166 | 195 | @save_ids |
|
196 | def _really_apply(self, f, args, kwargs, block=None, **options): | |
|
197 | """wrapper for client.send_apply_message""" | |
|
198 | raise NotImplementedError("Implement in subclasses") | |
|
199 | ||
|
167 | 200 | def apply(self, f, *args, **kwargs): |
|
168 | 201 | """calls f(*args, **kwargs) on remote engines, returning the result. |
|
169 | 202 | |
|
170 |
This method sets all |
|
|
171 | View's attributes. | |
|
203 | This method sets all apply flags via this View's attributes. | |
|
172 | 204 | |
|
173 | 205 | if self.block is False: |
|
174 | 206 | returns AsyncResult |
|
175 | 207 | else: |
|
176 | 208 | returns actual result of f(*args, **kwargs) |
|
177 | 209 | """ |
|
178 |
return self. |
|
|
210 | return self._really_apply(f, args, kwargs) | |
|
179 | 211 | |
|
180 | @save_ids | |
|
181 | 212 | def apply_async(self, f, *args, **kwargs): |
|
182 | 213 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. |
|
183 | 214 | |
|
184 | 215 | returns AsyncResult |
|
185 | 216 | """ |
|
186 | d = self._defaults('block', 'bound') | |
|
187 | return self.client.apply(f,args,kwargs, block=False, bound=False, **d) | |
|
217 | return self._really_apply(f, args, kwargs, block=False) | |
|
188 | 218 | |
|
189 | 219 | @spin_after |
|
190 | @save_ids | |
|
191 | 220 | def apply_sync(self, f, *args, **kwargs): |
|
192 | 221 | """calls f(*args, **kwargs) on remote engines in a blocking manner, |
|
193 | 222 | returning the result. |
|
194 | 223 | |
|
195 | 224 | returns: actual result of f(*args, **kwargs) |
|
196 | 225 | """ |
|
197 | d = self._defaults('block', 'bound', 'track') | |
|
198 | return self.client.apply(f,args,kwargs, block=True, bound=False, **d) | |
|
226 | return self._really_apply(f, args, kwargs, block=True) | |
|
199 | 227 | |
|
200 | # @sync_results | |
|
201 | # @save_ids | |
|
202 | # def apply_bound(self, f, *args, **kwargs): | |
|
203 | # """calls f(*args, **kwargs) bound to engine namespace(s). | |
|
204 | # | |
|
205 | # if self.block is False: | |
|
206 | # returns msg_id | |
|
207 | # else: | |
|
208 | # returns actual result of f(*args, **kwargs) | |
|
209 | # | |
|
210 | # This method has access to the targets' namespace via globals() | |
|
211 | # | |
|
212 | # """ | |
|
213 | # d = self._defaults('bound') | |
|
214 | # return self.client.apply(f, args, kwargs, bound=True, **d) | |
|
215 | # | |
|
228 | #---------------------------------------------------------------- | |
|
229 | # wrappers for client and control methods | |
|
230 | #---------------------------------------------------------------- | |
|
216 | 231 | @sync_results |
|
217 | @save_ids | |
|
218 | def apply_async_bound(self, f, *args, **kwargs): | |
|
219 | """calls f(*args, **kwargs) bound to engine namespace(s) | |
|
220 | in a nonblocking manner. | |
|
221 | ||
|
222 | The first argument to `f` will be the Engine's Namespace | |
|
223 | ||
|
224 | returns: AsyncResult | |
|
232 | def spin(self): | |
|
233 | """spin the client, and sync""" | |
|
234 | self.client.spin() | |
|
235 | ||
|
236 | @sync_results | |
|
237 | def wait(self, jobs=None, timeout=-1): | |
|
238 | """waits on one or more `jobs`, for up to `timeout` seconds. | |
|
225 | 239 | |
|
226 | """ | |
|
227 | d = self._defaults('block', 'bound') | |
|
228 | return self.client.apply(f, args, kwargs, block=False, bound=True, **d) | |
|
229 | ||
|
230 | @spin_after | |
|
231 | @save_ids | |
|
232 | def apply_sync_bound(self, f, *args, **kwargs): | |
|
233 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. | |
|
240 | Parameters | |
|
241 | ---------- | |
|
234 | 242 | |
|
235 | The first argument to `f` will be the Engine's Namespace | |
|
243 | jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects | |
|
244 | ints are indices to self.history | |
|
245 | strs are msg_ids | |
|
246 | default: wait on all outstanding messages | |
|
247 | timeout : float | |
|
248 | a time in seconds, after which to give up. | |
|
249 | default is -1, which means no timeout | |
|
236 | 250 | |
|
237 | returns: actual result of f(*args, **kwargs) | |
|
251 | Returns | |
|
252 | ------- | |
|
238 | 253 | |
|
254 | True : when all msg_ids are done | |
|
255 | False : timeout reached, some msg_ids still outstanding | |
|
239 | 256 | """ |
|
240 | d = self._defaults('block', 'bound') | |
|
241 | return self.client.apply(f, args, kwargs, block=True, bound=True, **d) | |
|
257 | if jobs is None: | |
|
258 | jobs = self.history | |
|
259 | return self.client.wait(jobs, timeout) | |
|
242 | 260 | |
|
243 | 261 | def abort(self, jobs=None, block=None): |
|
244 | 262 | """Abort jobs on my engines. |
@@ -318,6 +336,7 b' class View(HasTraits):' | |||
|
318 | 336 | """Parallel version of `itertools.imap`. |
|
319 | 337 | |
|
320 | 338 | See `self.map` for details. |
|
339 | ||
|
321 | 340 | """ |
|
322 | 341 | |
|
323 | 342 | return iter(self.map_async(f,*sequences, **kwargs)) |
@@ -326,14 +345,15 b' class View(HasTraits):' | |||
|
326 | 345 | # Decorators |
|
327 | 346 | #------------------------------------------------------------------- |
|
328 | 347 | |
|
329 |
def remote(self, |
|
|
348 | def remote(self, block=True, **flags): | |
|
330 | 349 | """Decorator for making a RemoteFunction""" |
|
331 | return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) | |
|
350 | block = self.block if block is None else block | |
|
351 | return remote(self, block=block, **flags) | |
|
332 | 352 | |
|
333 |
def parallel(self, dist='b', |
|
|
353 | def parallel(self, dist='b', block=None, **flags): | |
|
334 | 354 | """Decorator for making a ParallelFunction""" |
|
335 | 355 | block = self.block if block is None else block |
|
336 |
return parallel(self |
|
|
356 | return parallel(self, dist=dist, block=block, **flags) | |
|
337 | 357 | |
|
338 | 358 | @testdec.skip_doctest |
|
339 | 359 | class DirectView(View): |
@@ -355,14 +375,65 b' class DirectView(View):' | |||
|
355 | 375 | |
|
356 | 376 | """ |
|
357 | 377 | |
|
358 | def __init__(self, client=None, targets=None): | |
|
359 | super(DirectView, self).__init__(client=client, targets=targets) | |
|
360 | self._balanced = False | |
|
378 | def __init__(self, client=None, socket=None, targets=None): | |
|
379 | super(DirectView, self).__init__(client=client, socket=socket, targets=targets) | |
|
380 | ||
|
361 | 381 | |
|
362 | @spin_after | |
|
382 | @sync_results | |
|
363 | 383 | @save_ids |
|
384 | def _really_apply(self, f, args=None, kwargs=None, block=None, track=None): | |
|
385 | """calls f(*args, **kwargs) on remote engines, returning the result. | |
|
386 | ||
|
387 | This method sets all of `apply`'s flags via this View's attributes. | |
|
388 | ||
|
389 | Parameters | |
|
390 | ---------- | |
|
391 | ||
|
392 | f : callable | |
|
393 | ||
|
394 | args : list [default: empty] | |
|
395 | ||
|
396 | kwargs : dict [default: empty] | |
|
397 | ||
|
398 | block : bool [default: self.block] | |
|
399 | whether to block | |
|
400 | track : bool [default: self.track] | |
|
401 | whether to ask zmq to track the message, for safe non-copying sends | |
|
402 | ||
|
403 | Returns | |
|
404 | ------- | |
|
405 | ||
|
406 | if self.block is False: | |
|
407 | returns AsyncResult | |
|
408 | else: | |
|
409 | returns actual result of f(*args, **kwargs) on the engine(s) | |
|
410 | This will be a list of self.targets is also a list (even length 1), or | |
|
411 | the single result if self.targets is an integer engine id | |
|
412 | """ | |
|
413 | args = [] if args is None else args | |
|
414 | kwargs = {} if kwargs is None else kwargs | |
|
415 | block = self.block if block is None else block | |
|
416 | track = self.track if track is None else track | |
|
417 | msg_ids = [] | |
|
418 | trackers = [] | |
|
419 | for ident in self._idents: | |
|
420 | msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, | |
|
421 | ident=ident) | |
|
422 | if track: | |
|
423 | trackers.append(msg['tracker']) | |
|
424 | msg_ids.append(msg['msg_id']) | |
|
425 | tracker = None if track is False else zmq.MessageTracker(*trackers) | |
|
426 | ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=self._targets, tracker=tracker) | |
|
427 | if block: | |
|
428 | try: | |
|
429 | return ar.get() | |
|
430 | except KeyboardInterrupt: | |
|
431 | pass | |
|
432 | return ar | |
|
433 | ||
|
434 | @spin_after | |
|
364 | 435 | def map(self, f, *sequences, **kwargs): |
|
365 |
"""view.map(f, *sequences, block=self.block |
|
|
436 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult | |
|
366 | 437 | |
|
367 | 438 | Parallel version of builtin `map`, using this View's `targets`. |
|
368 | 439 | |
@@ -380,8 +451,6 b' class DirectView(View):' | |||
|
380 | 451 | the sequences to be distributed and passed to `f` |
|
381 | 452 | block : bool |
|
382 | 453 | whether to wait for the result or not [default self.block] |
|
383 | bound : bool | |
|
384 | whether to pass the client's Namespace as the first argument to `f` | |
|
385 | 454 | |
|
386 | 455 | Returns |
|
387 | 456 | ------- |
@@ -396,70 +465,140 b' class DirectView(View):' | |||
|
396 | 465 | the result of map(f,*sequences) |
|
397 | 466 | """ |
|
398 | 467 | |
|
399 |
block = kwargs. |
|
|
400 | bound = kwargs.get('bound', self.bound) | |
|
468 | block = kwargs.pop('block', self.block) | |
|
401 | 469 | for k in kwargs.keys(): |
|
402 |
if k not in ['block', ' |
|
|
470 | if k not in ['block', 'track']: | |
|
403 | 471 | raise TypeError("invalid keyword arg, %r"%k) |
|
404 | 472 | |
|
405 | 473 | assert len(sequences) > 0, "must have some sequences to map onto!" |
|
406 |
pf = ParallelFunction(self |
|
|
407 | targets=self._targets, balanced=False) | |
|
474 | pf = ParallelFunction(self, f, block=block, **kwargs) | |
|
408 | 475 | return pf.map(*sequences) |
|
409 | 476 | |
|
410 | @sync_results | |
|
411 | @save_ids | |
|
412 | 477 | def execute(self, code, block=None): |
|
413 | """execute some code on my targets.""" | |
|
478 | """Executes `code` on `targets` in blocking or nonblocking manner. | |
|
414 | 479 |
|
|
415 | block = block if block is not None else self.block | |
|
480 | ``execute`` is always `bound` (affects engine namespace) | |
|
416 | 481 |
|
|
417 | return self.client.execute(code, block=block, targets=self._targets) | |
|
482 | Parameters | |
|
483 | ---------- | |
|
484 | ||
|
485 | code : str | |
|
486 | the code string to be executed | |
|
487 | block : bool | |
|
488 | whether or not to wait until done to return | |
|
489 | default: self.block | |
|
490 | """ | |
|
491 | return self._really_apply(util._execute, args=(code,), block=block) | |
|
418 | 492 | |
|
419 | @sync_results | |
|
420 | @save_ids | |
|
421 | def run(self, fname, block=None): | |
|
422 | """execute the code in a file on my targets.""" | |
|
493 | def run(self, filename, block=None): | |
|
494 | """Execute contents of `filename` on my engine(s). | |
|
423 | 495 |
|
|
424 | block = block if block is not None else self.block | |
|
496 | This simply reads the contents of the file and calls `execute`. | |
|
425 | 497 |
|
|
426 | return self.client.run(fname, block=block, targets=self._targets) | |
|
498 | Parameters | |
|
499 | ---------- | |
|
500 | ||
|
501 | filename : str | |
|
502 | The path to the file | |
|
503 | targets : int/str/list of ints/strs | |
|
504 | the engines on which to execute | |
|
505 | default : all | |
|
506 | block : bool | |
|
507 | whether or not to wait until done | |
|
508 | default: self.block | |
|
509 | ||
|
510 | """ | |
|
511 | with open(filename, 'r') as f: | |
|
512 | # add newline in case of trailing indented whitespace | |
|
513 | # which will cause SyntaxError | |
|
514 | code = f.read()+'\n' | |
|
515 | return self.execute(code, block=block) | |
|
427 | 516 | |
|
428 | 517 | def update(self, ns): |
|
429 |
"""update remote namespace with dict `ns` |
|
|
430 | return self.client.push(ns, targets=self._targets, block=self.block) | |
|
518 | """update remote namespace with dict `ns` | |
|
519 | ||
|
520 | See `push` for details. | |
|
521 | """ | |
|
522 | return self.push(ns, block=self.block, track=self.track) | |
|
431 | 523 | |
|
432 | def push(self, ns, block=None): | |
|
433 |
"""update remote namespace with dict `ns` |
|
|
524 | def push(self, ns, block=None, track=None): | |
|
525 | """update remote namespace with dict `ns` | |
|
434 | 526 |
|
|
435 | block = block if block is not None else self.block | |
|
527 | Parameters | |
|
528 | ---------- | |
|
436 | 529 |
|
|
437 | return self.client.push(ns, targets=self._targets, block=block) | |
|
530 | ns : dict | |
|
531 | dict of keys with which to update engine namespace(s) | |
|
532 | block : bool [default : self.block] | |
|
533 | whether to wait to be notified of engine receipt | |
|
534 | ||
|
535 | """ | |
|
536 | ||
|
537 | block = block if block is not None else self.block | |
|
538 | track = track if track is not None else self.track | |
|
539 | # applier = self.apply_sync if block else self.apply_async | |
|
540 | if not isinstance(ns, dict): | |
|
541 | raise TypeError("Must be a dict, not %s"%type(ns)) | |
|
542 | return self._really_apply(util._push, (ns,),block=block, track=track) | |
|
438 | 543 | |
|
439 | 544 | def get(self, key_s): |
|
440 | 545 | """get object(s) by `key_s` from remote namespace |
|
441 | will return one object if it is a key. | |
|
442 | It also takes a list of keys, and will return a list of objects.""" | |
|
546 | ||
|
547 | see `pull` for details. | |
|
548 | """ | |
|
443 | 549 | # block = block if block is not None else self.block |
|
444 |
return self |
|
|
550 | return self.pull(key_s, block=True) | |
|
445 | 551 | |
|
446 | @sync_results | |
|
447 | @save_ids | |
|
448 | def pull(self, key_s, block=True): | |
|
449 | """get object(s) by `key_s` from remote namespace | |
|
552 | def pull(self, names, block=True): | |
|
553 | """get object(s) by `name` from remote namespace | |
|
554 | ||
|
450 | 555 | will return one object if it is a key. |
|
451 |
|
|
|
556 | can also take a list of keys, in which case it will return a list of objects. | |
|
557 | """ | |
|
452 | 558 | block = block if block is not None else self.block |
|
453 | return self.client.pull(key_s, block=block, targets=self._targets) | |
|
559 | applier = self.apply_sync if block else self.apply_async | |
|
560 | if isinstance(names, basestring): | |
|
561 | pass | |
|
562 | elif isinstance(names, (list,tuple,set)): | |
|
563 | for key in names: | |
|
564 | if not isinstance(key, basestring): | |
|
565 | raise TypeError("keys must be str, not type %r"%type(key)) | |
|
566 | else: | |
|
567 | raise TypeError("names must be strs, not %r"%names) | |
|
568 | return applier(util._pull, names) | |
|
454 | 569 | |
|
455 | def scatter(self, key, seq, dist='b', flatten=False, block=None): | |
|
570 | def scatter(self, key, seq, dist='b', flatten=False, block=None, track=None): | |
|
456 | 571 | """ |
|
457 | 572 | Partition a Python sequence and send the partitions to a set of engines. |
|
458 | 573 | """ |
|
459 | 574 | block = block if block is not None else self.block |
|
575 | track = track if track is not None else self.track | |
|
576 | targets = self._targets | |
|
577 | mapObject = Map.dists[dist]() | |
|
578 | nparts = len(targets) | |
|
579 | msg_ids = [] | |
|
580 | trackers = [] | |
|
581 | for index, engineid in enumerate(targets): | |
|
582 | push = self.client[engineid].push | |
|
583 | partition = mapObject.getPartition(seq, index, nparts) | |
|
584 | if flatten and len(partition) == 1: | |
|
585 | r = push({key: partition[0]}, block=False, track=track) | |
|
586 | else: | |
|
587 | r = push({key: partition},block=False, track=track) | |
|
588 | msg_ids.extend(r.msg_ids) | |
|
589 | if track: | |
|
590 | trackers.append(r._tracker) | |
|
460 | 591 | |
|
461 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, | |
|
462 | targets=self._targets, block=block) | |
|
592 | if track: | |
|
593 | tracker = zmq.MessageTracker(*trackers) | |
|
594 | else: | |
|
595 | tracker = None | |
|
596 | ||
|
597 | r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker) | |
|
598 | if block: | |
|
599 | r.wait() | |
|
600 | else: | |
|
601 | return r | |
|
463 | 602 | |
|
464 | 603 | @sync_results |
|
465 | 604 | @save_ids |
@@ -468,8 +607,20 b' class DirectView(View):' | |||
|
468 | 607 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
469 | 608 | """ |
|
470 | 609 | block = block if block is not None else self.block |
|
610 | mapObject = Map.dists[dist]() | |
|
611 | msg_ids = [] | |
|
612 | for index, engineid in enumerate(self._targets): | |
|
613 | ||
|
614 | msg_ids.extend(self.client[engineid].pull(key, block=False).msg_ids) | |
|
471 | 615 | |
|
472 | return self.client.gather(key, dist=dist, targets=self._targets, block=block) | |
|
616 | r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather') | |
|
617 | ||
|
618 | if block: | |
|
619 | try: | |
|
620 | return r.get() | |
|
621 | except KeyboardInterrupt: | |
|
622 | pass | |
|
623 | return r | |
|
473 | 624 | |
|
474 | 625 | def __getitem__(self, key): |
|
475 | 626 | return self.get(key) |
@@ -523,22 +674,25 b' class LoadBalancedView(View):' | |||
|
523 | 674 | |
|
524 | 675 | Load-balanced views can be created with the client's `view` method: |
|
525 | 676 | |
|
526 |
>>> v = client. |
|
|
677 | >>> v = client.load_balanced_view() | |
|
527 | 678 | |
|
528 | 679 | or targets can be specified, to restrict the potential destinations: |
|
529 | 680 | |
|
530 |
>>> v = client. |
|
|
681 | >>> v = client.client.load_balanced_view(([1,3]) | |
|
531 | 682 | |
|
532 | 683 | which would restrict loadbalancing to between engines 1 and 3. |
|
533 | 684 | |
|
534 | 685 | """ |
|
535 | 686 | |
|
536 |
_ |
|
|
687 | _flag_names = ['block', 'track', 'follow', 'after', 'timeout'] | |
|
537 | 688 | |
|
538 | def __init__(self, client=None, targets=None): | |
|
539 | super(LoadBalancedView, self).__init__(client=client, targets=targets) | |
|
689 | def __init__(self, client=None, socket=None, targets=None): | |
|
690 | super(LoadBalancedView, self).__init__(client=client, socket=socket, targets=targets) | |
|
540 | 691 | self._ntargets = 1 |
|
541 | self._balanced = True | |
|
692 | self._task_scheme=client._task_scheme | |
|
693 | if targets is None: | |
|
694 | self._targets = None | |
|
695 | self._idents=[] | |
|
542 | 696 | |
|
543 | 697 | def _validate_dependency(self, dep): |
|
544 | 698 | """validate a dependency. |
@@ -549,7 +703,7 b' class LoadBalancedView(View):' | |||
|
549 | 703 | return True |
|
550 | 704 | elif isinstance(dep, (list,set, tuple)): |
|
551 | 705 | for d in dep: |
|
552 | if not isinstance(d, str, AsyncResult): | |
|
706 | if not isinstance(d, (str, AsyncResult)): | |
|
553 | 707 | return False |
|
554 | 708 | elif isinstance(dep, dict): |
|
555 | 709 | if set(dep.keys()) != set(Dependency().as_dict().keys()): |
@@ -561,7 +715,21 b' class LoadBalancedView(View):' | |||
|
561 | 715 | return False |
|
562 | 716 | else: |
|
563 | 717 | return False |
|
718 | ||
|
719 | return True | |
|
564 | 720 | |
|
721 | def _render_dependency(self, dep): | |
|
722 | """helper for building jsonable dependencies from various input forms.""" | |
|
723 | if isinstance(dep, Dependency): | |
|
724 | return dep.as_dict() | |
|
725 | elif isinstance(dep, AsyncResult): | |
|
726 | return dep.msg_ids | |
|
727 | elif dep is None: | |
|
728 | return [] | |
|
729 | else: | |
|
730 | # pass to Dependency constructor | |
|
731 | return list(Dependency(dep)) | |
|
732 | ||
|
565 | 733 | def set_flags(self, **kwargs): |
|
566 | 734 | """set my attribute flags by keyword. |
|
567 | 735 | |
@@ -574,19 +742,28 b' class LoadBalancedView(View):' | |||
|
574 | 742 | |
|
575 | 743 | block : bool |
|
576 | 744 | whether to wait for results |
|
577 | bound : bool | |
|
578 | whether to pass the client's Namespace as the first argument | |
|
579 | to functions called via `apply`. | |
|
580 | 745 | track : bool |
|
581 | 746 | whether to create a MessageTracker to allow the user to |
|
582 | 747 | safely edit after arrays and buffers during non-copying |
|
583 | 748 | sends. |
|
584 | follow : Dependency, list, msg_id, AsyncResult | |
|
585 | the location dependencies of tasks | |
|
586 | after : Dependency, list, msg_id, AsyncResult | |
|
587 | the time dependencies of tasks | |
|
588 | timeout : int,None | |
|
589 | the timeout to be used for tasks | |
|
749 | # | |
|
750 | after : Dependency or collection of msg_ids | |
|
751 | Only for load-balanced execution (targets=None) | |
|
752 | Specify a list of msg_ids as a time-based dependency. | |
|
753 | This job will only be run *after* the dependencies | |
|
754 | have been met. | |
|
755 | ||
|
756 | follow : Dependency or collection of msg_ids | |
|
757 | Only for load-balanced execution (targets=None) | |
|
758 | Specify a list of msg_ids as a location-based dependency. | |
|
759 | This job will only be run on an engine where this dependency | |
|
760 | is met. | |
|
761 | ||
|
762 | timeout : float/int or None | |
|
763 | Only for load-balanced execution (targets=None) | |
|
764 | Specify an amount of time (in seconds) for the scheduler to | |
|
765 | wait for dependencies to be met before failing with a | |
|
766 | DependencyTimeout. | |
|
590 | 767 | """ |
|
591 | 768 | |
|
592 | 769 | super(LoadBalancedView, self).set_flags(**kwargs) |
@@ -599,23 +776,101 b' class LoadBalancedView(View):' | |||
|
599 | 776 | raise ValueError("Invalid dependency: %r"%value) |
|
600 | 777 | if 'timeout' in kwargs: |
|
601 | 778 | t = kwargs['timeout'] |
|
602 | if not isinstance(t, (int, long, float, None)): | |
|
779 | if not isinstance(t, (int, long, float, type(None))): | |
|
603 | 780 | raise TypeError("Invalid type for timeout: %r"%type(t)) |
|
604 | 781 | if t is not None: |
|
605 | 782 | if t < 0: |
|
606 | 783 | raise ValueError("Invalid timeout: %s"%t) |
|
607 | 784 | self.timeout = t |
|
608 | ||
|
785 | ||
|
786 | @sync_results | |
|
787 | @save_ids | |
|
788 | def _really_apply(self, f, args=None, kwargs=None, block=None, track=None, | |
|
789 | after=None, follow=None, timeout=None): | |
|
790 | """calls f(*args, **kwargs) on a remote engine, returning the result. | |
|
791 | ||
|
792 | This method temporarily sets all of `apply`'s flags for a single call. | |
|
793 | ||
|
794 | Parameters | |
|
795 | ---------- | |
|
796 | ||
|
797 | f : callable | |
|
798 | ||
|
799 | args : list [default: empty] | |
|
800 | ||
|
801 | kwargs : dict [default: empty] | |
|
802 | ||
|
803 | block : bool [default: self.block] | |
|
804 | whether to block | |
|
805 | track : bool [default: self.track] | |
|
806 | whether to ask zmq to track the message, for safe non-copying sends | |
|
807 | ||
|
808 | !!!!!! TODO: THE REST HERE !!!! | |
|
809 | ||
|
810 | Returns | |
|
811 | ------- | |
|
812 | ||
|
813 | if self.block is False: | |
|
814 | returns AsyncResult | |
|
815 | else: | |
|
816 | returns actual result of f(*args, **kwargs) on the engine(s) | |
|
817 | This will be a list of self.targets is also a list (even length 1), or | |
|
818 | the single result if self.targets is an integer engine id | |
|
819 | """ | |
|
820 | ||
|
821 | # validate whether we can run | |
|
822 | if self._socket.closed: | |
|
823 | msg = "Task farming is disabled" | |
|
824 | if self._task_scheme == 'pure': | |
|
825 | msg += " because the pure ZMQ scheduler cannot handle" | |
|
826 | msg += " disappearing engines." | |
|
827 | raise RuntimeError(msg) | |
|
828 | ||
|
829 | if self._task_scheme == 'pure': | |
|
830 | # pure zmq scheme doesn't support dependencies | |
|
831 | msg = "Pure ZMQ scheduler doesn't support dependencies" | |
|
832 | if (follow or after): | |
|
833 | # hard fail on DAG dependencies | |
|
834 | raise RuntimeError(msg) | |
|
835 | if isinstance(f, dependent): | |
|
836 | # soft warn on functional dependencies | |
|
837 | warnings.warn(msg, RuntimeWarning) | |
|
838 | ||
|
839 | # build args | |
|
840 | args = [] if args is None else args | |
|
841 | kwargs = {} if kwargs is None else kwargs | |
|
842 | block = self.block if block is None else block | |
|
843 | track = self.track if track is None else track | |
|
844 | after = self.after if after is None else after | |
|
845 | follow = self.follow if follow is None else follow | |
|
846 | timeout = self.timeout if timeout is None else timeout | |
|
847 | after = self._render_dependency(after) | |
|
848 | follow = self._render_dependency(follow) | |
|
849 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=self._idents) | |
|
850 | ||
|
851 | msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track, | |
|
852 | subheader=subheader) | |
|
853 | tracker = None if track is False else msg['tracker'] | |
|
854 | ||
|
855 | ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker) | |
|
856 | ||
|
857 | if block: | |
|
858 | try: | |
|
859 | return ar.get() | |
|
860 | except KeyboardInterrupt: | |
|
861 | pass | |
|
862 | return ar | |
|
863 | ||
|
609 | 864 | @spin_after |
|
610 | 865 | @save_ids |
|
611 | 866 | def map(self, f, *sequences, **kwargs): |
|
612 |
"""view.map(f, *sequences, block=self.block, |
|
|
867 | """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult | |
|
613 | 868 | |
|
614 | 869 | Parallel version of builtin `map`, load-balanced by this View. |
|
615 | 870 | |
|
616 |
`block`, |
|
|
871 | `block`, and `chunksize` can be specified by keyword only. | |
|
617 | 872 | |
|
618 |
Each `chunk |
|
|
873 | Each `chunksize` elements will be a separate task, and will be | |
|
619 | 874 | load-balanced. This lets individual elements be available for iteration |
|
620 | 875 | as soon as they arrive. |
|
621 | 876 | |
@@ -628,13 +883,11 b' class LoadBalancedView(View):' | |||
|
628 | 883 | the sequences to be distributed and passed to `f` |
|
629 | 884 | block : bool |
|
630 | 885 | whether to wait for the result or not [default self.block] |
|
631 | bound : bool | |
|
632 | whether to pass the client's Namespace as the first argument to `f` | |
|
633 | 886 | track : bool |
|
634 | 887 | whether to create a MessageTracker to allow the user to |
|
635 | 888 | safely edit after arrays and buffers during non-copying |
|
636 | 889 | sends. |
|
637 |
chunk |
|
|
890 | chunksize : int | |
|
638 | 891 | how many elements should be in each task [default 1] |
|
639 | 892 | |
|
640 | 893 | Returns |
@@ -652,19 +905,16 b' class LoadBalancedView(View):' | |||
|
652 | 905 | |
|
653 | 906 | # default |
|
654 | 907 | block = kwargs.get('block', self.block) |
|
655 |
|
|
|
656 | chunk_size = kwargs.get('chunk_size', 1) | |
|
908 | chunksize = kwargs.get('chunksize', 1) | |
|
657 | 909 | |
|
658 | 910 | keyset = set(kwargs.keys()) |
|
659 |
extra_keys = keyset.difference_update(set(['block', ' |
|
|
911 | extra_keys = keyset.difference_update(set(['block', 'chunksize'])) | |
|
660 | 912 | if extra_keys: |
|
661 | 913 | raise TypeError("Invalid kwargs: %s"%list(extra_keys)) |
|
662 | 914 | |
|
663 | 915 | assert len(sequences) > 0, "must have some sequences to map onto!" |
|
664 | 916 | |
|
665 |
pf = ParallelFunction(self |
|
|
666 | targets=self._targets, balanced=True, | |
|
667 | chunk_size=chunk_size) | |
|
917 | pf = ParallelFunction(self, f, block=block, chunksize=chunksize) | |
|
668 | 918 | return pf.map(*sequences) |
|
669 | 919 | |
|
670 | 920 | __all__ = ['LoadBalancedView', 'DirectView'] No newline at end of file |
@@ -53,12 +53,12 b' def make_bintree(levels):' | |||
|
53 | 53 | add_children(G, root, levels, 2) |
|
54 | 54 | return G |
|
55 | 55 | |
|
56 |
def submit_jobs( |
|
|
56 | def submit_jobs(view, G, jobs): | |
|
57 | 57 | """Submit jobs via client where G describes the time dependencies.""" |
|
58 | 58 | results = {} |
|
59 | 59 | for node in nx.topological_sort(G): |
|
60 |
|
|
|
61 |
results[node] = |
|
|
60 | with view.temp_flags(after=[ results[n] for n in G.predecessors(node) ]): | |
|
61 | results[node] = view.apply(jobs[node]) | |
|
62 | 62 | return results |
|
63 | 63 | |
|
64 | 64 | def validate_tree(G, results): |
@@ -76,7 +76,7 b' def main(nodes, edges):' | |||
|
76 | 76 | in-degree on the y (just for spread). All arrows must |
|
77 | 77 | point at least slightly to the right if the graph is valid. |
|
78 | 78 | """ |
|
79 | import pylab | |
|
79 | from matplotlib import pyplot as plt | |
|
80 | 80 | from matplotlib.dates import date2num |
|
81 | 81 | from matplotlib.cm import gist_rainbow |
|
82 | 82 | print "building DAG" |
@@ -88,10 +88,11 b' def main(nodes, edges):' | |||
|
88 | 88 | jobs[node] = randomwait |
|
89 | 89 | |
|
90 | 90 | client = cmod.Client() |
|
91 | view = client.load_balanced_view() | |
|
91 | 92 | print "submitting %i tasks with %i dependencies"%(nodes,edges) |
|
92 |
results = submit_jobs( |
|
|
93 | results = submit_jobs(view, G, jobs) | |
|
93 | 94 | print "waiting for results" |
|
94 | client.barrier() | |
|
95 | view.wait() | |
|
95 | 96 | print "done" |
|
96 | 97 | for node in G: |
|
97 | 98 | md = results[node].metadata |
@@ -107,13 +108,13 b' def main(nodes, edges):' | |||
|
107 | 108 | xmax,ymax = map(max, (x,y)) |
|
108 | 109 | xscale = xmax-xmin |
|
109 | 110 | yscale = ymax-ymin |
|
110 |
p |
|
|
111 |
p |
|
|
111 | plt.xlim(xmin-xscale*.1,xmax+xscale*.1) | |
|
112 | plt.ylim(ymin-yscale*.1,ymax+yscale*.1) | |
|
112 | 113 | return G,results |
|
113 | 114 | |
|
114 | 115 | if __name__ == '__main__': |
|
115 | import pylab | |
|
116 | from matplotlib import pyplot as plt | |
|
116 | 117 | # main(5,10) |
|
117 | 118 | main(32,96) |
|
118 |
p |
|
|
119 | plt.show() | |
|
119 | 120 | No newline at end of file |
@@ -31,7 +31,7 b' def getpid2():' | |||
|
31 | 31 | import os |
|
32 | 32 | return os.getpid() |
|
33 | 33 | |
|
34 |
view = client |
|
|
34 | view = client.load_balanced_view() | |
|
35 | 35 | view.block=True |
|
36 | 36 | |
|
37 | 37 | # will run on anything: |
@@ -58,29 +58,41 b' successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids' | |||
|
58 | 58 | failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ] |
|
59 | 59 | |
|
60 | 60 | mixed = [failures[0],successes[0]] |
|
61 |
d1a = Dependency(mixed, |
|
|
62 |
d1b = Dependency(mixed, |
|
|
63 |
d2a = Dependency(mixed, |
|
|
64 |
d2b = Dependency(mixed, |
|
|
65 |
d3 = Dependency(failures, |
|
|
66 |
d4 = Dependency(failures, |
|
|
67 |
d5 = Dependency(failures, |
|
|
68 |
d6 = Dependency(successes, |
|
|
69 | ||
|
70 |
|
|
|
71 | ||
|
72 | r1a = client.apply(getpid, after=d1a) | |
|
73 |
|
|
|
74 | r2a = client.apply(getpid, after=d2b, follow=d2a) | |
|
75 | r2b = client.apply(getpid, after=d2a, follow=d2b) | |
|
76 | r3 = client.apply(getpid, after=d3) | |
|
77 |
|
|
|
78 | r4b = client.apply(getpid, follow=d4) | |
|
79 | r4c = client.apply(getpid, after=d3, follow=d4) | |
|
80 | r5 = client.apply(getpid, after=d5) | |
|
81 | r5b = client.apply(getpid, follow=d5, after=d3) | |
|
82 | r6 = client.apply(getpid, follow=d6) | |
|
83 | r6b = client.apply(getpid, after=d6, follow=d2b) | |
|
61 | d1a = Dependency(mixed, all=False, failure=True) # yes | |
|
62 | d1b = Dependency(mixed, all=False) # yes | |
|
63 | d2a = Dependency(mixed, all=True, failure=True) # yes after / no follow | |
|
64 | d2b = Dependency(mixed, all=True) # no | |
|
65 | d3 = Dependency(failures, all=False) # no | |
|
66 | d4 = Dependency(failures, all=False, failure=True) # yes | |
|
67 | d5 = Dependency(failures, all=True, failure=True) # yes after / no follow | |
|
68 | d6 = Dependency(successes, all=True, failure=True) # yes after / no follow | |
|
69 | ||
|
70 | view.block = False | |
|
71 | flags = view.temp_flags | |
|
72 | with flags(after=d1a): | |
|
73 | r1a = view.apply(getpid) | |
|
74 | with flags(follow=d1b): | |
|
75 | r1b = view.apply(getpid) | |
|
76 | with flags(after=d2b, follow=d2a): | |
|
77 | r2a = view.apply(getpid) | |
|
78 | with flags(after=d2a, follow=d2b): | |
|
79 | r2b = view.apply(getpid) | |
|
80 | with flags(after=d3): | |
|
81 | r3 = view.apply(getpid) | |
|
82 | with flags(after=d4): | |
|
83 | r4a = view.apply(getpid) | |
|
84 | with flags(follow=d4): | |
|
85 | r4b = view.apply(getpid) | |
|
86 | with flags(after=d3, follow=d4): | |
|
87 | r4c = view.apply(getpid) | |
|
88 | with flags(after=d5): | |
|
89 | r5 = view.apply(getpid) | |
|
90 | with flags(follow=d5, after=d3): | |
|
91 | r5b = view.apply(getpid) | |
|
92 | with flags(follow=d6): | |
|
93 | r6 = view.apply(getpid) | |
|
94 | with flags(after=d6, follow=d2b): | |
|
95 | r6b = view.apply(getpid) | |
|
84 | 96 | |
|
85 | 97 | def should_fail(f): |
|
86 | 98 | try: |
@@ -1,8 +1,9 b'' | |||
|
1 | 1 | from IPython.zmq.parallel.client import * |
|
2 | 2 | |
|
3 | 3 | client = Client() |
|
4 | view = client[:] | |
|
4 | 5 | |
|
5 |
@remote( |
|
|
6 | @view.remote(block=True) | |
|
6 | 7 | def square(a): |
|
7 | 8 | """return square of a number""" |
|
8 | 9 | return a*a |
@@ -21,7 +22,7 b' squares2 = [ r.get() for r in arlist ]' | |||
|
21 | 22 | |
|
22 | 23 | # now the more convenient @parallel decorator, which has a map method: |
|
23 | 24 | |
|
24 |
@parallel( |
|
|
25 | @view.parallel(block=False) | |
|
25 | 26 | def psquare(a): |
|
26 | 27 | """return square of a number""" |
|
27 | 28 | return a*a |
@@ -3,12 +3,12 b' from IPython.zmq.parallel.client import *' | |||
|
3 | 3 | client = Client() |
|
4 | 4 | |
|
5 | 5 | for id in client.ids: |
|
6 |
client.push(dict(ids=id*id) |
|
|
6 | client[id].push(dict(ids=id*id)) | |
|
7 | 7 | |
|
8 |
|
|
|
9 |
|
|
|
8 | v = client[0] | |
|
9 | v['a'] = 5 | |
|
10 | 10 | |
|
11 |
print |
|
|
11 | print v['a'] | |
|
12 | 12 | |
|
13 | 13 | remotes = client[:] |
|
14 | 14 |
@@ -49,7 +49,7 b' c = client.Client(profile=cluster_profile)' | |||
|
49 | 49 | |
|
50 | 50 | # A LoadBalancedView is an interface to the engines that provides dynamic load |
|
51 | 51 | # balancing at the expense of not knowing which engine will execute the code. |
|
52 | view = c.view() | |
|
52 | view = c.load_balanced_view() | |
|
53 | 53 | |
|
54 | 54 | # Initialize the common code on the engines. This Python module has the |
|
55 | 55 | # price_options function that prices the options. |
@@ -75,7 +75,7 b' print "Submitted tasks: ", len(async_results)' | |||
|
75 | 75 | sys.stdout.flush() |
|
76 | 76 | |
|
77 | 77 | # Block until all tasks are completed. |
|
78 |
c. |
|
|
78 | c.wait(async_results) | |
|
79 | 79 | t2 = time.time() |
|
80 | 80 | t = t2-t1 |
|
81 | 81 |
@@ -27,14 +27,14 b" filestring = 'pi200m.ascii.%(i)02dof20'" | |||
|
27 | 27 | files = [filestring % {'i':i} for i in range(1,16)] |
|
28 | 28 | |
|
29 | 29 | # Connect to the IPython cluster |
|
30 |
c = client.Client( |
|
|
31 | c.run('pidigits.py') | |
|
30 | c = client.Client() | |
|
31 | c[:].run('pidigits.py') | |
|
32 | 32 | |
|
33 | 33 | # the number of engines |
|
34 | 34 | n = len(c) |
|
35 | 35 | id0 = c.ids[0] |
|
36 | 36 | v = c[:] |
|
37 | v.set_flags(bound=True,block=True) | |
|
37 | v.block=True | |
|
38 | 38 | # fetch the pi-files |
|
39 | 39 | print "downloading %i files of pi"%n |
|
40 | 40 | v.map(fetch_pi_file, files[:n]) |
@@ -30,17 +30,19 b' from numpy import exp, zeros, newaxis, sqrt' | |||
|
30 | 30 | from IPython.external import argparse |
|
31 | 31 | from IPython.zmq.parallel.client import Client, Reference |
|
32 | 32 | |
|
33 |
def setup_partitioner( |
|
|
33 | def setup_partitioner(index, num_procs, gnum_cells, parts): | |
|
34 | 34 | """create a partitioner in the engine namespace""" |
|
35 | global partitioner | |
|
35 | 36 | p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs) |
|
36 | 37 | p.redim(global_num_cells=gnum_cells, num_parts=parts) |
|
37 | 38 | p.prepare_communication() |
|
38 | 39 | # put the partitioner into the global namespace: |
|
39 |
|
|
|
40 | partitioner=p | |
|
40 | 41 | |
|
41 |
def setup_solver( |
|
|
42 | def setup_solver(*args, **kwargs): | |
|
42 | 43 | """create a WaveSolver in the engine namespace""" |
|
43 | ns.solver = WaveSolver(*args, **kwargs) | |
|
44 | global solver | |
|
45 | solver = WaveSolver(*args, **kwargs) | |
|
44 | 46 | |
|
45 | 47 | def wave_saver(u, x, y, t): |
|
46 | 48 | """save the wave log""" |
@@ -146,11 +148,11 b" if __name__ == '__main__':" | |||
|
146 | 148 | # setup remote partitioner |
|
147 | 149 | # note that Reference means that the argument passed to setup_partitioner will be the |
|
148 | 150 | # object named 'my_id' in the engine's namespace |
|
149 |
view.apply_sync |
|
|
151 | view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition) | |
|
150 | 152 | # wait for initial communication to complete |
|
151 | 153 | view.execute('mpi.barrier()') |
|
152 | 154 | # setup remote solvers |
|
153 |
view.apply_sync |
|
|
155 | view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
|
154 | 156 | |
|
155 | 157 | # lambda for calling solver.solve: |
|
156 | 158 | _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) |
@@ -172,7 +174,7 b" if __name__ == '__main__':" | |||
|
172 | 174 | |
|
173 | 175 | impl['inner'] = 'vectorized' |
|
174 | 176 | # setup new solvers |
|
175 |
view.apply_sync |
|
|
177 | view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
|
176 | 178 | view.execute('mpi.barrier()') |
|
177 | 179 | |
|
178 | 180 | # run again with numpy vectorized inner-implementation |
@@ -30,17 +30,19 b' from numpy import exp, zeros, newaxis, sqrt' | |||
|
30 | 30 | from IPython.external import argparse |
|
31 | 31 | from IPython.zmq.parallel.client import Client, Reference |
|
32 | 32 | |
|
33 |
def setup_partitioner( |
|
|
33 | def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts): | |
|
34 | 34 | """create a partitioner in the engine namespace""" |
|
35 | global partitioner | |
|
35 | 36 | p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs) |
|
36 | 37 | p.redim(global_num_cells=gnum_cells, num_parts=parts) |
|
37 | 38 | p.prepare_communication() |
|
38 | 39 | # put the partitioner into the global namespace: |
|
39 |
|
|
|
40 | partitioner=p | |
|
40 | 41 | |
|
41 |
def setup_solver( |
|
|
42 | def setup_solver(*args, **kwargs): | |
|
42 | 43 | """create a WaveSolver in the engine namespace.""" |
|
43 | ns.solver = WaveSolver(*args, **kwargs) | |
|
44 | global solver | |
|
45 | solver = WaveSolver(*args, **kwargs) | |
|
44 | 46 | |
|
45 | 47 | def wave_saver(u, x, y, t): |
|
46 | 48 | """save the wave state for each timestep.""" |
@@ -156,7 +158,7 b" if __name__ == '__main__':" | |||
|
156 | 158 | # setup remote partitioner |
|
157 | 159 | # note that Reference means that the argument passed to setup_partitioner will be the |
|
158 | 160 | # object named 'com' in the engine's namespace |
|
159 |
view.apply_sync |
|
|
161 | view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition) | |
|
160 | 162 | time.sleep(1) |
|
161 | 163 | # convenience lambda to call solver.solve: |
|
162 | 164 | _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) |
@@ -164,7 +166,7 b" if __name__ == '__main__':" | |||
|
164 | 166 | if ns.scalar: |
|
165 | 167 | impl['inner'] = 'scalar' |
|
166 | 168 | # setup remote solvers |
|
167 |
view.apply_sync |
|
|
169 | view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
|
168 | 170 | |
|
169 | 171 | # run first with element-wise Python operations for each cell |
|
170 | 172 | t0 = time.time() |
@@ -182,7 +184,7 b" if __name__ == '__main__':" | |||
|
182 | 184 | # run again with faster numpy-vectorized inner implementation: |
|
183 | 185 | impl['inner'] = 'vectorized' |
|
184 | 186 | # setup remote solvers |
|
185 |
view.apply_sync |
|
|
187 | view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
|
186 | 188 | |
|
187 | 189 | t0 = time.time() |
|
188 | 190 |
@@ -19,7 +19,7 b' Contents' | |||
|
19 | 19 | whatsnew/index.txt |
|
20 | 20 | install/index.txt |
|
21 | 21 | interactive/index.txt |
|
22 | parallel/index.txt | |
|
22 | .. parallel/index.txt | |
|
23 | 23 | parallelz/index.txt |
|
24 | 24 | config/index.txt |
|
25 | 25 | development/index.txt |
@@ -45,7 +45,7 b' A possible sequence of events for this workflow:' | |||
|
45 | 45 | |
|
46 | 46 | |
|
47 | 47 | Further, taking failures into account, assuming all dependencies are run with the default |
|
48 |
`success |
|
|
48 | `success=True,failure=False`, the following cases would occur for each node's failure: | |
|
49 | 49 | |
|
50 | 50 | 0. fails: all other tasks fail as Impossible |
|
51 | 51 | 1. 2 can still succeed, but 3,4 are unreachable |
@@ -111,7 +111,8 b' on which it depends:' | |||
|
111 | 111 | |
|
112 | 112 | .. sourcecode:: ipython |
|
113 | 113 | |
|
114 | In [5]: c = client.Client() | |
|
114 | In [5]: rc = client.Client() | |
|
115 | In [5]: view = rc.load_balanced_view() | |
|
115 | 116 | |
|
116 | 117 | In [6]: results = {} |
|
117 | 118 | |
@@ -120,13 +121,13 b' on which it depends:' | |||
|
120 | 121 | ...: # leading into this one as dependencies |
|
121 | 122 | ...: deps = [ results[n] for n in G.predecessors(node) ] |
|
122 | 123 | ...: # submit and store AsyncResult object |
|
123 |
...: results[node] = |
|
|
124 | ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False) | |
|
124 | 125 | |
|
125 | 126 | Now that we have submitted all the jobs, we can wait for the results: |
|
126 | 127 | |
|
127 | 128 | .. sourcecode:: ipython |
|
128 | 129 | |
|
129 |
In [8]: |
|
|
130 | In [8]: view.wait(results.values()) | |
|
130 | 131 | |
|
131 | 132 | Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have |
|
132 | 133 | raised an error if a task failed). But we don't know that the ordering was properly |
@@ -17,5 +17,6 b' Using IPython for parallel computing (ZMQ)' | |||
|
17 | 17 | parallel_demos.txt |
|
18 | 18 | dag_dependencies.txt |
|
19 | 19 | parallel_details.txt |
|
20 | parallel_transition.txt | |
|
20 | 21 | |
|
21 | 22 |
@@ -135,7 +135,7 b' calculation can also be run by simply typing the commands from' | |||
|
135 | 135 | # We simply pass Client the name of the cluster profile we |
|
136 | 136 | # are using. |
|
137 | 137 | In [2]: c = client.Client(profile='mycluster') |
|
138 |
In [3]: view = c. |
|
|
138 | In [3]: view = c.load_balanced_view() | |
|
139 | 139 | |
|
140 | 140 | In [3]: c.ids |
|
141 | 141 | Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] |
@@ -166,11 +166,11 b' calculation can also be run by simply typing the commands from' | |||
|
166 | 166 | 'pi200m.ascii.15of20'] |
|
167 | 167 | |
|
168 | 168 | # download the data files if they don't already exist: |
|
169 |
In [8]: |
|
|
169 | In [8]: v.map(fetch_pi_file, files) | |
|
170 | 170 | |
|
171 | 171 | # This is the parallel calculation using the Client.map method |
|
172 | 172 | # which applies compute_two_digit_freqs to each file in files in parallel. |
|
173 |
In [9]: freqs_all = |
|
|
173 | In [9]: freqs_all = v.map(compute_two_digit_freqs, files) | |
|
174 | 174 | |
|
175 | 175 | # Add up the frequencies from each engine. |
|
176 | 176 | In [10]: freqs = reduce_freqs(freqs_all) |
@@ -18,13 +18,14 b' Non-copying sends and numpy arrays' | |||
|
18 | 18 | ---------------------------------- |
|
19 | 19 | |
|
20 | 20 | When numpy arrays are passed as arguments to apply or via data-movement methods, they are not |
|
21 |
copied. |
|
|
22 |
PyZMQ does allow you to track when a message has been sent so you can know when it is safe |
|
|
23 | IPython only allows for this. | |
|
21 | copied. This means that you must be careful if you are sending an array that you intend to work | |
|
22 | on. PyZMQ does allow you to track when a message has been sent so you can know when it is safe | |
|
23 | to edit the buffer, but IPython only allows for this. | |
|
24 | 24 | |
|
25 |
It is also important to note that the non-copying receive of a message is *read-only*. |
|
|
26 |
means that if you intend to work in-place on an array that you have sent or received, you must |
|
|
27 |
it. |
|
|
25 | It is also important to note that the non-copying receive of a message is *read-only*. That | |
|
26 | means that if you intend to work in-place on an array that you have sent or received, you must | |
|
27 | copy it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as | |
|
28 | results. | |
|
28 | 29 | |
|
29 | 30 | The following will fail: |
|
30 | 31 | |
@@ -69,6 +70,24 b' The :attr:`ndarray.flags.writeable` flag will tell you if you can write to an ar' | |||
|
69 | 70 | In [6]: _.flags.writeable |
|
70 | 71 | Out[6]: False |
|
71 | 72 | |
|
73 | If you want to safely edit an array in-place after *sending* it, you must use the `track=True` flag. IPython always performs non-copying sends of arrays, which return immediately. You | |
|
74 | must instruct IPython track those messages *at send time* in order to know for sure that the send has completed. AsyncResults have a :attr:`sent` property, and :meth:`wait_on_send` method | |
|
75 | for checking and waiting for 0MQ to finish with a buffer. | |
|
76 | ||
|
77 | .. sourcecode:: ipython | |
|
78 | ||
|
79 | In [5]: A = numpy.random.random((1024,1024)) | |
|
80 | ||
|
81 | In [6]: view.track=True | |
|
82 | ||
|
83 | In [7]: ar = view.apply_async(lambda x: 2*x, A) | |
|
84 | ||
|
85 | In [8]: ar.sent | |
|
86 | Out[8]: False | |
|
87 | ||
|
88 | In [9]: ar.wait_on_send() # blocks until sent is True | |
|
89 | ||
|
90 | ||
|
72 | 91 | What is sendable? |
|
73 | 92 | ----------------- |
|
74 | 93 | |
@@ -83,6 +102,61 b' buffer without copying - and reconstruct the object on the other side in your ow' | |||
|
83 | 102 | possible that the object reconstruction will become extensible, so you can add your own |
|
84 | 103 | non-copying types, but this does not yet exist. |
|
85 | 104 | |
|
105 | Closures | |
|
106 | ******** | |
|
107 | ||
|
108 | Just about anything in Python is pickleable. The one notable exception is objects (generally | |
|
109 | functions) with *closures*. Closures can be a complicated topic, but the basic principal is that | |
|
110 | functions that refer to variables in their parent scope have closures. | |
|
111 | ||
|
112 | An example of a function that uses a closure: | |
|
113 | ||
|
114 | .. sourcecode:: python | |
|
115 | ||
|
116 | def f(a): | |
|
117 | def inner(): | |
|
118 | # inner will have a closure | |
|
119 | return a | |
|
120 | return echo | |
|
121 | ||
|
122 | f1 = f(1) | |
|
123 | f2 = f(2) | |
|
124 | f1() # returns 1 | |
|
125 | f2() # returns 2 | |
|
126 | ||
|
127 | f1 and f2 will have closures referring to the scope in which `inner` was defined, because they | |
|
128 | use the variable 'a'. As a result, you would not be able to send ``f1`` or ``f2`` with IPython. | |
|
129 | Note that you *would* be able to send `f`. This is only true for interactively defined | |
|
130 | functions (as are often used in decorators), and only when there are variables used inside the | |
|
131 | inner function, that are defined in the outer function. If the names are *not* in the outer | |
|
132 | function, then there will not be a closure, and the generated function will look in | |
|
133 | ``globals()`` for the name: | |
|
134 | ||
|
135 | .. sourcecode:: python | |
|
136 | ||
|
137 | def g(b): | |
|
138 | # note that `b` is not referenced in inner's scope | |
|
139 | def inner(): | |
|
140 | # this inner will *not* have a closure | |
|
141 | return a | |
|
142 | return echo | |
|
143 | g1 = g(1) | |
|
144 | g2 = g(2) | |
|
145 | g1() # raises NameError on 'a' | |
|
146 | a=5 | |
|
147 | g2() # returns 5 | |
|
148 | ||
|
149 | `g1` and `g2` *will* be sendable with IPython, and will treat the engine's namespace as | |
|
150 | globals(). The :meth:`pull` method is implemented based on this principal. If we did not | |
|
151 | provide pull, you could implement it yourself with `apply`, by simply returning objects out | |
|
152 | of the global namespace: | |
|
153 | ||
|
154 | .. sourcecode:: ipython | |
|
155 | ||
|
156 | In [10]: view.apply(lambda : a) | |
|
157 | ||
|
158 | # is equivalent to | |
|
159 | In [11]: view.pull('a') | |
|
86 | 160 | |
|
87 | 161 | Running Code |
|
88 | 162 | ============ |
@@ -94,7 +168,9 b' Client method, called `apply`.' | |||
|
94 | 168 | Apply |
|
95 | 169 | ----- |
|
96 | 170 | |
|
97 |
The principal method of remote execution is :meth:`apply`, of |
|
|
171 | The principal method of remote execution is :meth:`apply`, of View objects. The Client provides | |
|
172 | the full execution and communication API for engines via its low-level | |
|
173 | :meth:`send_apply_message` method. | |
|
98 | 174 | |
|
99 | 175 | f : function |
|
100 | 176 | The fuction to be called remotely |
@@ -102,8 +178,6 b' args : tuple/list' | |||
|
102 | 178 | The positional arguments passed to `f` |
|
103 | 179 | kwargs : dict |
|
104 | 180 | The keyword arguments passed to `f` |
|
105 | bound : bool (default: False) | |
|
106 | Whether to pass the Engine(s) Namespace as the first argument to `f`. | |
|
107 | 181 | block : bool (default: self.block) |
|
108 | 182 | Whether to wait for the result, or return immediately. |
|
109 | 183 | False: |
@@ -135,8 +209,6 b' balanced : bool, default None' | |||
|
135 | 209 | If `balanced` and `targets` are both specified, the task will |
|
136 | 210 | be assigne to *one* of the targets by the scheduler. |
|
137 | 211 | |
|
138 | The following arguments are only used when balanced is True: | |
|
139 | ||
|
140 | 212 | after : Dependency or collection of msg_ids |
|
141 | 213 | Only for load-balanced execution (targets=None) |
|
142 | 214 | Specify a list of msg_ids as a time-based dependency. |
@@ -158,11 +230,11 b' timeout : float/int or None' | |||
|
158 | 230 | execute and run |
|
159 | 231 | --------------- |
|
160 | 232 | |
|
161 |
For executing strings of Python code, |
|
|
162 |
method, which rather than take functions and arguments, take simple strings. |
|
|
163 |
takes a string of Python code to execute, and sends it to the Engine(s). `run` |
|
|
164 |
`execute`, but for a *file*, rather than a string. It is simply a wrapper that |
|
|
165 | very similar to ``execute(open(f).read())``. | |
|
233 | For executing strings of Python code, :class:`DirectView`s also provide an :meth:`execute` and a | |
|
234 | :meth:`run` method, which rather than take functions and arguments, take simple strings. | |
|
235 | `execute` simply takes a string of Python code to execute, and sends it to the Engine(s). `run` | |
|
236 | is the same as `execute`, but for a *file*, rather than a string. It is simply a wrapper that | |
|
237 | does something very similar to ``execute(open(f).read())``. | |
|
166 | 238 | |
|
167 | 239 | .. note:: |
|
168 | 240 | |
@@ -172,44 +244,25 b' Views' | |||
|
172 | 244 | ===== |
|
173 | 245 | |
|
174 | 246 | The principal extension of the :class:`~parallel.client.Client` is the |
|
175 |
:class:`~parallel.view.View` class. The client |
|
|
176 | execution patterns, where you must specify everything about the execution as keywords to each | |
|
177 | call to :meth:`apply`. For users who want to more conveniently specify various options for | |
|
178 | several similar calls, we have the :class:`~parallel.view.View` objects. The basic principle of | |
|
179 | the views is to encapsulate the keyword arguments to :meth:`client.apply` as attributes, | |
|
180 | allowing users to specify them once and apply to any subsequent calls until the attribute is | |
|
181 | changed. | |
|
247 | :class:`~parallel.view.View` class. The client | |
|
182 | 248 | |
|
183 | 249 | Two of apply's keyword arguments are set at the construction of the View, and are immutable for |
|
184 | 250 | a given View: `balanced` and `targets`. `balanced` determines whether the View will be a |
|
185 | 251 | :class:`.LoadBalancedView` or a :class:`.DirectView`, and `targets` will be the View's `targets` |
|
186 | 252 | attribute. Attempts to change this will raise errors. |
|
187 | 253 | |
|
188 |
Views are cached by targets |
|
|
254 | Views are cached by targets/class, so requesting a view multiple times will always return the | |
|
255 | *same object*, not create a new one: | |
|
189 | 256 | |
|
190 | 257 | .. sourcecode:: ipython |
|
191 | 258 | |
|
192 |
In [3]: v1 = rc.view([1,2,3] |
|
|
193 |
In [4]: v2 = rc.view([1,2,3] |
|
|
259 | In [3]: v1 = rc.load_balanced_view([1,2,3]) | |
|
260 | In [4]: v2 = rc.load_balanced_view([1,2,3]) | |
|
194 | 261 | |
|
195 | 262 | In [5]: v2 is v1 |
|
196 | 263 | Out[5]: True |
|
197 | 264 | |
|
198 | 265 | |
|
199 | A :class:`View` always uses its `targets` attribute, and it will use its `bound` | |
|
200 | and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x` | |
|
201 | methods allow overriding `bound` and `block` for a single call. | |
|
202 | ||
|
203 | ================== ========== ========== | |
|
204 | method block bound | |
|
205 | ================== ========== ========== | |
|
206 | apply self.block self.bound | |
|
207 | apply_sync True False | |
|
208 | apply_async False False | |
|
209 | apply_sync_bound True True | |
|
210 | apply_async_bound False True | |
|
211 | ================== ========== ========== | |
|
212 | ||
|
213 | 266 | DirectView |
|
214 | 267 | ---------- |
|
215 | 268 | |
@@ -379,24 +432,26 b' interactive session - you must poll the 0MQ sockets for incoming messages. Note' | |||
|
379 | 432 | this polling *does not* actually make any network requests. It simply performs a `select` |
|
380 | 433 | operation, to check if messages are already in local memory, waiting to be handled. |
|
381 | 434 | |
|
382 |
The method that handles incoming messages is :meth:`spin`. |
|
|
435 | The method that handles incoming messages is :meth:`spin`. This method flushes any waiting | |
|
436 | messages on the various incoming sockets, and updates the state of the Client. | |
|
383 | 437 | |
|
384 |
If you need to wait for particular results to finish, you can use the :meth:` |
|
|
438 | If you need to wait for particular results to finish, you can use the :meth:`wait` method, | |
|
385 | 439 | which will call :meth:`spin` until the messages are no longer outstanding. Anything that |
|
386 | 440 | represents a collection of messages, such as a list of msg_ids or one or more AsyncResult |
|
387 |
objects, can be passed as argument to |
|
|
388 |
the |
|
|
441 | objects, can be passed as argument to wait. A timeout can be specified, which will prevent | |
|
442 | the call from blocking for more than a specified time, but the default behavior is to wait | |
|
389 | 443 | forever. |
|
390 | 444 | |
|
391 | 445 | |
|
392 | 446 | |
|
393 | 447 | The client also has an `outstanding` attribute - a ``set`` of msg_ids that are awaiting replies. |
|
394 |
This is the default if |
|
|
448 | This is the default if wait is called with no arguments - i.e. wait on *all* outstanding | |
|
449 | messages. | |
|
395 | 450 | |
|
396 | 451 | |
|
397 | 452 | .. note:: |
|
398 | 453 | |
|
399 |
TODO |
|
|
454 | TODO wait example | |
|
400 | 455 | |
|
401 | 456 | Map |
|
402 | 457 | === |
@@ -89,7 +89,7 b' same machine as the Hub, but can be run anywhere from local threads or on remote' | |||
|
89 | 89 | The controller also provides a single point of contact for users who wish to |
|
90 | 90 | utilize the engines connected to the controller. There are different ways of |
|
91 | 91 | working with a controller. In IPython, all of these models are implemented via |
|
92 |
the client's :meth:`. |
|
|
92 | the client's :meth:`.View.apply` method, with various arguments, or | |
|
93 | 93 | constructing :class:`.View` objects to represent subsets of engines. The two |
|
94 | 94 | primary models for interacting with engines are: |
|
95 | 95 | |
@@ -124,12 +124,13 b' themselves block when user code is run, the schedulers hide that from the user t' | |||
|
124 | 124 | a fully asynchronous interface to a set of engines. |
|
125 | 125 | |
|
126 | 126 | |
|
127 | IPython client | |
|
128 | -------------- | |
|
127 | IPython client and views | |
|
128 | ------------------------ | |
|
129 | 129 | |
|
130 | There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a | |
|
131 |
|
|
|
132 |
interact with a set of engines through the interface. Here are the two default |
|
|
130 | There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a cluster. | |
|
131 | For each execution model, there is a corresponding :class:`~.parallel.view.View`. These views | |
|
132 | allow users to interact with a set of engines through the interface. Here are the two default | |
|
133 | views: | |
|
133 | 134 | |
|
134 | 135 | * The :class:`DirectView` class for explicit addressing. |
|
135 | 136 | * The :class:`LoadBalancedView` class for destination-agnostic scheduling. |
@@ -212,7 +213,7 b' everything is working correctly, try the following commands:' | |||
|
212 | 213 | In [4]: c.ids |
|
213 | 214 | Out[4]: set([0, 1, 2, 3]) |
|
214 | 215 | |
|
215 |
In [5]: c.apply(lambda : "Hello, World" |
|
|
216 | In [5]: c[:].apply_sync(lambda : "Hello, World") | |
|
216 | 217 | Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ] |
|
217 | 218 | |
|
218 | 219 | |
@@ -234,10 +235,10 b' then you would connect to it with:' | |||
|
234 | 235 | In [2]: c = client.Client(sshserver='myhub.example.com') |
|
235 | 236 | |
|
236 | 237 | Where 'myhub.example.com' is the url or IP address of the machine on |
|
237 | which the Hub process is running. | |
|
238 | which the Hub process is running (or another machine that has direct access to the Hub's ports). | |
|
238 | 239 | |
|
239 | 240 | You are now ready to learn more about the :ref:`Direct |
|
240 | <parallelmultiengine>` and :ref:`LoadBalanced <paralleltask>` interfaces to the | |
|
241 | <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the | |
|
241 | 242 | controller. |
|
242 | 243 | |
|
243 | 244 | .. [ZeroMQ] ZeroMQ. http://www.zeromq.org |
@@ -1,4 +1,4 b'' | |||
|
1 | .. _parallelmultiengine: | |
|
1 | .. _parallel_multiengine: | |
|
2 | 2 | |
|
3 | 3 | ========================== |
|
4 | 4 | IPython's Direct interface |
@@ -9,7 +9,7 b' IPython engines. The basic idea behind the multiengine interface is that the' | |||
|
9 | 9 | capabilities of each engine are directly and explicitly exposed to the user. |
|
10 | 10 | Thus, in the multiengine interface, each engine is given an id that is used to |
|
11 | 11 | identify the engine and give it work to do. This interface is very intuitive |
|
12 |
and is designed with interactive usage in mind, and is th |
|
|
12 | and is designed with interactive usage in mind, and is the best place for | |
|
13 | 13 | new users of IPython to begin. |
|
14 | 14 | |
|
15 | 15 | Starting the IPython controller and engines |
@@ -91,9 +91,7 b" DirectView's :meth:`map` method:" | |||
|
91 | 91 | |
|
92 | 92 | In [62]: serial_result = map(lambda x:x**10, range(32)) |
|
93 | 93 | |
|
94 | In [63]: dview.block = True | |
|
95 | ||
|
96 | In [66]: parallel_result = dview.map(lambda x: x**10, range(32)) | |
|
94 | In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32)) | |
|
97 | 95 | |
|
98 | 96 | In [67]: serial_result==parallel_result |
|
99 | 97 | Out[67]: True |
@@ -103,8 +101,7 b" DirectView's :meth:`map` method:" | |||
|
103 | 101 | |
|
104 | 102 | The :class:`DirectView`'s version of :meth:`map` does |
|
105 | 103 | not do dynamic load balancing. For a load balanced version, use a |
|
106 |
:class:`LoadBalancedView` |
|
|
107 | `balanced=True`. | |
|
104 | :class:`LoadBalancedView`. | |
|
108 | 105 | |
|
109 | 106 | .. seealso:: |
|
110 | 107 | |
@@ -119,7 +116,7 b' two decorators:' | |||
|
119 | 116 | |
|
120 | 117 | .. sourcecode:: ipython |
|
121 | 118 | |
|
122 |
In [10]: @ |
|
|
119 | In [10]: @dview.remote(block=True) | |
|
123 | 120 | ...: def getpid(): |
|
124 | 121 | ...: import os |
|
125 | 122 | ...: return os.getpid() |
@@ -128,7 +125,7 b' two decorators:' | |||
|
128 | 125 | In [11]: getpid() |
|
129 | 126 | Out[11]: [12345, 12346, 12347, 12348] |
|
130 | 127 | |
|
131 |
|
|
|
128 | The ``@parallel`` decorator creates parallel functions, that break up an element-wise | |
|
132 | 129 | operations and distribute them, reconstructing the result. |
|
133 | 130 | |
|
134 | 131 | .. sourcecode:: ipython |
@@ -137,13 +134,13 b' operations and distribute them, reconstructing the result.' | |||
|
137 | 134 | |
|
138 | 135 | In [13]: A = np.random.random((64,48)) |
|
139 | 136 | |
|
140 |
In [14]: @ |
|
|
137 | In [14]: @dview.parallel(block=True) | |
|
141 | 138 | ...: def pmul(A,B): |
|
142 | 139 | ...: return A*B |
|
143 | 140 | |
|
144 | 141 | In [15]: C_local = A*A |
|
145 | 142 | |
|
146 |
In [16]: C_remote |
|
|
143 | In [16]: C_remote = pmul(A,A) | |
|
147 | 144 | |
|
148 | 145 | In [17]: (C_local == C_remote).all() |
|
149 | 146 | Out[17]: True |
@@ -159,38 +156,36 b' Calling Python functions' | |||
|
159 | 156 | The most basic type of operation that can be performed on the engines is to |
|
160 | 157 | execute Python code or call Python functions. Executing Python code can be |
|
161 | 158 | done in blocking or non-blocking mode (non-blocking is default) using the |
|
162 | :meth:`execute` method, and calling functions can be done via the | |
|
159 | :meth:`.View.execute` method, and calling functions can be done via the | |
|
163 | 160 | :meth:`.View.apply` method. |
|
164 | 161 | |
|
165 | 162 | apply |
|
166 | 163 | ----- |
|
167 | 164 | |
|
168 | 165 | The main method for doing remote execution (in fact, all methods that |
|
169 |
communicate with the engines are built on top of it), is :meth:` |
|
|
170 | Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``, | |
|
171 | which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients` | |
|
172 | require some more options, they cannot easily provide this interface. | |
|
173 | Instead, they provide the signature: | |
|
174 | ||
|
175 | .. sourcecode:: python | |
|
166 | communicate with the engines are built on top of it), is :meth:`View.apply`. | |
|
176 | 167 | |
|
177 | c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None, | |
|
178 | after=None, follow=None, timeout=None) | |
|
168 | We strive to provide the cleanest interface we can, so `apply` has the following | |
|
169 | signature: | |
|
179 | 170 | |
|
180 | Where various behavior is controlled via keyword arguments. This means that in the client, | |
|
181 | you must pass `args` as a tuple, and `kwargs` as a dict. | |
|
171 | .. sourcecode:: python | |
|
182 | 172 | |
|
183 | In order to provide the nicer interface, we have :class:`View` classes, which wrap | |
|
184 | :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine | |
|
185 | the extra keyword arguments. This means that the views can have the desired pattern: | |
|
173 | view.apply(f, *args, **kwargs) | |
|
186 | 174 | |
|
187 | .. sourcecode:: python | |
|
175 | There are various ways to call functions with IPython, and these flags are set as | |
|
176 | attributes of the View. The ``DirectView`` has just two of these flags: | |
|
188 | 177 | |
|
189 | v.apply(f, *args, **kwargs) | |
|
178 | dv.block : bool | |
|
179 | whether to wait for the result, or return an :class:`AsyncResult` object | |
|
180 | immediately | |
|
181 | dv.track : bool | |
|
182 | whether to instruct pyzmq to track when | |
|
183 | This is primarily useful for non-copying sends of numpy arrays that you plan to | |
|
184 | edit in-place. You need to know when it becomes safe to edit the buffer | |
|
185 | without corrupting the message. | |
|
190 | 186 | |
|
191 | 187 | |
|
192 | For instance, performing index-access on a client creates a | |
|
193 | :class:`.DirectView`. | |
|
188 | Creating a view is simple: index-access on a client creates a :class:`.DirectView`. | |
|
194 | 189 | |
|
195 | 190 | .. sourcecode:: ipython |
|
196 | 191 | |
@@ -198,23 +193,9 b' For instance, performing index-access on a client creates a' | |||
|
198 | 193 | Out[4]: <DirectView [1, 2]> |
|
199 | 194 | |
|
200 | 195 | In [5]: view.apply<tab> |
|
201 |
view.apply view.apply_async view.apply_ |
|
|
202 | ||
|
203 | A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound` | |
|
204 | and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x` | |
|
205 | methods allow specifying `bound` and `block` via the different methods. | |
|
196 | view.apply view.apply_async view.apply_sync view.apply_with_flags | |
|
206 | 197 | |
|
207 | ================== ========== ========== | |
|
208 | method block bound | |
|
209 | ================== ========== ========== | |
|
210 | apply self.block self.bound | |
|
211 | apply_sync True False | |
|
212 | apply_async False False | |
|
213 | apply_sync_bound True True | |
|
214 | apply_async_bound False True | |
|
215 | ================== ========== ========== | |
|
216 | ||
|
217 | For explanation of these values, read on. | |
|
198 | For convenience, you can set block temporarily for a single call with the extra sync/async methods. | |
|
218 | 199 | |
|
219 | 200 | Blocking execution |
|
220 | 201 | ------------------ |
@@ -232,63 +213,29 b' blocks until the engines are done executing the command:' | |||
|
232 | 213 | |
|
233 | 214 | In [5]: dview['b'] = 10 |
|
234 | 215 | |
|
235 |
In [6]: dview.apply |
|
|
216 | In [6]: dview.apply(lambda x: a+b+x, 27) | |
|
236 | 217 | Out[6]: [42, 42, 42, 42] |
|
237 | 218 | |
|
238 | Python commands can be executed on specific engines by calling execute using the ``targets`` | |
|
239 | keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by | |
|
240 | index-access to the client: | |
|
241 | ||
|
242 | .. sourcecode:: ipython | |
|
243 | ||
|
244 | In [6]: rc.execute('c=a+b', targets=[0,2]) | |
|
245 | ||
|
246 | In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3]) | |
|
247 | ||
|
248 | In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all') | |
|
249 | Out[8]: [15, -5, 15, -5] | |
|
250 | ||
|
251 | .. note:: | |
|
219 | You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync` | |
|
220 | method: | |
|
252 | 221 | |
|
253 | Note that every call to ``rc.<meth>(...,targets=x)`` can be made via | |
|
254 | ``rc[<x>].<meth>(...)``, which constructs a View object. The only place | |
|
255 | where this differs in in :meth:`apply`. The :class:`Client` takes many | |
|
256 | arguments to apply, so it requires `args` and `kwargs` to be passed as | |
|
257 | individual arguments. Extended options such as `bound`,`targets`, and | |
|
258 | `block` are controlled by the attributes of the :class:`View` objects, so | |
|
259 | they can provide the much more convenient | |
|
260 | :meth:`View.apply(f,*args,**kwargs)`, which simply calls | |
|
261 | ``f(*args,**kwargs)`` remotely. | |
|
222 | In [7]: dview.block=False | |
|
262 | 223 | |
|
263 | Bound and unbound execution | |
|
264 | --------------------------- | |
|
224 | In [8]: dview.apply_sync(lambda x: a+b+x, 27) | |
|
225 | Out[8]: [42, 42, 42, 42] | |
|
265 | 226 | |
|
266 | The previous example also shows one of the most important things about the IPython | |
|
267 | engines: they have a persistent user namespaces. The :meth:`apply` method can | |
|
268 | be run in either a bound or unbound manner. | |
|
227 | Python commands can be executed as strings on specific engines by using a View's ``execute`` | |
|
228 | method: | |
|
269 | 229 | |
|
270 | When applying a function in a `bound` manner, the first argument to that function | |
|
271 | will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary | |
|
272 | also providing attribute-access to keys. | |
|
230 | .. sourcecode:: ipython | |
|
273 | 231 | |
|
274 | In all (unbound and bound) execution | |
|
232 | In [6]: rc[::2].execute('c=a+b') | |
|
275 | 233 | |
|
276 | .. sourcecode:: ipython | |
|
234 | In [7]: rc[1::2].execute('c=a-b') | |
|
277 | 235 | |
|
278 | In [9]: dview['b'] = 5 # assign b to 5 everywhere | |
|
279 | ||
|
280 | In [10]: v0 = rc[0] | |
|
281 | ||
|
282 | # multiply b*2 inplace | |
|
283 | In [12]: v0.apply_sync_bound(lambda ns: ns.b*=2) | |
|
284 | ||
|
285 | # b is still available in globals during unbound execution | |
|
286 | In [13]: v0.apply_sync(lambda a: a*b, 3) | |
|
287 | Out[13]: 30 | |
|
236 | In [8]: rc[:]['c'] # shorthand for rc[:].pull('c', block=True) | |
|
237 | Out[8]: [15, -5, 15, -5] | |
|
288 | 238 | |
|
289 | `bound=True` specifies that the engine's namespace is to be passed as the first argument when | |
|
290 | the function is called, and the default `bound=False` specifies that the normal behavior, but | |
|
291 | the engine's namespace will be available as the globals() when the function is called. | |
|
292 | 239 | |
|
293 | 240 | Non-blocking execution |
|
294 | 241 | ---------------------- |
@@ -351,22 +298,24 b' local Python/IPython session:' | |||
|
351 | 298 | .. Note:: |
|
352 | 299 | |
|
353 | 300 | Note the import inside the function. This is a common model, to ensure |
|
354 | that the appropriate modules are imported where the task is run. | |
|
301 | that the appropriate modules are imported where the task is run. You can | |
|
302 | also manually import modules into the engine(s) namespace(s) via | |
|
303 | :meth:`view.execute('import numpy')`. | |
|
355 | 304 | |
|
356 | 305 | Often, it is desirable to wait until a set of :class:`AsyncResult` objects |
|
357 |
are done. For this, there is a the method :meth:` |
|
|
306 | are done. For this, there is a the method :meth:`wait`. This method takes a | |
|
358 | 307 | tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History), |
|
359 | 308 | and blocks until all of the associated results are ready: |
|
360 | 309 | |
|
361 | 310 | .. sourcecode:: ipython |
|
362 | 311 | |
|
363 |
In [72]: |
|
|
312 | In [72]: dview.block=False | |
|
364 | 313 | |
|
365 | 314 | # A trivial list of AsyncResults objects |
|
366 | 315 | In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)] |
|
367 | 316 | |
|
368 | 317 | # Wait until all of them are done |
|
369 |
In [74]: |
|
|
318 | In [74]: dview.wait(pr_list) | |
|
370 | 319 | |
|
371 | 320 | # Then, their results are ready using get() or the `.r` attribute |
|
372 | 321 | In [75]: pr_list[0].get() |
@@ -374,12 +323,12 b' and blocks until all of the associated results are ready:' | |||
|
374 | 323 | |
|
375 | 324 | |
|
376 | 325 | |
|
377 |
The ``block`` |
|
|
378 |
----------------------- |
|
|
326 | The ``block`` attribute | |
|
327 | ----------------------- | |
|
379 | 328 | |
|
380 |
M |
|
|
329 | Many View methods(excluding :meth:`apply`) accept | |
|
381 | 330 | ``block`` as a keyword argument. As we have seen above, these |
|
382 |
keyword arguments control the blocking mode. The :class:` |
|
|
331 | keyword arguments control the blocking mode. The :class:`View` class also has | |
|
383 | 332 | a :attr:`block` attribute that controls the default behavior when the keyword |
|
384 | 333 | argument is not provided. Thus the following logic is used for :attr:`block`: |
|
385 | 334 | |
@@ -387,37 +336,33 b' argument is not provided. Thus the following logic is used for :attr:`block`:' | |||
|
387 | 336 | * Keyword argument, if provided override the instance attributes for |
|
388 | 337 | the duration of a single call. |
|
389 | 338 | |
|
390 | DirectView objects also have a ``bound`` attribute, which is used in the same way. | |
|
391 | ||
|
392 | 339 | The following examples demonstrate how to use the instance attributes: |
|
393 | 340 | |
|
394 | 341 | .. sourcecode:: ipython |
|
395 | 342 | |
|
396 |
In [17]: |
|
|
343 | In [17]: dview.block = False | |
|
397 | 344 | |
|
398 |
In [18]: ar = |
|
|
345 | In [18]: ar = dview.apply(lambda : 10) | |
|
399 | 346 | |
|
400 | 347 | In [19]: ar.get() |
|
401 | Out[19]: [10,10] | |
|
348 | Out[19]: [10, 10, 10, 10] | |
|
402 | 349 | |
|
403 |
In [21]: |
|
|
350 | In [21]: dview.block = True | |
|
404 | 351 | |
|
405 | 352 | # Note targets='all' means all engines |
|
406 |
In [22]: |
|
|
353 | In [22]: dview.apply(lambda : 42) | |
|
407 | 354 | Out[22]: [42, 42, 42, 42] |
|
408 | 355 | |
|
409 |
The :attr:`block` |
|
|
356 | The :attr:`block` and :attr:`targets` instance attributes of the | |
|
410 | 357 | :class:`.DirectView` also determine the behavior of the parallel magic commands. |
|
411 | 358 | |
|
412 | ||
|
413 | 359 | Parallel magic commands |
|
414 | 360 | ----------------------- |
|
415 | 361 | |
|
416 | 362 | .. warning:: |
|
417 | 363 | |
|
418 |
The magics have not been changed to work with the zeromq system. |
|
|
419 | and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do | |
|
420 | not* print stdin/out. | |
|
364 | The magics have not been changed to work with the zeromq system. The | |
|
365 | magics do work, but *do not* print stdin/out like they used to in IPython.kernel. | |
|
421 | 366 | |
|
422 | 367 | We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) |
|
423 | 368 | that make it more pleasant to execute Python commands on the engines |
@@ -428,6 +373,9 b' Python command on the engines specified by the :attr:`targets` attribute of the' | |||
|
428 | 373 | |
|
429 | 374 | .. sourcecode:: ipython |
|
430 | 375 | |
|
376 | # load the parallel magic extension: | |
|
377 | In [21]: %load_ext parallelmagic | |
|
378 | ||
|
431 | 379 | # Create a DirectView for all targets |
|
432 | 380 | In [22]: dv = rc[:] |
|
433 | 381 | |
@@ -512,7 +460,7 b' Moving Python objects around' | |||
|
512 | 460 | In addition to calling functions and executing code on engines, you can |
|
513 | 461 | transfer Python objects to and from your IPython session and the engines. In |
|
514 | 462 | IPython, these operations are called :meth:`push` (sending an object to the |
|
515 |
engines) and :meth:`pull` (getting an object from the engines). |
|
|
463 | engines) and :meth:`pull` (getting an object from the engines). | |
|
516 | 464 | |
|
517 | 465 | Basic push and pull |
|
518 | 466 | ------------------- |
@@ -521,23 +469,19 b' Here are some examples of how you use :meth:`push` and :meth:`pull`:' | |||
|
521 | 469 | |
|
522 | 470 | .. sourcecode:: ipython |
|
523 | 471 | |
|
524 |
In [38]: |
|
|
472 | In [38]: dview.push(dict(a=1.03234,b=3453)) | |
|
525 | 473 | Out[38]: [None,None,None,None] |
|
526 | 474 | |
|
527 |
In [39]: |
|
|
475 | In [39]: dview.pull('a') | |
|
528 | 476 | Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234] |
|
529 | 477 | |
|
530 |
In [40]: rc.pull('b' |
|
|
478 | In [40]: rc[0].pull('b') | |
|
531 | 479 | Out[40]: 3453 |
|
532 | 480 | |
|
533 |
In [41]: |
|
|
481 | In [41]: dview.pull(('a','b')) | |
|
534 | 482 | Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ] |
|
535 | 483 | |
|
536 | # zmq client does not have zip_pull | |
|
537 | In [42]: rc.zip_pull(('a','b')) | |
|
538 | Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)] | |
|
539 | ||
|
540 | In [43]: rc.push(dict(c='speed')) | |
|
484 | In [43]: dview.push(dict(c='speed')) | |
|
541 | 485 | Out[43]: [None,None,None,None] |
|
542 | 486 | |
|
543 | 487 | In non-blocking mode :meth:`push` and :meth:`pull` also return |
@@ -545,9 +489,7 b' In non-blocking mode :meth:`push` and :meth:`pull` also return' | |||
|
545 | 489 | |
|
546 | 490 | .. sourcecode:: ipython |
|
547 | 491 | |
|
548 |
In [4 |
|
|
549 | ||
|
550 | In [48]: ar = rc.pull('a') | |
|
492 | In [48]: ar = dview.pull('a', block=False) | |
|
551 | 493 | |
|
552 | 494 | In [49]: ar.get() |
|
553 | 495 | Out[49]: [1.03234, 1.03234, 1.03234, 1.03234] |
@@ -563,8 +505,6 b' appear as a local dictionary. Underneath, these methods call :meth:`apply`:' | |||
|
563 | 505 | |
|
564 | 506 | .. sourcecode:: ipython |
|
565 | 507 | |
|
566 | In [50]: dview.block=True | |
|
567 | ||
|
568 | 508 | In [51]: dview['a']=['foo','bar'] |
|
569 | 509 | |
|
570 | 510 | In [52]: dview['a'] |
@@ -606,7 +546,7 b' basic effect using :meth:`scatter` and :meth:`gather`:' | |||
|
606 | 546 | |
|
607 | 547 | In [66]: dview.scatter('x',range(64)) |
|
608 | 548 | |
|
609 | In [67]: px y = [i**10 for i in x] | |
|
549 | In [67]: %px y = [i**10 for i in x] | |
|
610 | 550 | Parallel execution on engines: [0, 1, 2, 3] |
|
611 | 551 | Out[67]: |
|
612 | 552 | |
@@ -633,31 +573,47 b' more other types of exceptions. Here is how it works:' | |||
|
633 | 573 | In [77]: dview.execute('1/0') |
|
634 | 574 | --------------------------------------------------------------------------- |
|
635 | 575 | CompositeError Traceback (most recent call last) |
|
636 |
/ |
|
|
637 | ----> 1 dview.execute('1/0') | |
|
638 | ||
|
639 | ... | |
|
640 | ||
|
641 | /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout) | |
|
642 | 1012 raise ValueError(msg) | |
|
643 | 1013 else: | |
|
644 | -> 1014 return self._apply_direct(f, args, kwargs, **options) | |
|
645 | 1015 | |
|
646 | 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None, | |
|
647 | ||
|
648 | /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets) | |
|
649 | 1100 if block: | |
|
650 | 1101 try: | |
|
651 | -> 1102 return ar.get() | |
|
652 | 1103 except KeyboardInterrupt: | |
|
653 | 1104 return ar | |
|
576 | /home/you/<ipython-input-10-15c2c22dec39> in <module>() | |
|
577 | ----> 1 dview.execute('1/0', block=True) | |
|
578 | ||
|
579 | /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block) | |
|
580 | 460 default: self.block | |
|
581 | 461 """ | |
|
582 | --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block) | |
|
583 | 463 | |
|
584 | 464 def run(self, filename, block=None): | |
|
585 | ||
|
586 | /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track) | |
|
587 | ||
|
588 | /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs) | |
|
589 | 46 def sync_results(f, self, *args, **kwargs): | |
|
590 | 47 """sync relevant results from self.client to our results attribute.""" | |
|
591 | ---> 48 ret = f(self, *args, **kwargs) | |
|
592 | 49 delta = self.outstanding.difference(self.client.outstanding) | |
|
593 | 50 completed = self.outstanding.intersection(delta) | |
|
594 | ||
|
595 | /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track) | |
|
596 | ||
|
597 | /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs) | |
|
598 | 35 n_previous = len(self.client.history) | |
|
599 | 36 try: | |
|
600 | ---> 37 ret = f(self, *args, **kwargs) | |
|
601 | 38 finally: | |
|
602 | 39 nmsgs = len(self.client.history) - n_previous | |
|
603 | ||
|
604 | /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track) | |
|
605 | 398 if block: | |
|
606 | 399 try: | |
|
607 | --> 400 return ar.get() | |
|
608 | 401 except KeyboardInterrupt: | |
|
609 | 402 pass | |
|
654 | 610 | |
|
655 |
/ |
|
|
656 |
|
|
|
657 |
|
|
|
658 |
---> 8 |
|
|
659 |
|
|
|
660 |
|
|
|
611 | /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout) | |
|
612 | 87 return self._result | |
|
613 | 88 else: | |
|
614 | ---> 89 raise self._exception | |
|
615 | 90 else: | |
|
616 | 91 raise error.TimeoutError("Result not ready.") | |
|
661 | 617 | |
|
662 | 618 | CompositeError: one or more exceptions from call to method: _execute |
|
663 | 619 | [0:apply]: ZeroDivisionError: integer division or modulo by zero |
@@ -665,6 +621,7 b' more other types of exceptions. Here is how it works:' | |||
|
665 | 621 | [2:apply]: ZeroDivisionError: integer division or modulo by zero |
|
666 | 622 | [3:apply]: ZeroDivisionError: integer division or modulo by zero |
|
667 | 623 | |
|
624 | ||
|
668 | 625 | Notice how the error message printed when :exc:`CompositeError` is raised has |
|
669 | 626 | information about the individual exceptions that were raised on each engine. |
|
670 | 627 | If you want, you can even raise one of these original exceptions: |
@@ -672,7 +629,7 b' If you want, you can even raise one of these original exceptions:' | |||
|
672 | 629 | .. sourcecode:: ipython |
|
673 | 630 | |
|
674 | 631 | In [80]: try: |
|
675 |
....: |
|
|
632 | ....: dview.execute('1/0') | |
|
676 | 633 | ....: except client.CompositeError, e: |
|
677 | 634 | ....: e.raise_exception() |
|
678 | 635 | ....: |
@@ -697,57 +654,50 b' instance:' | |||
|
697 | 654 | |
|
698 | 655 | .. sourcecode:: ipython |
|
699 | 656 | |
|
700 |
In [81]: |
|
|
657 | In [81]: dview.execute('1/0') | |
|
701 | 658 | --------------------------------------------------------------------------- |
|
702 | 659 | CompositeError Traceback (most recent call last) |
|
703 |
/ |
|
|
704 |
----> 1 |
|
|
705 | ||
|
706 | /Users/minrk/<string> in execute(self, code, targets, block) | |
|
707 | ||
|
708 | /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs) | |
|
709 | 88 self.block = block | |
|
710 | 89 try: | |
|
711 | ---> 90 ret = f(self, *args, **kwargs) | |
|
712 | 91 finally: | |
|
713 | 92 self.block = saveblock | |
|
714 | ||
|
715 | /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in execute(self, code, targets, block) | |
|
716 | 855 default: self.block | |
|
717 | 856 """ | |
|
718 | --> 857 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False) | |
|
719 | 858 if not block: | |
|
720 | 859 return result | |
|
721 | ||
|
722 |
/ |
|
|
723 | ||
|
724 |
/ |
|
|
725 | 88 self.block = block | |
|
726 |
|
|
|
727 |
---> |
|
|
728 |
|
|
|
729 | 92 self.block = saveblock | |
|
730 | ||
|
731 |
/ |
|
|
732 | 1012 raise ValueError(msg) | |
|
733 |
|
|
|
734 | -> 1014 return self._apply_direct(f, args, kwargs, **options) | |
|
735 | 1015 | |
|
736 | 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None, | |
|
737 | ||
|
738 | /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets) | |
|
739 | 1100 if block: | |
|
740 | 1101 try: | |
|
741 | -> 1102 return ar.get() | |
|
742 | 1103 except KeyboardInterrupt: | |
|
743 | 1104 return ar | |
|
660 | /home/you/<ipython-input-10-15c2c22dec39> in <module>() | |
|
661 | ----> 1 dview.execute('1/0', block=True) | |
|
662 | ||
|
663 | /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block) | |
|
664 | 460 default: self.block | |
|
665 | 461 """ | |
|
666 | --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block) | |
|
667 | 463 | |
|
668 | 464 def run(self, filename, block=None): | |
|
669 | ||
|
670 | /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track) | |
|
671 | ||
|
672 | /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs) | |
|
673 | 46 def sync_results(f, self, *args, **kwargs): | |
|
674 | 47 """sync relevant results from self.client to our results attribute.""" | |
|
675 | ---> 48 ret = f(self, *args, **kwargs) | |
|
676 | 49 delta = self.outstanding.difference(self.client.outstanding) | |
|
677 | 50 completed = self.outstanding.intersection(delta) | |
|
678 | ||
|
679 | /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track) | |
|
680 | ||
|
681 | /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs) | |
|
682 | 35 n_previous = len(self.client.history) | |
|
683 | 36 try: | |
|
684 | ---> 37 ret = f(self, *args, **kwargs) | |
|
685 | 38 finally: | |
|
686 | 39 nmsgs = len(self.client.history) - n_previous | |
|
687 | ||
|
688 | /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track) | |
|
689 | 398 if block: | |
|
690 | 399 try: | |
|
691 | --> 400 return ar.get() | |
|
692 | 401 except KeyboardInterrupt: | |
|
693 | 402 pass | |
|
744 | 694 | |
|
745 |
/ |
|
|
746 |
|
|
|
747 |
|
|
|
748 |
---> 8 |
|
|
749 |
|
|
|
750 |
|
|
|
695 | /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout) | |
|
696 | 87 return self._result | |
|
697 | 88 else: | |
|
698 | ---> 89 raise self._exception | |
|
699 | 90 else: | |
|
700 | 91 raise error.TimeoutError("Result not ready.") | |
|
751 | 701 | |
|
752 | 702 | CompositeError: one or more exceptions from call to method: _execute |
|
753 | 703 | [0:apply]: ZeroDivisionError: integer division or modulo by zero |
@@ -815,14 +765,18 b' instance:' | |||
|
815 | 765 | ZeroDivisionError: integer division or modulo by zero |
|
816 | 766 | |
|
817 | 767 | |
|
768 | .. note:: | |
|
769 | ||
|
770 | TODO: The above tracebacks are not up to date | |
|
771 | ||
|
818 | 772 | |
|
819 | 773 | All of this same error handling magic even works in non-blocking mode: |
|
820 | 774 | |
|
821 | 775 | .. sourcecode:: ipython |
|
822 | 776 | |
|
823 |
In [83]: |
|
|
777 | In [83]: dview.block=False | |
|
824 | 778 | |
|
825 |
In [84]: ar = |
|
|
779 | In [84]: ar = dview.execute('1/0') | |
|
826 | 780 | |
|
827 | 781 | In [85]: ar.get() |
|
828 | 782 | --------------------------------------------------------------------------- |
@@ -1,4 +1,4 b'' | |||
|
1 | .. _paralleltask: | |
|
1 | .. _parallel_task: | |
|
2 | 2 | |
|
3 | 3 | ========================== |
|
4 | 4 | The IPython task interface |
@@ -54,11 +54,12 b' argument to the constructor:' | |||
|
54 | 54 | # or to connect with a specific profile you have set up: |
|
55 | 55 | In [3]: rc = client.Client(profile='mpi') |
|
56 | 56 | |
|
57 |
For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can |
|
|
57 | For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can | |
|
58 | be constructed via the client's :meth:`load_balanced_view` method: | |
|
58 | 59 | |
|
59 | 60 | .. sourcecode:: ipython |
|
60 | 61 | |
|
61 | In [4]: lview = rc.view() # default load-balanced view | |
|
62 | In [4]: lview = rc.load_balanced_view() # default load-balanced view | |
|
62 | 63 | |
|
63 | 64 | .. seealso:: |
|
64 | 65 | |
@@ -110,6 +111,8 b' that turns any Python function into a parallel function:' | |||
|
110 | 111 | In [11]: f.map(range(32)) # this is done in parallel |
|
111 | 112 | Out[11]: [0.0,10.0,160.0,...] |
|
112 | 113 | |
|
114 | .. _parallel_dependencies: | |
|
115 | ||
|
113 | 116 | Dependencies |
|
114 | 117 | ============ |
|
115 | 118 | |
@@ -230,12 +233,18 b' any|all' | |||
|
230 | 233 | only after *all* of them have finished. This is set by a Dependency's :attr:`all` |
|
231 | 234 | boolean attribute, which defaults to ``True``. |
|
232 | 235 | |
|
233 | success_only | |
|
234 |
Whether to consider |
|
|
235 | Sometimes you want to run a task after another, but only if that task succeeded. In | |
|
236 | this case, ``success_only`` should be ``True``. However sometimes you may not care | |
|
237 | whether the task succeeds, and always want the second task to run, in which case | |
|
238 | you should use `success_only=False`. The default behavior is to only use successes. | |
|
236 | success [default: True] | |
|
237 | Whether to consider tasks that succeeded as fulfilling dependencies. | |
|
238 | ||
|
239 | failure [default : False] | |
|
240 | Whether to consider tasks that failed as fulfilling dependencies. | |
|
241 | using `failure=True,success=False` is useful for setting up cleanup tasks, to be run | |
|
242 | only when tasks have failed. | |
|
243 | ||
|
244 | Sometimes you want to run a task after another, but only if that task succeeded. In this case, | |
|
245 | ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may | |
|
246 | not care whether the task succeeds, and always want the second task to run, in which case you | |
|
247 | should use `success=failure=True`. The default behavior is to only use successes. | |
|
239 | 248 | |
|
240 | 249 | There are other switches for interpretation that are made at the *task* level. These are |
|
241 | 250 | specified via keyword arguments to the client's :meth:`apply` method. |
@@ -258,7 +267,7 b' timeout' | |||
|
258 | 267 | Dependencies only work within the task scheduler. You cannot instruct a load-balanced |
|
259 | 268 | task to run after a job submitted via the MUX interface. |
|
260 | 269 | |
|
261 |
The simplest form of Dependencies is with `all=True,success |
|
|
270 | The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases, | |
|
262 | 271 | you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the |
|
263 | 272 | `follow` and `after` keywords to :meth:`client.apply`: |
|
264 | 273 | |
@@ -266,13 +275,13 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje' | |||
|
266 | 275 | |
|
267 | 276 | In [14]: client.block=False |
|
268 | 277 | |
|
269 |
In [15]: ar = |
|
|
278 | In [15]: ar = lview.apply(f, args, kwargs) | |
|
270 | 279 | |
|
271 |
In [16]: ar2 = |
|
|
280 | In [16]: ar2 = lview.apply(f2) | |
|
272 | 281 | |
|
273 |
In [17]: ar3 = |
|
|
282 | In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2]) | |
|
274 | 283 | |
|
275 |
In [17]: ar4 = |
|
|
284 | In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5) | |
|
276 | 285 | |
|
277 | 286 | |
|
278 | 287 | .. seealso:: |
@@ -297,8 +306,8 b' The basic cases that are checked:' | |||
|
297 | 306 | |
|
298 | 307 | * depending on nonexistent messages |
|
299 | 308 | * `follow` dependencies were run on more than one machine and `all=True` |
|
300 |
* any dependencies failed and `all=True,success |
|
|
301 |
* all dependencies failed and `all=False,success |
|
|
309 | * any dependencies failed and `all=True,success=True,failures=False` | |
|
310 | * all dependencies failed and `all=False,success=True,failure=False` | |
|
302 | 311 | |
|
303 | 312 | .. warning:: |
|
304 | 313 | |
@@ -386,27 +395,25 b' Disabled features when using the ZMQ Scheduler:' | |||
|
386 | 395 | More details |
|
387 | 396 | ============ |
|
388 | 397 | |
|
389 |
The :class:` |
|
|
398 | The :class:`LoadBalancedView` has many more powerful features that allow quite a bit | |
|
390 | 399 | of flexibility in how tasks are defined and run. The next places to look are |
|
391 | 400 | in the following classes: |
|
392 | 401 | |
|
393 |
* :class:`IPython.zmq.parallel. |
|
|
402 | * :class:`IPython.zmq.parallel.view.LoadBalancedView` | |
|
394 | 403 | * :class:`IPython.zmq.parallel.client.AsyncResult` |
|
395 |
* :meth:`IPython.zmq.parallel. |
|
|
404 | * :meth:`IPython.zmq.parallel.view.LoadBalancedView.apply` | |
|
396 | 405 | * :mod:`IPython.zmq.parallel.dependency` |
|
397 | 406 | |
|
398 | 407 | The following is an overview of how to use these classes together: |
|
399 | 408 | |
|
400 |
1. Create a :class:`Client` |
|
|
409 | 1. Create a :class:`Client` and :class:`LoadBalancedView` | |
|
401 | 410 | 2. Define some functions to be run as tasks |
|
402 | 411 | 3. Submit your tasks to using the :meth:`apply` method of your |
|
403 | :class:`Client` instance, specifying `balanced=True`. This signals | |
|
404 | the :class:`Client` to entrust the Scheduler with assigning tasks to engines. | |
|
405 | 4. Use :meth:`Client.get_results` to get the results of the | |
|
412 | :class:`LoadBalancedView` instance. | |
|
413 | 4. Use :meth:`Client.get_result` to get the results of the | |
|
406 | 414 | tasks, or use the :meth:`AsyncResult.get` method of the results to wait |
|
407 | 415 | for and then receive the results. |
|
408 | 416 | |
|
409 | ||
|
410 | 417 | .. seealso:: |
|
411 | 418 | |
|
412 | 419 | A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython. |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now