##// END OF EJS Templates
update API after sagedays29...
MinRK -
Show More
@@ -0,0 +1,69 b''
1 """Tests for asyncresult.py"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14
15 from IPython.zmq.parallel.error import TimeoutError
16
17 from IPython.zmq.parallel.tests import add_engines
18 from .clienttest import ClusterTestCase
19
def setup():
    # nose module-level setup: make sure at least 2 engines are running
    # before any test in this module executes.
    add_engines(2)
22
def wait(n):
    """Sleep for *n* seconds, then return *n*.

    Used as a trivial task to submit to engines in the tests below.
    """
    from time import sleep
    sleep(n)
    return n
27
class AsyncResultTest(ClusterTestCase):
    """Tests for the AsyncResult objects returned by async client calls."""

    def test_single_result(self):
        # a scalar (single-engine) view returns a bare result...
        eid = self.client.ids[-1]
        ar = self.client[eid].apply_async(lambda : 42)
        self.assertEquals(ar.get(), 42)
        # ...while list/slice views wrap results in a list, even for one engine
        ar = self.client[[eid]].apply_async(lambda : 42)
        self.assertEquals(ar.get(), [42])
        ar = self.client[-1:].apply_async(lambda : 42)
        self.assertEquals(ar.get(), [42])

    def test_get_after_done(self):
        ar = self.client[-1].apply_async(lambda : 42)
        self.assertFalse(ar.ready())
        ar.wait()
        self.assertTrue(ar.ready())
        # get() should be idempotent once the result has arrived
        self.assertEquals(ar.get(), 42)
        self.assertEquals(ar.get(), 42)

    def test_get_before_done(self):
        ar = self.client[-1].apply_async(wait, 0.1)
        # get with timeout=0 raises TimeoutError while the task is in flight
        self.assertRaises(TimeoutError, ar.get, 0)
        ar.wait(0)
        self.assertFalse(ar.ready())
        # a blocking get eventually returns the task's value
        self.assertEquals(ar.get(), 0.1)

    def test_get_after_error(self):
        ar = self.client[-1].apply_async(lambda : 1/0)
        ar.wait()
        # remote errors are re-raised on every get, not just the first
        self.assertRaisesRemote(ZeroDivisionError, ar.get)
        self.assertRaisesRemote(ZeroDivisionError, ar.get)
        self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)

    def test_get_dict(self):
        n = len(self.client)
        ar = self.client[:].apply_async(lambda : 5)
        self.assertEquals(ar.get(), [5]*n)
        # get_dict() keys each engine's result by its engine id
        d = ar.get_dict()
        self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
        for eid,r in d.iteritems():
            self.assertEquals(r, 5)
69
@@ -0,0 +1,101 b''
1 """Tests for dependency.py"""
2
3 __docformat__ = "restructuredtext en"
4
5 #-------------------------------------------------------------------------------
6 # Copyright (C) 2011 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
10 #-------------------------------------------------------------------------------
11
12 #-------------------------------------------------------------------------------
13 # Imports
14 #-------------------------------------------------------------------------------
15
16 # import
17 import os
18
19 from IPython.utils.pickleutil import can, uncan
20
21 from IPython.zmq.parallel import dependency as dmod
22 from IPython.zmq.parallel.util import interactive
23
24 from IPython.zmq.parallel.tests import add_engines
25 from .clienttest import ClusterTestCase
26
def setup():
    # nose module-level setup: make sure at least one engine is available.
    add_engines(1)
29
@dmod.require('time')
def wait(n):
    # `time` is not imported here; the @require decorator is expected to
    # make the module available in the function's namespace when it runs.
    time.sleep(n)
    return n
34
# sample msg_id lists (string ids, like real msg_ids): 'mixed' contains both
# even ids (in the succeeded set built in DependencyTest.setUp) and odd ids
# (in the failed set); 'completed' is evens only, 'failed' is odds only.
mixed = map(str, range(10))
completed = map(str, range(0,10,2))
failed = map(str, range(1,10,2))
38
class DependencyTest(ClusterTestCase):
    """Tests for Dependency objects checked against sets of finished msg_ids."""

    def setUp(self):
        ClusterTestCase.setUp(self)
        self.user_ns = {'__builtins__' : __builtins__}
        self.view = self.client.load_balanced_view()
        self.dview = self.client[-1]
        # even string ids succeeded, odd string ids failed
        self.succeeded = set(map(str, range(0,25,2)))
        self.failed = set(map(str, range(1,25,2)))

    def assertMet(self, dep):
        self.assertTrue(dep.check(self.succeeded, self.failed), "Dependency should be met")

    def assertUnmet(self, dep):
        self.assertFalse(dep.check(self.succeeded, self.failed), "Dependency should not be met")

    def assertUnreachable(self, dep):
        self.assertTrue(dep.unreachable(self.succeeded, self.failed), "Dependency should be unreachable")

    def assertReachable(self, dep):
        self.assertFalse(dep.unreachable(self.succeeded, self.failed), "Dependency should be reachable")

    def cancan(self, f):
        """decorator to pass through canning into self.user_ns"""
        return uncan(can(f), self.user_ns)

    def test_require_imports(self):
        """test that @require imports names"""
        @self.cancan
        @dmod.require('urllib')
        @interactive
        def encode(dikt):
            return urllib.urlencode(dikt)
        # must pass through canning to properly connect namespaces
        self.assertEquals(encode(dict(a=5)), 'a=5')

    def test_success_only(self):
        # success-only dependency on a mix of succeeded and failed ids:
        # with all=True it is unmet, and unreachable (some ids already failed)
        dep = dmod.Dependency(mixed, success=True, failure=False)
        self.assertUnmet(dep)
        self.assertUnreachable(dep)
        # with all=False, any single success is enough
        dep.all=False
        self.assertMet(dep)
        self.assertReachable(dep)
        # all of 'completed' succeeded, so the dependency is met either way
        dep = dmod.Dependency(completed, success=True, failure=False)
        self.assertMet(dep)
        self.assertReachable(dep)
        dep.all=False
        self.assertMet(dep)
        self.assertReachable(dep)

    def test_failure_only(self):
        # failure-only dependency: satisfied by failed ids, not successes
        dep = dmod.Dependency(mixed, success=False, failure=True)
        self.assertUnmet(dep)
        self.assertUnreachable(dep)
        dep.all=False
        self.assertMet(dep)
        self.assertReachable(dep)
        # every id in 'completed' succeeded, so a failure-only dependency
        # can never be met, regardless of the all flag
        dep = dmod.Dependency(completed, success=False, failure=True)
        self.assertUnmet(dep)
        self.assertUnreachable(dep)
        dep.all=False
        self.assertUnmet(dep)
        self.assertUnreachable(dep)
@@ -0,0 +1,287 b''
1 """test View objects"""
2 #-------------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-------------------------------------------------------------------------------
8
9 #-------------------------------------------------------------------------------
10 # Imports
11 #-------------------------------------------------------------------------------
12
13 import time
14 from tempfile import mktemp
15
16 import zmq
17
18 from IPython.zmq.parallel import client as clientmod
19 from IPython.zmq.parallel import error
20 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult, AsyncMapResult
21 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
22 from IPython.zmq.parallel.util import interactive
23
24 from IPython.zmq.parallel.tests import add_engines
25
26 from .clienttest import ClusterTestCase, segfault, wait, skip_without
27
def setup():
    # nose module-level setup: make sure at least 3 engines are running
    # (the scatter/gather and multi-engine tests below need several).
    add_engines(3)
30
class TestView(ClusterTestCase):
    """Integration tests for DirectView / LoadBalancedView behavior."""

    def test_segfault_task(self):
        """test graceful handling of engine death (balanced)"""
        # self.add_engines(1)
        ar = self.client[-1].apply_async(segfault)
        self.assertRaisesRemote(error.EngineError, ar.get)
        eid = ar.engine_id
        # spin until the hub notices the dead engine and drops its id
        while eid in self.client.ids:
            time.sleep(.01)
            self.client.spin()

    def test_segfault_mux(self):
        """test graceful handling of engine death (direct)"""
        # self.add_engines(1)
        eid = self.client.ids[-1]
        ar = self.client[eid].apply_async(segfault)
        self.assertRaisesRemote(error.EngineError, ar.get)
        eid = ar.engine_id
        while eid in self.client.ids:
            time.sleep(.01)
            self.client.spin()

    def test_push_pull(self):
        """test pushing and pulling"""
        data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
        t = self.client.ids[-1]
        v = self.client[t]
        push = v.push
        pull = v.pull
        v.block=True
        nengines = len(self.client)
        # single-engine round-trip
        push({'data':data})
        d = pull('data')
        self.assertEquals(d, data)
        # all-engines pull returns one copy of the value per engine
        self.client[:].push({'data':data})
        d = self.client[:].pull('data', block=True)
        self.assertEquals(d, nengines*[data])
        # non-blocking variants return AsyncResults
        ar = push({'data':data}, block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        r = ar.get()
        ar = self.client[:].pull('data', block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        r = ar.get()
        self.assertEquals(r, nengines*[data])
        # pulling a tuple of keys yields a list of values per engine
        self.client[:].push(dict(a=10,b=20))
        r = self.client[:].pull(('a','b'))
        self.assertEquals(r, nengines*[[10,20]])

    def test_push_pull_function(self):
        "test pushing and pulling functions"
        def testf(x):
            return 2.0*x

        t = self.client.ids[-1]
        self.client[t].block=True
        push = self.client[t].push
        pull = self.client[t].pull
        execute = self.client[t].execute
        push({'testf':testf})
        r = pull('testf')
        self.assertEqual(r(1.0), testf(1.0))
        # a pushed function should be callable from remotely executed code
        execute('r = testf(10)')
        r = pull('r')
        self.assertEquals(r, testf(10))
        ar = self.client[:].push({'testf':testf}, block=False)
        ar.get()
        ar = self.client[:].pull('testf', block=False)
        rlist = ar.get()
        for r in rlist:
            self.assertEqual(r(1.0), testf(1.0))
        execute("def g(x): return x*x")
        r = pull(('testf','g'))
        self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))

    def test_push_function_globals(self):
        """test that pushed functions have access to globals"""
        @interactive
        def geta():
            return a
        # self.add_engines(1)
        v = self.client[-1]
        v.block=True
        v['f'] = geta
        # 'a' is not yet defined on the engine, so calling f must fail
        self.assertRaisesRemote(NameError, v.execute, 'b=f()')
        v.execute('a=5')
        v.execute('b=f()')
        self.assertEquals(v['b'], 5)

    def test_push_function_defaults(self):
        """test that pushed functions preserve default args"""
        def echo(a=10):
            return a
        v = self.client[-1]
        v.block=True
        v['f'] = echo
        v.execute('b=f()')
        self.assertEquals(v['b'], 10)

    def test_get_result(self):
        """test getting results from the Hub."""
        c = clientmod.Client(profile='iptest')
        # self.add_engines(1)
        t = c.ids[-1]
        v = c[t]
        v2 = self.client[t]
        ar = v.apply_async(wait, 1)
        # give the monitor time to notice the message
        time.sleep(.25)
        # a client that did not submit the task gets an AsyncHubResult
        ahr = v2.get_result(ar.msg_ids)
        self.assertTrue(isinstance(ahr, AsyncHubResult))
        self.assertEquals(ahr.get(), ar.get())
        # the second request should find the result cached locally
        ar2 = v2.get_result(ar.msg_ids)
        self.assertFalse(isinstance(ar2, AsyncHubResult))
        c.spin()
        c.close()

    def test_run_newline(self):
        """test that run appends newline to files"""
        # note: the file deliberately ends without a trailing newline
        tmpfile = mktemp()
        with open(tmpfile, 'w') as f:
            f.write("""def g():
                return 5
                """)
        v = self.client[-1]
        v.run(tmpfile, block=True)
        self.assertEquals(v.apply_sync(lambda f: f(), clientmod.Reference('g')), 5)

    def test_apply_tracked(self):
        """test tracking for apply"""
        # self.add_engines(1)
        t = self.client.ids[-1]
        v = self.client[t]
        v.block=False
        def echo(n=1024*1024, **kwargs):
            with v.temp_flags(**kwargs):
                return v.apply(lambda x: x, 'x'*n)
        # untracked sends still carry a (finished) MessageTracker
        ar = echo(1, track=False)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertTrue(ar.sent)
        ar = echo(track=True)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertEquals(ar.sent, ar._tracker.done)
        ar._tracker.wait()
        self.assertTrue(ar.sent)

    def test_push_tracked(self):
        # same tracking contract as apply, but for push
        t = self.client.ids[-1]
        ns = dict(x='x'*1024*1024)
        v = self.client[t]
        ar = v.push(ns, block=False, track=False)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertTrue(ar.sent)

        ar = v.push(ns, block=False, track=True)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertEquals(ar.sent, ar._tracker.done)
        ar._tracker.wait()
        self.assertTrue(ar.sent)
        ar.get()

    def test_scatter_tracked(self):
        # same tracking contract, for scatter across all engines
        t = self.client.ids
        x='x'*1024*1024
        ar = self.client[t].scatter('x', x, block=False, track=False)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertTrue(ar.sent)

        ar = self.client[t].scatter('x', x, block=False, track=True)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertEquals(ar.sent, ar._tracker.done)
        ar._tracker.wait()
        self.assertTrue(ar.sent)
        ar.get()

    def test_remote_reference(self):
        # a Reference is resolved to the remote name's value at call time
        v = self.client[-1]
        v['a'] = 123
        ra = clientmod.Reference('a')
        b = v.apply_sync(lambda x: x, ra)
        self.assertEquals(b, 123)


    def test_scatter_gather(self):
        view = self.client[:]
        seq1 = range(16)
        view.scatter('a', seq1)
        seq2 = view.gather('a', block=True)
        self.assertEquals(seq2, seq1)
        # gathering an undefined name raises NameError remotely
        self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)

    @skip_without('numpy')
    def test_scatter_gather_numpy(self):
        import numpy
        from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
        view = self.client[:]
        a = numpy.arange(64)
        view.scatter('a', a)
        b = view.gather('a', block=True)
        assert_array_equal(b, a)

    def test_map(self):
        view = self.client[:]
        def f(x):
            return x**2
        data = range(16)
        r = view.map_sync(f, data)
        self.assertEquals(r, map(f, data))

    def test_scatterGatherNonblocking(self):
        data = range(16)
        view = self.client[:]
        view.scatter('a', data, block=False)
        ar = view.gather('a', block=False)
        self.assertEquals(ar.get(), data)

    @skip_without('numpy')
    def test_scatter_gather_numpy_nonblocking(self):
        import numpy
        from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
        a = numpy.arange(64)
        view = self.client[:]
        ar = view.scatter('a', a, block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        amr = view.gather('a', block=False)
        self.assertTrue(isinstance(amr, AsyncMapResult))
        assert_array_equal(amr.get(), a)

    def test_execute(self):
        view = self.client[:]
        # self.client.debug=True
        execute = view.execute
        ar = execute('c=30', block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        ar = execute('d=[0,1,2]', block=False)
        self.client.wait(ar, 1)
        # one result per engine
        self.assertEquals(len(ar.get()), len(self.client))
        for c in view['c']:
            self.assertEquals(c, 30)

    def test_abort(self):
        view = self.client[-1]
        # keep the engine busy so the next two tasks are still queued
        ar = view.execute('import time; time.sleep(0.25)', block=False)
        ar2 = view.apply_async(lambda : 2)
        ar3 = view.apply_async(lambda : 3)
        # abort accepts either AsyncResults or raw msg_id lists
        view.abort(ar2)
        view.abort(ar3.msg_ids)
        self.assertRaises(error.TaskAborted, ar2.get)
        self.assertRaises(error.TaskAborted, ar3.get)

    def test_temp_flags(self):
        view = self.client[-1]
        view.block=True
        # temp_flags must restore the previous flag values on exit
        with view.temp_flags(block=False):
            self.assertFalse(view.block)
        self.assertTrue(view.block)
287
@@ -0,0 +1,97 b''
1 """
2 An exceptionally lousy site spider
3 Ken Kinder <ken@kenkinder.com>
4
5 Updated for newparallel by Min Ragan-Kelley <benjaminrk@gmail.com>
6
7 This module gives an example of how the task interface to the
8 IPython controller works. Before running this script start the IPython controller
9 and some engines using something like::
10
11 ipclusterz start -n 4
12 """
13 import sys
14 from IPython.zmq.parallel import client, error
15 import time
16 import BeautifulSoup # this isn't necessary, but it helps throw the dependency error earlier
17
def fetchAndParse(url, data=None):
    """Fetch *url* and return the absolute URLs of every <a href> link found.

    Returns an empty list if the fetch fails for any reason, or if the page
    is not served as text/html.  Imports are local so the function can be
    shipped to remote engines and executed there.
    """
    import urllib2
    import urlparse
    import BeautifulSoup
    found = []
    try:
        page = urllib2.urlopen(url, data=data)
    except Exception:
        # best-effort: any fetch failure just yields no links
        return found
    if page.headers.type == 'text/html':
        soup = BeautifulSoup.BeautifulSoup(page.read())
        for anchor in soup.findAll('a'):
            target = anchor.get('href', None)
            if target:
                found.append(urlparse.urljoin(url, target))
    return found
35
class DistributedSpider(object):
    """Crawl a site by farming page-fetch tasks out to IPython engines."""

    # Time to wait between polling for task results.
    pollingDelay = 0.5

    def __init__(self, site):
        self.client = client.Client()
        self.view = self.client.load_balanced_view()
        self.mux = self.client[:]

        self.allLinks = []       # every URL ever seen
        self.linksWorking = {}   # url -> AsyncResult of the in-flight fetch
        self.linksDone = {}      # urls whose fetch has completed (or failed)

        self.site = site

    def visitLink(self, url):
        # submit a fetch task for each new URL that belongs to the target site
        if url not in self.allLinks:
            self.allLinks.append(url)
            if url.startswith(self.site):
                print '    ', url
                self.linksWorking[url] = self.view.apply(fetchAndParse, url)

    def onVisitDone(self, links, url):
        # a fetch finished: record it and recurse into the discovered links
        print url, ':'
        self.linksDone[url] = None
        del self.linksWorking[url]
        for link in links:
            self.visitLink(link)

    def run(self):
        # seed the crawl with the root site, then poll until nothing is pending
        self.visitLink(self.site)
        while self.linksWorking:
            print len(self.linksWorking), 'pending...'
            self.synchronize()
            time.sleep(self.pollingDelay)

    def synchronize(self):
        # iterate over a snapshot (.items()) because onVisitDone mutates
        # self.linksWorking while we are looping
        for url, ar in self.linksWorking.items():
            # Calling get_task_result with block=False will return None if the
            # task is not done yet. This provides a simple way of polling.
            try:
                links = ar.get(0)
            except error.TimeoutError:
                continue
            except Exception as e:
                # task raised remotely: record the failure and drop it
                self.linksDone[url] = None
                del self.linksWorking[url]
                print url, ':', e.traceback
            else:
                self.onVisitDone(links, url)
87
def main():
    """Entry point: crawl the site given on the command line, or prompt for one."""
    site = sys.argv[1] if len(sys.argv) > 1 else raw_input('Enter site to crawl: ')
    spider = DistributedSpider(site)
    spider.run()

if __name__ == '__main__':
    main()
@@ -0,0 +1,19 b''
1 """
2 A Distributed Hello world
3 Ken Kinder <ken@kenkinder.com>
4 """
5 from IPython.zmq.parallel import client
6
# connect a Client to the cluster (a controller is presumably already
# running — started e.g. via ipclusterz; verify before running this script)
rc = client.Client()
8
def sleep_and_echo(t, msg):
    """Sleep for *t* seconds, then return *msg* unchanged.

    The import is local so the function can run on a remote engine.
    """
    from time import sleep
    sleep(t)
    return msg
13
view = rc.load_balanced_view()

# 'World!' is submitted first but sleeps longer; each get() blocks until
# its own result is ready, so the last line prints 'Hello World!'
world = view.apply_async(sleep_and_echo, 3, 'World!')
hello = view.apply_async(sleep_and_echo, 2, 'Hello')
print "Submitted tasks:", hello.msg_ids, world.msg_ids
print hello.get(), world.get()
@@ -15,10 +15,9 b' __docformat__ = "restructuredtext en"'
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 from types import FunctionType
19 18 import copy
20
21 from IPython.zmq.parallel.dependency import dependent
19 import sys
20 from types import FunctionType
22 21
23 22 import codeutil
24 23
@@ -67,12 +66,22 b' class CannedFunction(CannedObject):'
67 66 self._checkType(f)
68 67 self.code = f.func_code
69 68 self.defaults = f.func_defaults
69 self.module = f.__module__ or '__main__'
70 70 self.__name__ = f.__name__
71 71
72 72 def _checkType(self, obj):
73 73 assert isinstance(obj, FunctionType), "Not a function type"
74 74
75 75 def getObject(self, g=None):
76 # try to load function back into its module:
77 if not self.module.startswith('__'):
78 try:
79 __import__(self.module)
80 except ImportError:
81 pass
82 else:
83 g = sys.modules[self.module].__dict__
84
76 85 if g is None:
77 86 g = globals()
78 87 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
@@ -82,8 +91,9 b' class CannedFunction(CannedObject):'
82 91 # Functions
83 92 #-------------------------------------------------------------------------------
84 93
85
86 94 def can(obj):
95 # import here to prevent module-level circular imports
96 from IPython.zmq.parallel.dependency import dependent
87 97 if isinstance(obj, dependent):
88 98 keys = ('f','df')
89 99 return CannedObject(obj, keys=keys)
@@ -12,6 +12,8 b''
12 12
13 13 import time
14 14
15 from zmq import MessageTracker
16
15 17 from IPython.external.decorator import decorator
16 18 from . import error
17 19
@@ -19,6 +21,9 b' from . import error'
19 21 # Classes
20 22 #-----------------------------------------------------------------------------
21 23
24 # global empty tracker that's always done:
25 finished_tracker = MessageTracker()
26
22 27 @decorator
23 28 def check_ready(f, self, *args, **kwargs):
24 29 """Call spin() to sync state prior to calling the method."""
@@ -36,18 +41,26 b' class AsyncResult(object):'
36 41 msg_ids = None
37 42 _targets = None
38 43 _tracker = None
44 _single_result = False
39 45
40 46 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
41 self._client = client
42 47 if isinstance(msg_ids, basestring):
48 # always a list
43 49 msg_ids = [msg_ids]
50 if tracker is None:
51 # default to always done
52 tracker = finished_tracker
53 self._client = client
44 54 self.msg_ids = msg_ids
45 55 self._fname=fname
46 56 self._targets = targets
47 57 self._tracker = tracker
48 58 self._ready = False
49 59 self._success = None
50 self._single_result = len(msg_ids) == 1
60 if len(msg_ids) == 1:
61 self._single_result = not isinstance(targets, (list, tuple))
62 else:
63 self._single_result = False
51 64
52 65 def __repr__(self):
53 66 if self._ready:
@@ -99,7 +112,7 b' class AsyncResult(object):'
99 112 """
100 113 if self._ready:
101 114 return
102 self._ready = self._client.barrier(self.msg_ids, timeout)
115 self._ready = self._client.wait(self.msg_ids, timeout)
103 116 if self._ready:
104 117 try:
105 118 results = map(self._client.results.get, self.msg_ids)
@@ -149,10 +162,9 b' class AsyncResult(object):'
149 162 return dict(zip(engine_ids,results))
150 163
151 164 @property
152 @check_ready
153 165 def result(self):
154 166 """result property wrapper for `get(timeout=0)`."""
155 return self._result
167 return self.get()
156 168
157 169 # abbreviated alias:
158 170 r = result
@@ -169,7 +181,7 b' class AsyncResult(object):'
169 181 @property
170 182 def result_dict(self):
171 183 """result property as a dict."""
172 return self.get_dict(0)
184 return self.get_dict()
173 185
174 186 def __dict__(self):
175 187 return self.get_dict(0)
@@ -181,12 +193,18 b' class AsyncResult(object):'
181 193
182 194 @property
183 195 def sent(self):
184 """check whether my messages have been sent"""
185 if self._tracker is None:
186 return True
187 else:
196 """check whether my messages have been sent."""
188 197 return self._tracker.done
189 198
199 def wait_for_send(self, timeout=-1):
200 """wait for pyzmq send to complete.
201
202 This is necessary when sending arrays that you intend to edit in-place.
203 `timeout` is in seconds, and will raise TimeoutError if it is reached
204 before the send completes.
205 """
206 return self._tracker.wait(timeout)
207
190 208 #-------------------------------------
191 209 # dict-access
192 210 #-------------------------------------
@@ -285,7 +303,7 b' class AsyncHubResult(AsyncResult):'
285 303 if self._ready:
286 304 return
287 305 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
288 local_ready = self._client.barrier(local_ids, timeout)
306 local_ready = self._client.wait(local_ids, timeout)
289 307 if local_ready:
290 308 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
291 309 if not remote_ids:
This diff has been collapsed as it changes many lines, (632 lines changed) Show them Hide them
@@ -1,4 +1,4 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ cluster"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
@@ -31,57 +31,26 b' from IPython.external.decorator import decorator'
31 31 from IPython.external.ssh import tunnel
32 32
33 33 from . import error
34 from . import map as Map
35 34 from . import util
36 35 from . import streamsession as ss
37 36 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
38 37 from .clusterdir import ClusterDir, ClusterDirError
39 38 from .dependency import Dependency, depend, require, dependent
40 39 from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction
41 from .util import ReverseDict, validate_url, disambiguate_url
42 40 from .view import DirectView, LoadBalancedView
43 41
44 42 #--------------------------------------------------------------------------
45 # helpers for implementing old MEC API via client.apply
46 #--------------------------------------------------------------------------
47
48 def _push(user_ns, **ns):
49 """helper method for implementing `client.push` via `client.apply`"""
50 user_ns.update(ns)
51
52 def _pull(user_ns, keys):
53 """helper method for implementing `client.pull` via `client.apply`"""
54 if isinstance(keys, (list,tuple, set)):
55 for key in keys:
56 if not user_ns.has_key(key):
57 raise NameError("name '%s' is not defined"%key)
58 return map(user_ns.get, keys)
59 else:
60 if not user_ns.has_key(keys):
61 raise NameError("name '%s' is not defined"%keys)
62 return user_ns.get(keys)
63
64 def _clear(user_ns):
65 """helper method for implementing `client.clear` via `client.apply`"""
66 user_ns.clear()
67
68 def _execute(user_ns, code):
69 """helper method for implementing `client.execute` via `client.apply`"""
70 exec code in user_ns
71
72
73 #--------------------------------------------------------------------------
74 43 # Decorators for Client methods
75 44 #--------------------------------------------------------------------------
76 45
77 46 @decorator
78 def spinfirst(f, self, *args, **kwargs):
47 def spin_first(f, self, *args, **kwargs):
79 48 """Call spin() to sync state prior to calling the method."""
80 49 self.spin()
81 50 return f(self, *args, **kwargs)
82 51
83 52 @decorator
84 def defaultblock(f, self, *args, **kwargs):
53 def default_block(f, self, *args, **kwargs):
85 54 """Default to self.block; preserve self.block."""
86 55 block = kwargs.get('block',None)
87 56 block = self.block if block is None else block
@@ -151,7 +120,7 b' class Metadata(dict):'
151 120
152 121
153 122 class Client(HasTraits):
154 """A semi-synchronous client to the IPython ZMQ controller
123 """A semi-synchronous client to the IPython ZMQ cluster
155 124
156 125 Parameters
157 126 ----------
@@ -193,11 +162,11 b' class Client(HasTraits):'
193 162 flag for whether to use paramiko instead of shell ssh for tunneling.
194 163 [default: True on win32, False else]
195 164
196 #------- exec authentication args -------
197 # If even localhost is untrusted, you can have some protection against
198 # unauthorized execution by using a key. Messages are still sent
199 # as cleartext, so if someone can snoop your loopback traffic this will
200 # not help against malicious attacks.
165 ------- exec authentication args -------
166 If even localhost is untrusted, you can have some protection against
167 unauthorized execution by using a key. Messages are still sent
168 as cleartext, so if someone can snoop your loopback traffic this will
169 not help against malicious attacks.
201 170
202 171 exec_key : str
203 172 an authentication key or file containing a key
@@ -207,7 +176,7 b' class Client(HasTraits):'
207 176 Attributes
208 177 ----------
209 178
210 ids : set of int engine IDs
179 ids : list of int engine IDs
211 180 requesting the ids attribute always synchronizes
212 181 the registration state. To request ids without synchronization,
213 182 use semi-private _ids attributes.
@@ -234,15 +203,18 b' class Client(HasTraits):'
234 203 flushes incoming results and registration state changes
235 204 control methods spin, and requesting `ids` also ensures up to date
236 205
237 barrier
206 wait
238 207 wait on one or more msg_ids
239 208
240 209 execution methods
241 210 apply
242 211 legacy: execute, run
243 212
213 data movement
214 push, pull, scatter, gather
215
244 216 query methods
245 queue_status, get_result, purge
217 queue_status, get_result, purge, result_status
246 218
247 219 control methods
248 220 abort, shutdown
@@ -264,23 +236,25 b' class Client(HasTraits):'
264 236 _ssh=Bool(False)
265 237 _context = Instance('zmq.Context')
266 238 _config = Dict()
267 _engines=Instance(ReverseDict, (), {})
239 _engines=Instance(util.ReverseDict, (), {})
268 240 # _hub_socket=Instance('zmq.Socket')
269 241 _query_socket=Instance('zmq.Socket')
270 242 _control_socket=Instance('zmq.Socket')
271 243 _iopub_socket=Instance('zmq.Socket')
272 244 _notification_socket=Instance('zmq.Socket')
273 _apply_socket=Instance('zmq.Socket')
274 _mux_ident=Str()
275 _task_ident=Str()
245 _mux_socket=Instance('zmq.Socket')
246 _task_socket=Instance('zmq.Socket')
276 247 _task_scheme=Str()
277 248 _balanced_views=Dict()
278 249 _direct_views=Dict()
279 250 _closed = False
251 _ignored_control_replies=Int(0)
252 _ignored_hub_replies=Int(0)
280 253
281 254 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
282 255 context=None, username=None, debug=False, exec_key=None,
283 256 sshserver=None, sshkey=None, password=None, paramiko=None,
257 timeout=10
284 258 ):
285 259 super(Client, self).__init__(debug=debug, profile=profile)
286 260 if context is None:
@@ -292,11 +266,11 b' class Client(HasTraits):'
292 266 if self._cd is not None:
293 267 if url_or_file is None:
294 268 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
295 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
269 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
296 270 " Please specify at least one of url_or_file or profile."
297 271
298 272 try:
299 validate_url(url_or_file)
273 util.validate_url(url_or_file)
300 274 except AssertionError:
301 275 if not os.path.exists(url_or_file):
302 276 if self._cd:
@@ -316,7 +290,7 b' class Client(HasTraits):'
316 290 sshserver=cfg['ssh']
317 291 url = cfg['url']
318 292 location = cfg.setdefault('location', None)
319 cfg['url'] = disambiguate_url(cfg['url'], location)
293 cfg['url'] = util.disambiguate_url(cfg['url'], location)
320 294 url = cfg['url']
321 295
322 296 self._config = cfg
@@ -351,10 +325,11 b' class Client(HasTraits):'
351 325
352 326 self._notification_handlers = {'registration_notification' : self._register_engine,
353 327 'unregistration_notification' : self._unregister_engine,
328 'shutdown_notification' : lambda msg: self.close(),
354 329 }
355 330 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
356 331 'apply_reply' : self._handle_apply_reply}
357 self._connect(sshserver, ssh_kwargs)
332 self._connect(sshserver, ssh_kwargs, timeout)
358 333
359 334 def __del__(self):
360 335 """cleanup sockets, but _not_ context."""
@@ -378,22 +353,6 b' class Client(HasTraits):'
378 353 pass
379 354 self._cd = None
380 355
381 @property
382 def ids(self):
383 """Always up-to-date ids property."""
384 self._flush_notifications()
385 # always copy:
386 return list(self._ids)
387
388 def close(self):
389 if self._closed:
390 return
391 snames = filter(lambda n: n.endswith('socket'), dir(self))
392 for socket in map(lambda name: getattr(self, name), snames):
393 if isinstance(socket, zmq.Socket) and not socket.closed:
394 socket.close()
395 self._closed = True
396
397 356 def _update_engines(self, engines):
398 357 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
399 358 for k,v in engines.iteritems():
@@ -402,16 +361,15 b' class Client(HasTraits):'
402 361 self._ids.append(eid)
403 362 self._ids = sorted(self._ids)
404 363 if sorted(self._engines.keys()) != range(len(self._engines)) and \
405 self._task_scheme == 'pure' and self._task_ident:
364 self._task_scheme == 'pure' and self._task_socket:
406 365 self._stop_scheduling_tasks()
407 366
408 367 def _stop_scheduling_tasks(self):
409 368 """Stop scheduling tasks because an engine has been unregistered
410 369 from a pure ZMQ scheduler.
411 370 """
412 self._task_ident = ''
413 # self._task_socket.close()
414 # self._task_socket = None
371 self._task_socket.close()
372 self._task_socket = None
415 373 msg = "An engine has been unregistered, and we are using pure " +\
416 374 "ZMQ task scheduling. Task farming will be disabled."
417 375 if self.outstanding:
@@ -434,8 +392,8 b' class Client(HasTraits):'
434 392 targets = [targets]
435 393 return [self._engines[t] for t in targets], list(targets)
436 394
437 def _connect(self, sshserver, ssh_kwargs):
438 """setup all our socket connections to the controller. This is called from
395 def _connect(self, sshserver, ssh_kwargs, timeout):
396 """setup all our socket connections to the cluster. This is called from
439 397 __init__."""
440 398
441 399 # Maybe allow reconnecting?
@@ -444,13 +402,16 b' class Client(HasTraits):'
444 402 self._connected=True
445 403
446 404 def connect_socket(s, url):
447 url = disambiguate_url(url, self._config['location'])
405 url = util.disambiguate_url(url, self._config['location'])
448 406 if self._ssh:
449 407 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
450 408 else:
451 409 return s.connect(url)
452 410
453 411 self.session.send(self._query_socket, 'connection_request')
412 r,w,x = zmq.select([self._query_socket],[],[], timeout)
413 if not r:
414 raise error.TimeoutError("Hub connection request timed out")
454 415 idents,msg = self.session.recv(self._query_socket,mode=0)
455 416 if self.debug:
456 417 pprint(msg)
@@ -458,18 +419,15 b' class Client(HasTraits):'
458 419 content = msg.content
459 420 self._config['registration'] = dict(content)
460 421 if content.status == 'ok':
461 self._apply_socket = self._context.socket(zmq.XREP)
462 self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session)
463 422 if content.mux:
464 # self._mux_socket = self._context.socket(zmq.XREQ)
465 self._mux_ident = 'mux'
466 connect_socket(self._apply_socket, content.mux)
423 self._mux_socket = self._context.socket(zmq.XREQ)
424 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
425 connect_socket(self._mux_socket, content.mux)
467 426 if content.task:
468 427 self._task_scheme, task_addr = content.task
469 # self._task_socket = self._context.socket(zmq.XREQ)
470 # self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
471 connect_socket(self._apply_socket, task_addr)
472 self._task_ident = 'task'
428 self._task_socket = self._context.socket(zmq.XREQ)
429 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
430 connect_socket(self._task_socket, task_addr)
473 431 if content.notification:
474 432 self._notification_socket = self._context.socket(zmq.SUB)
475 433 connect_socket(self._notification_socket, content.notification)
@@ -488,8 +446,6 b' class Client(HasTraits):'
488 446 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
489 447 connect_socket(self._iopub_socket, content.iopub)
490 448 self._update_engines(dict(content.engines))
491 # give XREP apply_socket some time to connect
492 time.sleep(0.25)
493 449 else:
494 450 self._connected = False
495 451 raise Exception("Failed to connect!")
@@ -499,7 +455,7 b' class Client(HasTraits):'
499 455 #--------------------------------------------------------------------------
500 456
501 457 def _unwrap_exception(self, content):
502 """unwrap exception, and remap engineid to int."""
458 """unwrap exception, and remap engine_id to int."""
503 459 e = error.unwrap_exception(content)
504 460 # print e.traceback
505 461 if e.engine_info:
@@ -545,7 +501,7 b' class Client(HasTraits):'
545 501
546 502 self._handle_stranded_msgs(eid, uuid)
547 503
548 if self._task_ident and self._task_scheme == 'pure':
504 if self._task_socket and self._task_scheme == 'pure':
549 505 self._stop_scheduling_tasks()
550 506
551 507 def _handle_stranded_msgs(self, eid, uuid):
@@ -622,7 +578,7 b' class Client(HasTraits):'
622 578 if content['status'] == 'ok':
623 579 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
624 580 elif content['status'] == 'aborted':
625 self.results[msg_id] = error.AbortedTask(msg_id)
581 self.results[msg_id] = error.TaskAborted(msg_id)
626 582 elif content['status'] == 'resubmitted':
627 583 # TODO: handle resubmission
628 584 pass
@@ -665,12 +621,26 b' class Client(HasTraits):'
665 621 in the ZMQ queue.
666 622
667 623 Currently: ignore them."""
624 if self._ignored_control_replies <= 0:
625 return
668 626 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
669 627 while msg is not None:
628 self._ignored_control_replies -= 1
670 629 if self.debug:
671 630 pprint(msg)
672 631 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
673 632
633 def _flush_ignored_control(self):
634 """flush ignored control replies"""
635 while self._ignored_control_replies > 0:
636 self.session.recv(self._control_socket)
637 self._ignored_control_replies -= 1
638
639 def _flush_ignored_hub_replies(self):
640 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
641 while msg is not None:
642 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
643
674 644 def _flush_iopub(self, sock):
675 645 """Flush replies from the iopub channel waiting
676 646 in the ZMQ queue.
@@ -718,26 +688,46 b' class Client(HasTraits):'
718 688 if not isinstance(key, (int, slice, tuple, list, xrange)):
719 689 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
720 690 else:
721 return self.view(key, balanced=False)
691 return self._get_view(key, balanced=False)
722 692
723 693 #--------------------------------------------------------------------------
724 694 # Begin public methods
725 695 #--------------------------------------------------------------------------
726 696
697 @property
698 def ids(self):
699 """Always up-to-date ids property."""
700 self._flush_notifications()
701 # always copy:
702 return list(self._ids)
703
704 def close(self):
705 if self._closed:
706 return
707 snames = filter(lambda n: n.endswith('socket'), dir(self))
708 for socket in map(lambda name: getattr(self, name), snames):
709 if isinstance(socket, zmq.Socket) and not socket.closed:
710 socket.close()
711 self._closed = True
712
727 713 def spin(self):
728 714 """Flush any registration notifications and execution results
729 715 waiting in the ZMQ queue.
730 716 """
731 717 if self._notification_socket:
732 718 self._flush_notifications()
733 if self._apply_socket:
734 self._flush_results(self._apply_socket)
719 if self._mux_socket:
720 self._flush_results(self._mux_socket)
721 if self._task_socket:
722 self._flush_results(self._task_socket)
735 723 if self._control_socket:
736 724 self._flush_control(self._control_socket)
737 725 if self._iopub_socket:
738 726 self._flush_iopub(self._iopub_socket)
727 if self._query_socket:
728 self._flush_ignored_hub_replies()
739 729
740 def barrier(self, jobs=None, timeout=-1):
730 def wait(self, jobs=None, timeout=-1):
741 731 """waits on one or more `jobs`, for up to `timeout` seconds.
742 732
743 733 Parameters
@@ -786,8 +776,8 b' class Client(HasTraits):'
786 776 # Control methods
787 777 #--------------------------------------------------------------------------
788 778
789 @spinfirst
790 @defaultblock
779 @spin_first
780 @default_block
791 781 def clear(self, targets=None, block=None):
792 782 """Clear the namespace in target(s)."""
793 783 targets = self._build_targets(targets)[0]
@@ -795,18 +785,21 b' class Client(HasTraits):'
795 785 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
796 786 error = False
797 787 if self.block:
788 self._flush_ignored_control()
798 789 for i in range(len(targets)):
799 790 idents,msg = self.session.recv(self._control_socket,0)
800 791 if self.debug:
801 792 pprint(msg)
802 793 if msg['content']['status'] != 'ok':
803 794 error = self._unwrap_exception(msg['content'])
795 else:
796 self._ignored_control_replies += len(targets)
804 797 if error:
805 798 raise error
806 799
807 800
808 @spinfirst
809 @defaultblock
801 @spin_first
802 @default_block
810 803 def abort(self, jobs=None, targets=None, block=None):
811 804 """Abort specific jobs from the execution queues of target(s).
812 805
@@ -839,35 +832,41 b' class Client(HasTraits):'
839 832 content=content, ident=t)
840 833 error = False
841 834 if self.block:
835 self._flush_ignored_control()
842 836 for i in range(len(targets)):
843 837 idents,msg = self.session.recv(self._control_socket,0)
844 838 if self.debug:
845 839 pprint(msg)
846 840 if msg['content']['status'] != 'ok':
847 841 error = self._unwrap_exception(msg['content'])
842 else:
843 self._ignored_control_replies += len(targets)
848 844 if error:
849 845 raise error
850 846
851 @spinfirst
852 @defaultblock
853 def shutdown(self, targets=None, restart=False, controller=False, block=None):
854 """Terminates one or more engine processes, optionally including the controller."""
855 if controller:
847 @spin_first
848 @default_block
849 def shutdown(self, targets=None, restart=False, hub=False, block=None):
850 """Terminates one or more engine processes, optionally including the hub."""
851 if hub:
856 852 targets = 'all'
857 853 targets = self._build_targets(targets)[0]
858 854 for t in targets:
859 855 self.session.send(self._control_socket, 'shutdown_request',
860 856 content={'restart':restart},ident=t)
861 857 error = False
862 if block or controller:
858 if block or hub:
859 self._flush_ignored_control()
863 860 for i in range(len(targets)):
864 861 idents,msg = self.session.recv(self._control_socket,0)
865 862 if self.debug:
866 863 pprint(msg)
867 864 if msg['content']['status'] != 'ok':
868 865 error = self._unwrap_exception(msg['content'])
866 else:
867 self._ignored_control_replies += len(targets)
869 868
870 if controller:
869 if hub:
871 870 time.sleep(0.25)
872 871 self.session.send(self._query_socket, 'shutdown_request')
873 872 idents,msg = self.session.recv(self._query_socket, 0)
@@ -883,8 +882,8 b' class Client(HasTraits):'
883 882 # Execution methods
884 883 #--------------------------------------------------------------------------
885 884
886 @defaultblock
887 def execute(self, code, targets='all', block=None):
885 @default_block
886 def _execute(self, code, targets='all', block=None):
888 887 """Executes `code` on `targets` in blocking or nonblocking manner.
889 888
890 889 ``execute`` is always `bound` (affects engine namespace)
@@ -901,33 +900,7 b' class Client(HasTraits):'
901 900 whether or not to wait until done to return
902 901 default: self.block
903 902 """
904 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
905 if not block:
906 return result
907
908 def run(self, filename, targets='all', block=None):
909 """Execute contents of `filename` on engine(s).
910
911 This simply reads the contents of the file and calls `execute`.
912
913 Parameters
914 ----------
915
916 filename : str
917 The path to the file
918 targets : int/str/list of ints/strs
919 the engines on which to execute
920 default : all
921 block : bool
922 whether or not to wait until done
923 default: self.block
924
925 """
926 with open(filename, 'r') as f:
927 # add newline in case of trailing indented whitespace
928 # which will cause SyntaxError
929 code = f.read()+'\n'
930 return self.execute(code, targets=targets, block=block)
903 return self[targets].execute(code, block=block)
931 904
932 905 def _maybe_raise(self, result):
933 906 """wrapper for maybe raising an exception if apply failed."""
@@ -936,287 +909,113 b' class Client(HasTraits):'
936 909
937 910 return result
938 911
939 def _build_dependency(self, dep):
940 """helper for building jsonable dependencies from various input forms"""
941 if isinstance(dep, Dependency):
942 return dep.as_dict()
943 elif isinstance(dep, AsyncResult):
944 return dep.msg_ids
945 elif dep is None:
946 return []
947 else:
948 # pass to Dependency constructor
949 return list(Dependency(dep))
950
951 @defaultblock
952 def apply(self, f, args=None, kwargs=None, bound=False, block=None,
953 targets=None, balanced=None,
954 after=None, follow=None, timeout=None,
955 track=False):
956 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
957
958 This is the central execution command for the client.
959
960 Parameters
961 ----------
962
963 f : function
964 The fuction to be called remotely
965 args : tuple/list
966 The positional arguments passed to `f`
967 kwargs : dict
968 The keyword arguments passed to `f`
969 bound : bool (default: False)
970 Whether to pass the Engine(s) Namespace as the first argument to `f`.
971 block : bool (default: self.block)
972 Whether to wait for the result, or return immediately.
973 False:
974 returns AsyncResult
975 True:
976 returns actual result(s) of f(*args, **kwargs)
977 if multiple targets:
978 list of results, matching `targets`
979 track : bool
980 whether to track non-copying sends.
981 [default False]
982
983 targets : int,list of ints, 'all', None
984 Specify the destination of the job.
985 if None:
986 Submit via Task queue for load-balancing.
987 if 'all':
988 Run on all active engines
989 if list:
990 Run on each specified engine
991 if int:
992 Run on single engine
993 Note:
994 that if `balanced=True`, and `targets` is specified,
995 then the load-balancing will be limited to balancing
996 among `targets`.
997
998 balanced : bool, default None
999 whether to load-balance. This will default to True
1000 if targets is unspecified, or False if targets is specified.
1001
1002 If `balanced` and `targets` are both specified, the task will
1003 be assigne to *one* of the targets by the scheduler.
1004
1005 The following arguments are only used when balanced is True:
1006
1007 after : Dependency or collection of msg_ids
1008 Only for load-balanced execution (targets=None)
1009 Specify a list of msg_ids as a time-based dependency.
1010 This job will only be run *after* the dependencies
1011 have been met.
1012
1013 follow : Dependency or collection of msg_ids
1014 Only for load-balanced execution (targets=None)
1015 Specify a list of msg_ids as a location-based dependency.
1016 This job will only be run on an engine where this dependency
1017 is met.
1018
1019 timeout : float/int or None
1020 Only for load-balanced execution (targets=None)
1021 Specify an amount of time (in seconds) for the scheduler to
1022 wait for dependencies to be met before failing with a
1023 DependencyTimeout.
912 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
913 ident=None):
914 """construct and send an apply message via a socket.
1024 915
1025 Returns
1026 -------
1027
1028 if block is False:
1029 return AsyncResult wrapping msg_ids
1030 output of AsyncResult.get() is identical to that of `apply(...block=True)`
1031 else:
1032 if single target (or balanced):
1033 return result of `f(*args, **kwargs)`
1034 else:
1035 return list of results, matching `targets`
916 This is the principal method with which all engine execution is performed by views.
1036 917 """
918
1037 919 assert not self._closed, "cannot use me anymore, I'm closed!"
1038 920 # defaults:
1039 block = block if block is not None else self.block
1040 921 args = args if args is not None else []
1041 922 kwargs = kwargs if kwargs is not None else {}
923 subheader = subheader if subheader is not None else {}
1042 924
1043 if not self._ids:
1044 # flush notification socket if no engines yet
1045 any_ids = self.ids
1046 if not any_ids:
1047 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
1048
1049 if balanced is None:
1050 if targets is None:
1051 # default to balanced if targets unspecified
1052 balanced = True
1053 else:
1054 # otherwise default to multiplexing
1055 balanced = False
1056
1057 if targets is None and balanced is False:
1058 # default to all if *not* balanced, and targets is unspecified
1059 targets = 'all'
1060
1061 # enforce types of f,args,kwrags
925 # validate arguments
1062 926 if not callable(f):
1063 927 raise TypeError("f must be callable, not %s"%type(f))
1064 928 if not isinstance(args, (tuple, list)):
1065 929 raise TypeError("args must be tuple or list, not %s"%type(args))
1066 930 if not isinstance(kwargs, dict):
1067 931 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
932 if not isinstance(subheader, dict):
933 raise TypeError("subheader must be dict, not %s"%type(subheader))
1068 934
1069 options = dict(bound=bound, block=block, targets=targets, track=track)
1070
1071 if balanced:
1072 return self._apply_balanced(f, args, kwargs, timeout=timeout,
1073 after=after, follow=follow, **options)
1074 elif follow or after or timeout:
1075 msg = "follow, after, and timeout args are only used for"
1076 msg += " load-balanced execution."
1077 raise ValueError(msg)
1078 else:
1079 return self._apply_direct(f, args, kwargs, **options)
1080
1081 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
1082 after=None, follow=None, timeout=None, track=None):
1083 """call f(*args, **kwargs) remotely in a load-balanced manner.
1084
1085 This is a private method, see `apply` for details.
1086 Not to be called directly!
1087 """
1088
1089 loc = locals()
1090 for name in ('bound', 'block', 'track'):
1091 assert loc[name] is not None, "kwarg %r must be specified!"%name
1092
1093 if not self._task_ident:
1094 msg = "Task farming is disabled"
1095 if self._task_scheme == 'pure':
1096 msg += " because the pure ZMQ scheduler cannot handle"
1097 msg += " disappearing engines."
1098 raise RuntimeError(msg)
1099
1100 if self._task_scheme == 'pure':
1101 # pure zmq scheme doesn't support dependencies
1102 msg = "Pure ZMQ scheduler doesn't support dependencies"
1103 if (follow or after):
1104 # hard fail on DAG dependencies
1105 raise RuntimeError(msg)
1106 if isinstance(f, dependent):
1107 # soft warn on functional dependencies
1108 warnings.warn(msg, RuntimeWarning)
1109
1110 # defaults:
1111 args = args if args is not None else []
1112 kwargs = kwargs if kwargs is not None else {}
1113
1114 if targets:
1115 idents,_ = self._build_targets(targets)
1116 else:
1117 idents = []
935 if not self._ids:
936 # flush notification socket if no engines yet
937 any_ids = self.ids
938 if not any_ids:
939 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
940 # enforce types of f,args,kwargs
1118 941
1119 after = self._build_dependency(after)
1120 follow = self._build_dependency(follow)
1121 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1122 942 bufs = util.pack_apply_message(f,args,kwargs)
1123 content = dict(bound=bound)
1124 943
1125 msg = self.session.send(self._apply_socket, "apply_request", ident=self._task_ident,
1126 content=content, buffers=bufs, subheader=subheader, track=track)
1127 msg_id = msg['msg_id']
1128 self.outstanding.add(msg_id)
1129 self.history.append(msg_id)
1130 self.metadata[msg_id]['submitted'] = datetime.now()
1131 tracker = None if track is False else msg['tracker']
1132 ar = AsyncResult(self, [msg_id], fname=f.__name__, targets=targets, tracker=tracker)
1133 if block:
1134 try:
1135 return ar.get()
1136 except KeyboardInterrupt:
1137 return ar
1138 else:
1139 return ar
1140
1141 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None,
1142 track=None):
1143 """Then underlying method for applying functions to specific engines
1144 via the MUX queue.
1145
1146 This is a private method, see `apply` for details.
1147 Not to be called directly!
1148 """
1149
1150 if not self._mux_ident:
1151 msg = "Multiplexing is disabled"
1152 raise RuntimeError(msg)
1153
1154 loc = locals()
1155 for name in ('bound', 'block', 'targets', 'track'):
1156 assert loc[name] is not None, "kwarg %r must be specified!"%name
944 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
945 subheader=subheader, track=track)
1157 946
1158 idents,targets = self._build_targets(targets)
1159
1160 subheader = {}
1161 content = dict(bound=bound)
1162 bufs = util.pack_apply_message(f,args,kwargs)
1163
1164 msg_ids = []
1165 trackers = []
1166 for ident in idents:
1167 msg = self.session.send(self._apply_socket, "apply_request",
1168 content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader,
1169 track=track)
1170 if track:
1171 trackers.append(msg['tracker'])
1172 947 msg_id = msg['msg_id']
1173 948 self.outstanding.add(msg_id)
949 if ident:
950 # possibly routed to a specific engine
951 if isinstance(ident, list):
952 ident = ident[-1]
953 if ident in self._engines.values():
954 # save for later, in case of engine death
1174 955 self._outstanding_dict[ident].add(msg_id)
1175 956 self.history.append(msg_id)
1176 msg_ids.append(msg_id)
1177
1178 tracker = None if track is False else zmq.MessageTracker(*trackers)
1179 ar = AsyncResult(self, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
957 self.metadata[msg_id]['submitted'] = datetime.now()
1180 958
1181 if block:
1182 try:
1183 return ar.get()
1184 except KeyboardInterrupt:
1185 return ar
1186 else:
1187 return ar
959 return msg
1188 960
1189 961 #--------------------------------------------------------------------------
1190 962 # construct a View object
1191 963 #--------------------------------------------------------------------------
1192 964
1193 @defaultblock
1194 def remote(self, bound=False, block=None, targets=None, balanced=None):
1195 """Decorator for making a RemoteFunction"""
1196 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1197
1198 @defaultblock
1199 def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None):
1200 """Decorator for making a ParallelFunction"""
1201 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1202
1203 965 def _cache_view(self, targets, balanced):
1204 966 """save views, so subsequent requests don't create new objects."""
1205 967 if balanced:
968 # validate whether we can run
969 if not self._task_socket:
970 msg = "Task farming is disabled"
971 if self._task_scheme == 'pure':
972 msg += " because the pure ZMQ scheduler cannot handle"
973 msg += " disappearing engines."
974 raise RuntimeError(msg)
975 socket = self._task_socket
1206 976 view_class = LoadBalancedView
1207 977 view_cache = self._balanced_views
1208 978 else:
979 socket = self._mux_socket
1209 980 view_class = DirectView
1210 981 view_cache = self._direct_views
1211 982
1212 983 # use str, since often targets will be a list
1213 984 key = str(targets)
1214 985 if key not in view_cache:
1215 view_cache[key] = view_class(client=self, targets=targets)
986 view_cache[key] = view_class(client=self, socket=socket, targets=targets)
1216 987
1217 988 return view_cache[key]
1218 989
1219 def view(self, targets=None, balanced=None):
990 def load_balanced_view(self, targets=None):
991 """construct a DirectView object.
992
993 If no arguments are specified, create a LoadBalancedView
994 using all engines.
995
996 Parameters
997 ----------
998
999 targets: list,slice,int,etc. [default: use all engines]
1000 The subset of engines across which to load-balance
1001 """
1002 return self._get_view(targets, balanced=True)
1003
1004 def direct_view(self, targets='all'):
1005 """construct a DirectView object.
1006
1007 If no targets are specified, create a DirectView
1008 using all engines.
1009
1010 Parameters
1011 ----------
1012
1013 targets: list,slice,int,etc. [default: use all engines]
1014 The engines to use for the View
1015 """
1016 return self._get_view(targets, balanced=False)
1017
1018 def _get_view(self, targets, balanced):
1220 1019 """Method for constructing View objects.
1221 1020
1222 1021 If no arguments are specified, create a LoadBalancedView
@@ -1234,9 +1033,7 b' class Client(HasTraits):'
1234 1033
1235 1034 """
1236 1035
1237 balanced = (targets is None) if balanced is None else balanced
1238
1239 if targets is None:
1036 if targets in (None,'all'):
1240 1037 if balanced:
1241 1038 return self._cache_view(None,True)
1242 1039 else:
@@ -1261,20 +1058,20 b' class Client(HasTraits):'
1261 1058 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1262 1059
1263 1060 #--------------------------------------------------------------------------
1264 # Data movement
1061 # Data movement (TO BE REMOVED)
1265 1062 #--------------------------------------------------------------------------
1266 1063
1267 @defaultblock
1268 def push(self, ns, targets='all', block=None, track=False):
1064 @default_block
1065 def _push(self, ns, targets='all', block=None, track=False):
1269 1066 """Push the contents of `ns` into the namespace on `target`"""
1270 1067 if not isinstance(ns, dict):
1271 1068 raise TypeError("Must be a dict, not %s"%type(ns))
1272 result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1069 result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1273 1070 if not block:
1274 1071 return result
1275 1072
1276 @defaultblock
1277 def pull(self, keys, targets='all', block=None):
1073 @default_block
1074 def _pull(self, keys, targets='all', block=None):
1278 1075 """Pull objects from `target`'s namespace by `keys`"""
1279 1076 if isinstance(keys, basestring):
1280 1077 pass
@@ -1284,64 +1081,15 b' class Client(HasTraits):'
1284 1081 raise TypeError("keys must be str, not type %r"%type(key))
1285 1082 else:
1286 1083 raise TypeError("keys must be strs, not %r"%keys)
1287 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1084 result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1288 1085 return result
1289 1086
1290 @defaultblock
1291 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None, track=False):
1292 """
1293 Partition a Python sequence and send the partitions to a set of engines.
1294 """
1295 targets = self._build_targets(targets)[-1]
1296 mapObject = Map.dists[dist]()
1297 nparts = len(targets)
1298 msg_ids = []
1299 trackers = []
1300 for index, engineid in enumerate(targets):
1301 partition = mapObject.getPartition(seq, index, nparts)
1302 if flatten and len(partition) == 1:
1303 r = self.push({key: partition[0]}, targets=engineid, block=False, track=track)
1304 else:
1305 r = self.push({key: partition}, targets=engineid, block=False, track=track)
1306 msg_ids.extend(r.msg_ids)
1307 if track:
1308 trackers.append(r._tracker)
1309
1310 if track:
1311 tracker = zmq.MessageTracker(*trackers)
1312 else:
1313 tracker = None
1314
1315 r = AsyncResult(self, msg_ids, fname='scatter', targets=targets, tracker=tracker)
1316 if block:
1317 r.wait()
1318 else:
1319 return r
1320
1321 @defaultblock
1322 def gather(self, key, dist='b', targets='all', block=None):
1323 """
1324 Gather a partitioned sequence on a set of engines as a single local seq.
1325 """
1326
1327 targets = self._build_targets(targets)[-1]
1328 mapObject = Map.dists[dist]()
1329 msg_ids = []
1330 for index, engineid in enumerate(targets):
1331 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1332
1333 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1334 if block:
1335 return r.get()
1336 else:
1337 return r
1338
1339 1087 #--------------------------------------------------------------------------
1340 1088 # Query methods
1341 1089 #--------------------------------------------------------------------------
1342 1090
1343 @spinfirst
1344 @defaultblock
1091 @spin_first
1092 @default_block
1345 1093 def get_result(self, indices_or_msg_ids=None, block=None):
1346 1094 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1347 1095
@@ -1406,7 +1154,7 b' class Client(HasTraits):'
1406 1154
1407 1155 return ar
1408 1156
1409 @spinfirst
1157 @spin_first
1410 1158 def result_status(self, msg_ids, status_only=True):
1411 1159 """Check on the status of the result(s) of the apply request with `msg_ids`.
1412 1160
@@ -1505,7 +1253,7 b' class Client(HasTraits):'
1505 1253 error.collect_exceptions(failures, "result_status")
1506 1254 return content
1507 1255
1508 @spinfirst
1256 @spin_first
1509 1257 def queue_status(self, targets='all', verbose=False):
1510 1258 """Fetch the status of engine queues.
1511 1259
@@ -1518,8 +1266,8 b' class Client(HasTraits):'
1518 1266 verbose : bool
1519 1267 Whether to return lengths only, or lists of ids for each element
1520 1268 """
1521 targets = self._build_targets(targets)[1]
1522 content = dict(targets=targets, verbose=verbose)
1269 engine_ids = self._build_targets(targets)[1]
1270 content = dict(targets=engine_ids, verbose=verbose)
1523 1271 self.session.send(self._query_socket, "queue_request", content=content)
1524 1272 idents,msg = self.session.recv(self._query_socket, 0)
1525 1273 if self.debug:
@@ -1528,11 +1276,15 b' class Client(HasTraits):'
1528 1276 status = content.pop('status')
1529 1277 if status != 'ok':
1530 1278 raise self._unwrap_exception(content)
1531 return util.rekey(content)
1279 content = util.rekey(content)
1280 if isinstance(targets, int):
1281 return content[targets]
1282 else:
1283 return content
1532 1284
1533 @spinfirst
1285 @spin_first
1534 1286 def purge_results(self, jobs=[], targets=[]):
1535 """Tell the controller to forget results.
1287 """Tell the Hub to forget results.
1536 1288
1537 1289 Individual results can be purged by msg_id, or the entire
1538 1290 history of specific targets can be purged.
@@ -1540,11 +1292,11 b' class Client(HasTraits):'
1540 1292 Parameters
1541 1293 ----------
1542 1294
1543 jobs : str or list of strs or AsyncResult objects
1295 jobs : str or list of str or AsyncResult objects
1544 1296 the msg_ids whose results should be forgotten.
1545 1297 targets : int/str/list of ints/strs
1546 1298 The targets, by uuid or int_id, whose entire history is to be purged.
1547 Use `targets='all'` to scrub everything from the controller's memory.
1299 Use `targets='all'` to scrub everything from the Hub's memory.
1548 1300
1549 1301 default : None
1550 1302 """
@@ -6,11 +6,9 b''
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 from IPython.external.decorator import decorator
10
11 9 from .asyncresult import AsyncResult
12 10 from .error import UnmetDependency
13
11 from .util import interactive
14 12
15 13 class depend(object):
16 14 """Dependency decorator, for use with tasks.
@@ -54,6 +52,8 b' class dependent(object):'
54 52 self.dkwargs = dkwargs
55 53
56 54 def __call__(self, *args, **kwargs):
55 # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'):
56 # self.df.func_globals = self.f.func_globals
57 57 if self.df(*self.dargs, **self.dkwargs) is False:
58 58 raise UnmetDependency()
59 59 return self.f(*args, **kwargs)
@@ -62,13 +62,18 b' class dependent(object):'
62 62 def __name__(self):
63 63 return self.func_name
64 64
65 @interactive
65 66 def _require(*names):
66 67 """Helper for @require decorator."""
68 from IPython.zmq.parallel.error import UnmetDependency
69 user_ns = globals()
67 70 for name in names:
71 if name in user_ns:
72 continue
68 73 try:
69 __import__(name)
74 exec 'import %s'%name in user_ns
70 75 except ImportError:
71 return False
76 raise UnmetDependency(name)
72 77 return True
73 78
74 79 def require(*names):
@@ -96,25 +101,31 b' class Dependency(set):'
96 101 all : bool [default True]
97 102 Whether the dependency should be considered met when *all* depending tasks have completed
98 103 or only when *any* have been completed.
99 success_only : bool [default True]
100 Whether to consider only successes for Dependencies, or consider failures as well.
101 If `all=success_only=True`, then this task will fail with an ImpossibleDependency
104 success : bool [default True]
105 Whether to consider successes as fulfilling dependencies.
106 failure : bool [default False]
107 Whether to consider failures as fulfilling dependencies.
108
109 If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
102 110 as soon as the first depended-upon task fails.
103 111 """
104 112
105 113 all=True
106 success_only=True
114 success=True
115 failure=True
107 116
108 def __init__(self, dependencies=[], all=True, success_only=True):
117 def __init__(self, dependencies=[], all=True, success=True, failure=False):
109 118 if isinstance(dependencies, dict):
110 119 # load from dict
111 120 all = dependencies.get('all', True)
112 success_only = dependencies.get('success_only', success_only)
121 success = dependencies.get('success', success)
122 failure = dependencies.get('failure', failure)
113 123 dependencies = dependencies.get('dependencies', [])
114 124 ids = []
115 if isinstance(dependencies, AsyncResult):
116 ids.extend(AsyncResult.msg_ids)
117 else:
125
126 # extract ids from various sources:
127 if isinstance(dependencies, (basestring, AsyncResult)):
128 dependencies = [dependencies]
118 129 for d in dependencies:
119 130 if isinstance(d, basestring):
120 131 ids.append(d)
@@ -122,28 +133,41 b' class Dependency(set):'
122 133 ids.extend(d.msg_ids)
123 134 else:
124 135 raise TypeError("invalid dependency type: %r"%type(d))
136
125 137 set.__init__(self, ids)
126 138 self.all = all
127 self.success_only=success_only
139 if not (success or failure):
140 raise ValueError("Must depend on at least one of successes or failures!")
141 self.success=success
142 self.failure = failure
128 143
129 144 def check(self, completed, failed=None):
130 if failed is not None and not self.success_only:
131 completed = completed.union(failed)
145 """check whether our dependencies have been met."""
132 146 if len(self) == 0:
133 147 return True
148 against = set()
149 if self.success:
150 against = completed
151 if failed is not None and self.failure:
152 against = against.union(failed)
134 153 if self.all:
135 return self.issubset(completed)
154 return self.issubset(against)
136 155 else:
137 return not self.isdisjoint(completed)
156 return not self.isdisjoint(against)
138 157
139 def unreachable(self, failed):
140 if len(self) == 0 or len(failed) == 0 or not self.success_only:
158 def unreachable(self, completed, failed=None):
159 """return whether this dependency has become impossible."""
160 if len(self) == 0:
141 161 return False
142 # print self, self.success_only, self.all, failed
162 against = set()
163 if not self.success:
164 against = completed
165 if failed is not None and not self.failure:
166 against = against.union(failed)
143 167 if self.all:
144 return not self.isdisjoint(failed)
168 return not self.isdisjoint(against)
145 169 else:
146 return self.issubset(failed)
170 return self.issubset(against)
147 171
148 172
149 173 def as_dict(self):
@@ -151,7 +175,8 b' class Dependency(set):'
151 175 return dict(
152 176 dependencies=list(self),
153 177 all=self.all,
154 success_only=self.success_only,
178 success=self.success,
179 failure=self.failure
155 180 )
156 181
157 182
@@ -890,13 +890,9 b' class Hub(LoggingFactory):'
890 890
891 891 def shutdown_request(self, client_id, msg):
892 892 """handle shutdown request."""
893 # s = self.context.socket(zmq.XREQ)
894 # s.connect(self.client_connections['mux'])
895 # time.sleep(0.1)
896 # for eid,ec in self.engines.iteritems():
897 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
898 # time.sleep(1)
899 893 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
894 # also notify other clients of shutdown
895 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
900 896 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
901 897 dc.start()
902 898
@@ -1,4 +1,4 b''
1 """Remote Functions and decorators for the client."""
1 """Remote Functions and decorators for Views."""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
@@ -22,33 +22,33 b' from .asyncresult import AsyncMapResult'
22 22 #-----------------------------------------------------------------------------
23 23
24 24 @testdec.skip_doctest
25 def remote(client, bound=False, block=None, targets=None, balanced=None):
25 def remote(view, block=None, **flags):
26 26 """Turn a function into a remote function.
27 27
28 28 This method can be used for map:
29 29
30 In [1]: @remote(client,block=True)
30 In [1]: @remote(view,block=True)
31 31 ...: def func(a):
32 32 ...: pass
33 33 """
34 34
35 35 def remote_function(f):
36 return RemoteFunction(client, f, bound, block, targets, balanced)
36 return RemoteFunction(view, f, block=block, **flags)
37 37 return remote_function
38 38
39 39 @testdec.skip_doctest
40 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
40 def parallel(view, dist='b', block=None, **flags):
41 41 """Turn a function into a parallel remote function.
42 42
43 43 This method can be used for map:
44 44
45 In [1]: @parallel(client,block=True)
45 In [1]: @parallel(view, block=True)
46 46 ...: def func(a):
47 47 ...: pass
48 48 """
49 49
50 50 def parallel_function(f):
51 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
51 return ParallelFunction(view, f, dist=dist, block=block, **flags)
52 52 return parallel_function
53 53
54 54 #--------------------------------------------------------------------------
@@ -61,44 +61,32 b' class RemoteFunction(object):'
61 61 Parameters
62 62 ----------
63 63
64 client : Client instance
65 The client to be used to connect to engines
64 view : View instance
65 The view to be used for execution
66 66 f : callable
67 67 The function to be wrapped into a remote function
68 bound : bool [default: False]
69 Whether the affect the remote namespace when called
70 68 block : bool [default: None]
71 69 Whether to wait for results or not. The default behavior is
72 to use the current `block` attribute of `client`
73 targets : valid target list [default: all]
74 The targets on which to execute.
75 balanced : bool
76 Whether to load-balance with the Task scheduler or not
70 to use the current `block` attribute of `view`
71
72 **flags : remaining kwargs are passed to View.temp_flags
77 73 """
78 74
79 client = None # the remote connection
75 view = None # the remote connection
80 76 func = None # the wrapped function
81 77 block = None # whether to block
82 bound = None # whether to affect the namespace
83 targets = None # where to execute
84 balanced = None # whether to load-balance
78 flags = None # dict of extra kwargs for temp_flags
85 79
86 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
87 self.client = client
80 def __init__(self, view, f, block=None, **flags):
81 self.view = view
88 82 self.func = f
89 83 self.block=block
90 self.bound=bound
91 self.targets=targets
92 if balanced is None:
93 if targets is None:
94 balanced = True
95 else:
96 balanced = False
97 self.balanced = balanced
84 self.flags=flags
98 85
99 86 def __call__(self, *args, **kwargs):
100 return self.client.apply(self.func, args=args, kwargs=kwargs,
101 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
87 block = self.view.block if self.block is None else self.block
88 with self.view.temp_flags(block=block, **self.flags):
89 return self.view.apply(self.func, *args, **kwargs)
102 90
103 91
104 92 class ParallelFunction(RemoteFunction):
@@ -111,51 +99,57 b' class ParallelFunction(RemoteFunction):'
111 99 Parameters
112 100 ----------
113 101
114 client : Client instance
115 The client to be used to connect to engines
102 view : View instance
103 The view to be used for execution
116 104 f : callable
117 105 The function to be wrapped into a remote function
118 bound : bool [default: False]
119 Whether the affect the remote namespace when called
106 dist : str [default: 'b']
107 The key for which mapObject to use to distribute sequences
108 options are:
109 * 'b' : use contiguous chunks in order
110 * 'r' : use round-robin striping
120 111 block : bool [default: None]
121 112 Whether to wait for results or not. The default behavior is
122 to use the current `block` attribute of `client`
123 targets : valid target list [default: all]
124 The targets on which to execute.
125 balanced : bool
126 Whether to load-balance with the Task scheduler or not
127 chunk_size : int or None
113 to use the current `block` attribute of `view`
114 chunksize : int or None
128 115 The size of chunk to use when breaking up sequences in a load-balanced manner
116 **flags : remaining kwargs are passed to View.temp_flags
129 117 """
130 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None):
131 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
132 self.chunk_size = chunk_size
118
119 chunksize=None
120 mapObject=None
121
122 def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
123 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
124 self.chunksize = chunksize
133 125
134 126 mapClass = Map.dists[dist]
135 127 self.mapObject = mapClass()
136 128
137 129 def __call__(self, *sequences):
130 # check that the length of sequences match
138 131 len_0 = len(sequences[0])
139 132 for s in sequences:
140 133 if len(s)!=len_0:
141 134 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
142 135 raise ValueError(msg)
143
144 if self.balanced:
145 if self.chunk_size:
146 nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0)
136 balanced = 'Balanced' in self.view.__class__.__name__
137 if balanced:
138 if self.chunksize:
139 nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0)
147 140 else:
148 141 nparts = len_0
149 targets = [self.targets]*nparts
142 targets = [None]*nparts
150 143 else:
151 if self.chunk_size:
152 warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning)
144 if self.chunksize:
145 warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
153 146 # multiplexed:
154 targets = self.client._build_targets(self.targets)[-1]
147 targets = self.view.targets
155 148 nparts = len(targets)
156 149
157 150 msg_ids = []
158 151 # my_f = lambda *a: map(self.func, *a)
152 client = self.view.client
159 153 for index, t in enumerate(targets):
160 154 args = []
161 155 for seq in sequences:
@@ -173,12 +167,15 b' class ParallelFunction(RemoteFunction):'
173 167 args = [self.func]+args
174 168 else:
175 169 f=self.func
176 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
177 targets=t, balanced=self.balanced)
170
171 view = self.view if balanced else client[t]
172 with view.temp_flags(block=False, **self.flags):
173 ar = view.apply(f, *args)
178 174
179 175 msg_ids.append(ar.msg_ids[0])
180 176
181 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
177 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)
178
182 179 if self.block:
183 180 try:
184 181 return r.get()
@@ -238,7 +238,7 b' class TaskScheduler(SessionFactory):'
238 238 msg = self.session.unpack_message(msg, copy=False, content=False)
239 239 parent = msg['header']
240 240 idents = [idents[0],engine]+idents[1:]
241 print (idents)
241 # print (idents)
242 242 try:
243 243 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
244 244 except:
@@ -277,8 +277,9 b' class TaskScheduler(SessionFactory):'
277 277 # time dependencies
278 278 after = Dependency(header.get('after', []))
279 279 if after.all:
280 if after.success:
280 281 after.difference_update(self.all_completed)
281 if not after.success_only:
282 if after.failure:
282 283 after.difference_update(self.all_failed)
283 284 if after.check(self.all_completed, self.all_failed):
284 285 # recast as empty set, if `after` already met,
@@ -302,7 +303,7 b' class TaskScheduler(SessionFactory):'
302 303 self.depending[msg_id] = args
303 304 return self.fail_unreachable(msg_id, error.InvalidDependency)
304 305 # check if unreachable:
305 if dep.unreachable(self.all_failed):
306 if dep.unreachable(self.all_completed, self.all_failed):
306 307 self.depending[msg_id] = args
307 308 return self.fail_unreachable(msg_id)
308 309
@@ -379,7 +380,11 b' class TaskScheduler(SessionFactory):'
379 380 if follow.all:
380 381 # check follow for impossibility
381 382 dests = set()
382 relevant = self.all_completed if follow.success_only else self.all_done
383 relevant = set()
384 if follow.success:
385 relevant = self.all_completed
386 if follow.failure:
387 relevant = relevant.union(self.all_failed)
383 388 for m in follow.intersection(relevant):
384 389 dests.add(self.destinations[m])
385 390 if len(dests) > 1:
@@ -514,11 +519,8 b' class TaskScheduler(SessionFactory):'
514 519
515 520 for msg_id in jobs:
516 521 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
517 # if dep_id in after:
518 # if after.all and (success or not after.success_only):
519 # after.remove(dep_id)
520 522
521 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
523 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
522 524 self.fail_unreachable(msg_id)
523 525
524 526 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
@@ -170,7 +170,7 b' class Kernel(SessionFactory):'
170 170
171 171 content = dict(status='ok')
172 172 reply_msg = self.session.send(stream, 'abort_reply', content=content,
173 parent=parent, ident=ident)[0]
173 parent=parent, ident=ident)
174 174 self.log.debug(str(reply_msg))
175 175
176 176 def shutdown_request(self, stream, ident, parent):
@@ -184,10 +184,7 b' class Kernel(SessionFactory):'
184 184 content['status'] = 'ok'
185 185 msg = self.session.send(stream, 'shutdown_reply',
186 186 content=content, parent=parent, ident=ident)
187 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
188 # content, parent, ident)
189 # print >> sys.__stdout__, msg
190 # time.sleep(0.2)
187 self.log.debug(str(msg))
191 188 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 189 dc.start()
193 190
@@ -295,7 +292,7 b' class Kernel(SessionFactory):'
295 292 content = parent[u'content']
296 293 bufs = parent[u'buffers']
297 294 msg_id = parent['header']['msg_id']
298 bound = content.get('bound', False)
295 # bound = parent['header'].get('bound', False)
299 296 except:
300 297 self.log.error("Got bad msg: %s"%parent, exc_info=True)
301 298 return
@@ -314,16 +311,12 b' class Kernel(SessionFactory):'
314 311 working = self.user_ns
315 312 # suffix =
316 313 prefix = "_"+str(msg_id).replace("-","")+"_"
317 # if bound:
318 #
319 # else:
320 # working = dict()
321 # suffix = prefix = "_" # prevent keyword collisions with lambda
314
322 315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
323 if bound:
324 bound_ns = Namespace(working)
325 args = [bound_ns]+list(args)
326 # if f.fun
316 # if bound:
317 # bound_ns = Namespace(working)
318 # args = [bound_ns]+list(args)
319
327 320 fname = getattr(f, '__name__', 'f')
328 321
329 322 fname = prefix+"f"
@@ -341,8 +334,8 b' class Kernel(SessionFactory):'
341 334 finally:
342 335 for key in ns.iterkeys():
343 336 working.pop(key)
344 if bound:
345 working.update(bound_ns)
337 # if bound:
338 # working.update(bound_ns)
346 339
347 340 packed_result,buf = serialize_object(result)
348 341 result_buf = [packed_result]+buf
@@ -365,8 +358,11 b' class Kernel(SessionFactory):'
365 358 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
366 359 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
367 360
368 # if reply_msg['content']['status'] == u'error':
369 # self.abort_queues()
361 # flush i/o
362 # should this be before reply_msg is sent, like in the single-kernel code,
363 # or should nothing get in the way of real results?
364 sys.stdout.flush()
365 sys.stderr.flush()
370 366
371 367 def dispatch_queue(self, stream, msg):
372 368 self.control_stream.flush()
@@ -1,5 +1,16 b''
1 1 """toplevel setup/teardown for parallel tests."""
2 2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
3 14 import tempfile
4 15 import time
5 16 from subprocess import Popen, PIPE, STDOUT
@@ -15,17 +26,27 b' def setup():'
15 26 cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT)
16 27 processes.append(cp)
17 28 time.sleep(.5)
18 add_engine()
29 add_engines(1)
19 30 c = client.Client(profile='iptest')
20 31 while not c.ids:
21 32 time.sleep(.1)
22 33 c.spin()
34 c.close()
23 35
24 def add_engine(profile='iptest'):
36 def add_engines(n=1, profile='iptest'):
37 rc = client.Client(profile=profile)
38 base = len(rc)
39 eps = []
40 for i in range(n):
25 41 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
26 42 # ep.start()
27 43 processes.append(ep)
28 return ep
44 eps.append(ep)
45 while len(rc) < base+n:
46 time.sleep(.1)
47 rc.spin()
48 rc.close()
49 return eps
29 50
30 51 def teardown():
31 52 time.sleep(1)
@@ -1,3 +1,12 b''
1 """base class for parallel client tests"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
1 10 import sys
2 11 import tempfile
3 12 import time
@@ -6,6 +15,7 b' from multiprocessing import Process'
6 15
7 16 from nose import SkipTest
8 17
18 import zmq
9 19 from zmq.tests import BaseZMQTestCase
10 20
11 21 from IPython.external.decorator import decorator
@@ -14,7 +24,7 b' from IPython.zmq.parallel import error'
14 24 from IPython.zmq.parallel.client import Client
15 25 from IPython.zmq.parallel.ipcluster import launch_process
16 26 from IPython.zmq.parallel.entry_point import select_random_ports
17 from IPython.zmq.parallel.tests import processes,add_engine
27 from IPython.zmq.parallel.tests import processes,add_engines
18 28
19 29 # simple tasks for use in apply tests
20 30
@@ -47,13 +57,11 b' def skip_without(*names):'
47 57 return f(*args, **kwargs)
48 58 return skip_without_names
49 59
50
51 60 class ClusterTestCase(BaseZMQTestCase):
52 61
53 62 def add_engines(self, n=1, block=True):
54 63 """add multiple engines to our cluster"""
55 for i in range(n):
56 self.engines.append(add_engine())
64 self.engines.extend(add_engines(n))
57 65 if block:
58 66 self.wait_on_engines()
59 67
@@ -69,9 +77,10 b' class ClusterTestCase(BaseZMQTestCase):'
69 77 def connect_client(self):
70 78 """connect a client with my Context, and track its sockets for cleanup"""
71 79 c = Client(profile='iptest',context=self.context)
72
73 # for name in filter(lambda n:n.endswith('socket'), dir(c)):
74 # self.sockets.append(getattr(c, name))
80 for name in filter(lambda n:n.endswith('socket'), dir(c)):
81 s = getattr(c, name)
82 s.setsockopt(zmq.LINGER, 0)
83 self.sockets.append(s)
75 84 return c
76 85
77 86 def assertRaisesRemote(self, etype, f, *args, **kwargs):
@@ -92,15 +101,19 b' class ClusterTestCase(BaseZMQTestCase):'
92 101 self.engines=[]
93 102
94 103 def tearDown(self):
95
104 # self.client.clear(block=True)
96 105 # close fds:
97 106 for e in filter(lambda e: e.poll() is not None, processes):
98 107 processes.remove(e)
99 108
109 # allow flushing of incoming messages to prevent crash on socket close
110 self.client.wait(timeout=2)
111 # time.sleep(2)
112 self.client.spin()
100 113 self.client.close()
101 114 BaseZMQTestCase.tearDown(self)
102 # this will be superfluous when pyzmq merges PR #88
103 self.context.term()
115 # this will be redundant when pyzmq merges PR #88
116 # self.context.term()
104 117 # print tempfile.TemporaryFile().fileno(),
105 118 # sys.stdout.flush()
106 119 No newline at end of file
@@ -1,3 +1,16 b''
1 """Tests for parallel client.py"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
1 14 import time
2 15 from tempfile import mktemp
3 16
@@ -8,7 +21,10 b' from IPython.zmq.parallel import error'
8 21 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
9 22 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
10 23
11 from clienttest import ClusterTestCase, segfault, wait
24 from clienttest import ClusterTestCase, segfault, wait, add_engines
25
26 def setup():
27 add_engines(4)
12 28
13 29 class TestClient(ClusterTestCase):
14 30
@@ -17,27 +33,6 b' class TestClient(ClusterTestCase):'
17 33 self.add_engines(3)
18 34 self.assertEquals(len(self.client.ids), n+3)
19 35
20 def test_segfault_task(self):
21 """test graceful handling of engine death (balanced)"""
22 self.add_engines(1)
23 ar = self.client.apply(segfault, block=False)
24 self.assertRaisesRemote(error.EngineError, ar.get)
25 eid = ar.engine_id
26 while eid in self.client.ids:
27 time.sleep(.01)
28 self.client.spin()
29
30 def test_segfault_mux(self):
31 """test graceful handling of engine death (direct)"""
32 self.add_engines(1)
33 eid = self.client.ids[-1]
34 ar = self.client[eid].apply_async(segfault)
35 self.assertRaisesRemote(error.EngineError, ar.get)
36 eid = ar.engine_id
37 while eid in self.client.ids:
38 time.sleep(.01)
39 self.client.spin()
40
41 36 def test_view_indexing(self):
42 37 """test index access for views"""
43 38 self.add_engines(2)
@@ -71,8 +66,8 b' class TestClient(ClusterTestCase):'
71 66 v = self.client[:2]
72 67 v2 =self.client[:2]
73 68 self.assertTrue(v is v2)
74 v = self.client.view()
75 v2 = self.client.view(balanced=True)
69 v = self.client.load_balanced_view()
70 v2 = self.client.load_balanced_view(targets=None)
76 71 self.assertTrue(v is v2)
77 72
78 73 def test_targets(self):
@@ -84,102 +79,26 b' class TestClient(ClusterTestCase):'
84 79
85 80 def test_clear(self):
86 81 """test clear behavior"""
87 self.add_engines(2)
88 self.client.block=True
89 self.client.push(dict(a=5))
90 self.client.pull('a')
82 # self.add_engines(2)
83 v = self.client[:]
84 v.block=True
85 v.push(dict(a=5))
86 v.pull('a')
91 87 id0 = self.client.ids[-1]
92 88 self.client.clear(targets=id0)
93 self.client.pull('a', targets=self.client.ids[:-1])
94 self.assertRaisesRemote(NameError, self.client.pull, 'a')
95 self.client.clear()
89 self.client[:-1].pull('a')
90 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
91 self.client.clear(block=True)
96 92 for i in self.client.ids:
97 self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
98
99
100 def test_push_pull(self):
101 """test pushing and pulling"""
102 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
103 t = self.client.ids[-1]
104 self.add_engines(2)
105 push = self.client.push
106 pull = self.client.pull
107 self.client.block=True
108 nengines = len(self.client)
109 push({'data':data}, targets=t)
110 d = pull('data', targets=t)
111 self.assertEquals(d, data)
112 push({'data':data})
113 d = pull('data')
114 self.assertEquals(d, nengines*[data])
115 ar = push({'data':data}, block=False)
116 self.assertTrue(isinstance(ar, AsyncResult))
117 r = ar.get()
118 ar = pull('data', block=False)
119 self.assertTrue(isinstance(ar, AsyncResult))
120 r = ar.get()
121 self.assertEquals(r, nengines*[data])
122 push(dict(a=10,b=20))
123 r = pull(('a','b'))
124 self.assertEquals(r, nengines*[[10,20]])
125
126 def test_push_pull_function(self):
127 "test pushing and pulling functions"
128 def testf(x):
129 return 2.0*x
130
131 self.add_engines(4)
132 t = self.client.ids[-1]
133 self.client.block=True
134 push = self.client.push
135 pull = self.client.pull
136 execute = self.client.execute
137 push({'testf':testf}, targets=t)
138 r = pull('testf', targets=t)
139 self.assertEqual(r(1.0), testf(1.0))
140 execute('r = testf(10)', targets=t)
141 r = pull('r', targets=t)
142 self.assertEquals(r, testf(10))
143 ar = push({'testf':testf}, block=False)
144 ar.get()
145 ar = pull('testf', block=False)
146 rlist = ar.get()
147 for r in rlist:
148 self.assertEqual(r(1.0), testf(1.0))
149 execute("def g(x): return x*x", targets=t)
150 r = pull(('testf','g'),targets=t)
151 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
152
153 def test_push_function_globals(self):
154 """test that pushed functions have access to globals"""
155 def geta():
156 return a
157 self.add_engines(1)
158 v = self.client[-1]
159 v.block=True
160 v['f'] = geta
161 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
162 v.execute('a=5')
163 v.execute('b=f()')
164 self.assertEquals(v['b'], 5)
165
166 def test_push_function_defaults(self):
167 """test that pushed functions preserve default args"""
168 def echo(a=10):
169 return a
170 self.add_engines(1)
171 v = self.client[-1]
172 v.block=True
173 v['f'] = echo
174 v.execute('b=f()')
175 self.assertEquals(v['b'], 10)
93 # print i
94 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
176 95
177 96 def test_get_result(self):
178 97 """test getting results from the Hub."""
179 98 c = clientmod.Client(profile='iptest')
180 self.add_engines(1)
99 # self.add_engines(1)
181 100 t = c.ids[-1]
182 ar = c.apply(wait, (1,), block=False, targets=t)
101 ar = c[t].apply_async(wait, 1)
183 102 # give the monitor time to notice the message
184 103 time.sleep(.25)
185 104 ahr = self.client.get_result(ar.msg_ids)
@@ -187,76 +106,42 b' class TestClient(ClusterTestCase):'
187 106 self.assertEquals(ahr.get(), ar.get())
188 107 ar2 = self.client.get_result(ar.msg_ids)
189 108 self.assertFalse(isinstance(ar2, AsyncHubResult))
109 c.close()
190 110
191 111 def test_ids_list(self):
192 112 """test client.ids"""
193 self.add_engines(2)
113 # self.add_engines(2)
194 114 ids = self.client.ids
195 115 self.assertEquals(ids, self.client._ids)
196 116 self.assertFalse(ids is self.client._ids)
197 117 ids.remove(ids[-1])
198 118 self.assertNotEquals(ids, self.client._ids)
199 119
200 def test_run_newline(self):
201 """test that run appends newline to files"""
202 tmpfile = mktemp()
203 with open(tmpfile, 'w') as f:
204 f.write("""def g():
205 return 5
206 """)
207 v = self.client[-1]
208 v.run(tmpfile, block=True)
209 self.assertEquals(v.apply_sync(lambda : g()), 5)
210
211 def test_apply_tracked(self):
212 """test tracking for apply"""
213 # self.add_engines(1)
214 t = self.client.ids[-1]
215 self.client.block=False
216 def echo(n=1024*1024, **kwargs):
217 return self.client.apply(lambda x: x, args=('x'*n,), targets=t, **kwargs)
218 ar = echo(1)
219 self.assertTrue(ar._tracker is None)
220 self.assertTrue(ar.sent)
221 ar = echo(track=True)
222 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
223 self.assertEquals(ar.sent, ar._tracker.done)
224 ar._tracker.wait()
225 self.assertTrue(ar.sent)
226
227 def test_push_tracked(self):
228 t = self.client.ids[-1]
229 ns = dict(x='x'*1024*1024)
230 ar = self.client.push(ns, targets=t, block=False)
231 self.assertTrue(ar._tracker is None)
232 self.assertTrue(ar.sent)
233
234 ar = self.client.push(ns, targets=t, block=False, track=True)
235 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
236 self.assertEquals(ar.sent, ar._tracker.done)
237 ar._tracker.wait()
238 self.assertTrue(ar.sent)
239 ar.get()
240
241 def test_scatter_tracked(self):
242 t = self.client.ids
243 x='x'*1024*1024
244 ar = self.client.scatter('x', x, targets=t, block=False)
245 self.assertTrue(ar._tracker is None)
246 self.assertTrue(ar.sent)
247
248 ar = self.client.scatter('x', x, targets=t, block=False, track=True)
249 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
250 self.assertEquals(ar.sent, ar._tracker.done)
251 ar._tracker.wait()
252 self.assertTrue(ar.sent)
253 ar.get()
254
255 def test_remote_reference(self):
256 v = self.client[-1]
257 v['a'] = 123
258 ra = clientmod.Reference('a')
259 b = v.apply_sync(lambda x: x, ra)
260 self.assertEquals(b, 123)
120 def test_queue_status(self):
121 # self.addEngine(4)
122 ids = self.client.ids
123 id0 = ids[0]
124 qs = self.client.queue_status(targets=id0)
125 self.assertTrue(isinstance(qs, dict))
126 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
127 allqs = self.client.queue_status()
128 self.assertTrue(isinstance(allqs, dict))
129 self.assertEquals(sorted(allqs.keys()), self.client.ids)
130 for eid,qs in allqs.items():
131 self.assertTrue(isinstance(qs, dict))
132 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
133
134 def test_shutdown(self):
135 # self.addEngine(4)
136 ids = self.client.ids
137 id0 = ids[0]
138 self.client.shutdown(id0, block=True)
139 while id0 in self.client.ids:
140 time.sleep(0.1)
141 self.client.spin()
261 142
143 self.assertRaises(IndexError, lambda : self.client[id0])
262 144
145 def test_result_status(self):
146 pass
147 # to be written
@@ -1,5 +1,16 b''
1 1 """test serialization with newserialized"""
2 2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
3 14 from unittest import TestCase
4 15
5 16 from IPython.testing.parametric import parametric
@@ -83,5 +94,15 b' class CanningTestCase(TestCase):'
83 94 a[2] = 1e9
84 95 self.assertTrue((a==final).all())
85 96
97 def test_uncan_function_globals(self):
98 """test that uncanning a module function restores it into its module"""
99 from re import search
100 cf = can(search)
101 csearch = uncan(cf)
102 self.assertEqual(csearch.__module__, search.__module__)
103 self.assertNotEqual(csearch('asd', 'asdf'), None)
104 csearch = uncan(cf, dict(a=5))
105 self.assertEqual(csearch.__module__, search.__module__)
106 self.assertNotEqual(csearch('asd', 'asdf'), None)
86 107
87 108 No newline at end of file
@@ -1,3 +1,15 b''
1 """test building messages with streamsession"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
1 13
2 14 import os
3 15 import uuid
@@ -316,3 +316,39 b' def unpack_apply_message(bufs, g=None, copy=True):'
316 316
317 317 return f,args,kwargs
318 318
319 #--------------------------------------------------------------------------
320 # helpers for implementing old MEC API via view.apply
321 #--------------------------------------------------------------------------
322
323 def interactive(f):
324 """decorator for making functions appear as interactively defined.
325 This results in the function being linked to the user_ns as globals()
326 instead of the module globals().
327 """
328 f.__module__ = '__main__'
329 return f
330
331 @interactive
332 def _push(ns):
333 """helper method for implementing `client.push` via `client.apply`"""
334 globals().update(ns)
335
336 @interactive
337 def _pull(keys):
338 """helper method for implementing `client.pull` via `client.apply`"""
339 user_ns = globals()
340 if isinstance(keys, (list,tuple, set)):
341 for key in keys:
342 if not user_ns.has_key(key):
343 raise NameError("name '%s' is not defined"%key)
344 return map(user_ns.get, keys)
345 else:
346 if not user_ns.has_key(keys):
347 raise NameError("name '%s' is not defined"%keys)
348 return user_ns.get(keys)
349
350 @interactive
351 def _execute(code):
352 """helper method for implementing `client.execute` via `client.apply`"""
353 exec code in globals()
354
This diff has been collapsed as it changes many lines, (596 lines changed) Show them Hide them
@@ -10,13 +10,20 b''
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 import warnings
14 from contextlib import contextmanager
15
16 import zmq
17
13 18 from IPython.testing import decorators as testdec
14 19 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance
15 20
16 21 from IPython.external.decorator import decorator
17 22
18 from .asyncresult import AsyncResult
19 from .dependency import Dependency
23 from . import map as Map
24 from . import util
25 from .asyncresult import AsyncResult, AsyncMapResult
26 from .dependency import Dependency, dependent
20 27 from .remotefunction import ParallelFunction, parallel, remote
21 28
22 29 #-----------------------------------------------------------------------------
@@ -24,21 +31,12 b' from .remotefunction import ParallelFunction, parallel, remote'
24 31 #-----------------------------------------------------------------------------
25 32
26 33 @decorator
27 def myblock(f, self, *args, **kwargs):
28 """override client.block with self.block during a call"""
29 block = self.client.block
30 self.client.block = self.block
31 try:
32 ret = f(self, *args, **kwargs)
33 finally:
34 self.client.block = block
35 return ret
36
37 @decorator
38 34 def save_ids(f, self, *args, **kwargs):
39 35 """Keep our history and outstanding attributes up to date after a method call."""
40 36 n_previous = len(self.client.history)
37 try:
41 38 ret = f(self, *args, **kwargs)
39 finally:
42 40 nmsgs = len(self.client.history) - n_previous
43 41 msg_ids = self.client.history[-nmsgs:]
44 42 self.history.extend(msg_ids)
@@ -71,27 +69,54 b' class View(HasTraits):'
71 69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
72 70
73 71 Don't use this class, use subclasses.
72
73 Methods
74 -------
75
76 spin
77 flushes incoming results and registration state changes
78 control methods spin, and requesting `ids` also ensures up to date
79
80 wait
81 wait on one or more msg_ids
82
83 execution methods
84 apply
85 legacy: execute, run
86
87 data movement
88 push, pull, scatter, gather
89
90 query methods
91 get_result, queue_status, purge_results, result_status
92
93 control methods
94 abort, shutdown
95
74 96 """
75 97 block=Bool(False)
76 bound=Bool(False)
77 track=Bool(False)
98 track=Bool(True)
78 99 history=List()
79 100 outstanding = Set()
80 101 results = Dict()
81 102 client = Instance('IPython.zmq.parallel.client.Client')
82 103
104 _socket = Instance('zmq.Socket')
83 105 _ntargets = Int(1)
84 _balanced = Bool(False)
85 _default_names = List(['block', 'bound', 'track'])
106 _flag_names = List(['block', 'track'])
86 107 _targets = Any()
108 _idents = Any()
87 109
88 def __init__(self, client=None, targets=None):
89 super(View, self).__init__(client=client)
90 self._targets = targets
110 def __init__(self, client=None, socket=None, targets=None):
111 super(View, self).__init__(client=client, _socket=socket)
91 112 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
92 113 self.block = client.block
93 114
94 for name in self._default_names:
115 self._idents, self._targets = self.client._build_targets(targets)
116 if targets is None or isinstance(targets, int):
117 self._targets = targets
118 for name in self._flag_names:
119 # set flags, if they haven't been set yet
95 120 setattr(self, name, getattr(self, name, None))
96 121
97 122 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
@@ -111,134 +136,127 b' class View(HasTraits):'
111 136 def targets(self, value):
112 137 raise AttributeError("Cannot set View `targets` after construction!")
113 138
114 @property
115 def balanced(self):
116 return self._balanced
117
118 @balanced.setter
119 def balanced(self, value):
120 raise AttributeError("Cannot set View `balanced` after construction!")
121
122 def _defaults(self, *excludes):
123 """return dict of our default attributes, excluding names given."""
124 d = dict(balanced=self._balanced, targets=self._targets)
125 for name in self._default_names:
126 if name not in excludes:
127 d[name] = getattr(self, name)
128 return d
129
130 139 def set_flags(self, **kwargs):
131 140 """set my attribute flags by keyword.
132 141
133 A View is a wrapper for the Client's apply method, but
134 with attributes that specify keyword arguments, those attributes
135 can be set by keyword argument with this method.
142 Views determine behavior with a few attributes (`block`, `track`, etc.).
143 These attributes can be set all at once by name with this method.
136 144
137 145 Parameters
138 146 ----------
139 147
140 148 block : bool
141 149 whether to wait for results
142 bound : bool
143 whether to pass the client's Namespace as the first argument
144 to functions called via `apply`.
145 150 track : bool
146 151 whether to create a MessageTracker to allow the user to
147 152 safely edit after arrays and buffers during non-copying
148 153 sends.
149 154 """
150 for key in kwargs:
151 if key not in self._default_names:
152 raise KeyError("Invalid name: %r"%key)
153 for name in ('block', 'bound'):
154 if name in kwargs:
155 setattr(self, name, kwargs[name])
155 for name, value in kwargs.iteritems():
156 if name not in self._flag_names:
157 raise KeyError("Invalid name: %r"%name)
158 else:
159 setattr(self, name, value)
160
161 @contextmanager
162 def temp_flags(self, **kwargs):
163 """temporarily set flags, for use in `with` statements.
164
165 See set_flags for permanent setting of flags
166
167 Examples
168 --------
169
170 >>> view.track=False
171 ...
172 >>> with view.temp_flags(track=True):
173 ... ar = view.apply(dostuff, my_big_array)
174 ... ar.tracker.wait() # wait for send to finish
175 >>> view.track
176 False
177
178 """
179 # preflight: save flags, and set temporaries
180 saved_flags = {}
181 for f in self._flag_names:
182 saved_flags[f] = getattr(self, f)
183 self.set_flags(**kwargs)
184 # yield to the with-statement block
185 yield
186 # postflight: restore saved flags
187 self.set_flags(**saved_flags)
188
156 189
157 190 #----------------------------------------------------------------
158 # wrappers for client methods:
191 # apply
159 192 #----------------------------------------------------------------
160 @sync_results
161 def spin(self):
162 """spin the client, and sync"""
163 self.client.spin()
164 193
165 194 @sync_results
166 195 @save_ids
196 def _really_apply(self, f, args, kwargs, block=None, **options):
197 """wrapper for client.send_apply_message"""
198 raise NotImplementedError("Implement in subclasses")
199
167 200 def apply(self, f, *args, **kwargs):
168 201 """calls f(*args, **kwargs) on remote engines, returning the result.
169 202
170 This method sets all of `client.apply`'s keyword arguments via this
171 View's attributes.
203 This method sets all apply flags via this View's attributes.
172 204
173 205 if self.block is False:
174 206 returns AsyncResult
175 207 else:
176 208 returns actual result of f(*args, **kwargs)
177 209 """
178 return self.client.apply(f, args, kwargs, **self._defaults())
210 return self._really_apply(f, args, kwargs)
179 211
180 @save_ids
181 212 def apply_async(self, f, *args, **kwargs):
182 213 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
183 214
184 215 returns AsyncResult
185 216 """
186 d = self._defaults('block', 'bound')
187 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
217 return self._really_apply(f, args, kwargs, block=False)
188 218
189 219 @spin_after
190 @save_ids
191 220 def apply_sync(self, f, *args, **kwargs):
192 221 """calls f(*args, **kwargs) on remote engines in a blocking manner,
193 222 returning the result.
194 223
195 224 returns: actual result of f(*args, **kwargs)
196 225 """
197 d = self._defaults('block', 'bound', 'track')
198 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
226 return self._really_apply(f, args, kwargs, block=True)
199 227
200 # @sync_results
201 # @save_ids
202 # def apply_bound(self, f, *args, **kwargs):
203 # """calls f(*args, **kwargs) bound to engine namespace(s).
204 #
205 # if self.block is False:
206 # returns msg_id
207 # else:
208 # returns actual result of f(*args, **kwargs)
209 #
210 # This method has access to the targets' namespace via globals()
211 #
212 # """
213 # d = self._defaults('bound')
214 # return self.client.apply(f, args, kwargs, bound=True, **d)
215 #
228 #----------------------------------------------------------------
229 # wrappers for client and control methods
230 #----------------------------------------------------------------
216 231 @sync_results
217 @save_ids
218 def apply_async_bound(self, f, *args, **kwargs):
219 """calls f(*args, **kwargs) bound to engine namespace(s)
220 in a nonblocking manner.
221
222 The first argument to `f` will be the Engine's Namespace
223
224 returns: AsyncResult
232 def spin(self):
233 """spin the client, and sync"""
234 self.client.spin()
225 235
226 """
227 d = self._defaults('block', 'bound')
228 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
236 @sync_results
237 def wait(self, jobs=None, timeout=-1):
238 """waits on one or more `jobs`, for up to `timeout` seconds.
229 239
230 @spin_after
231 @save_ids
232 def apply_sync_bound(self, f, *args, **kwargs):
233 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
240 Parameters
241 ----------
234 242
235 The first argument to `f` will be the Engine's Namespace
243 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
244 ints are indices to self.history
245 strs are msg_ids
246 default: wait on all outstanding messages
247 timeout : float
248 a time in seconds, after which to give up.
249 default is -1, which means no timeout
236 250
237 returns: actual result of f(*args, **kwargs)
251 Returns
252 -------
238 253
254 True : when all msg_ids are done
255 False : timeout reached, some msg_ids still outstanding
239 256 """
240 d = self._defaults('block', 'bound')
241 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
257 if jobs is None:
258 jobs = self.history
259 return self.client.wait(jobs, timeout)
242 260
243 261 def abort(self, jobs=None, block=None):
244 262 """Abort jobs on my engines.
@@ -318,6 +336,7 b' class View(HasTraits):'
318 336 """Parallel version of `itertools.imap`.
319 337
320 338 See `self.map` for details.
339
321 340 """
322 341
323 342 return iter(self.map_async(f,*sequences, **kwargs))
@@ -326,14 +345,15 b' class View(HasTraits):'
326 345 # Decorators
327 346 #-------------------------------------------------------------------
328 347
329 def remote(self, bound=False, block=True):
348 def remote(self, block=True, **flags):
330 349 """Decorator for making a RemoteFunction"""
331 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
350 block = self.block if block is None else block
351 return remote(self, block=block, **flags)
332 352
333 def parallel(self, dist='b', bound=False, block=None):
353 def parallel(self, dist='b', block=None, **flags):
334 354 """Decorator for making a ParallelFunction"""
335 355 block = self.block if block is None else block
336 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
356 return parallel(self, dist=dist, block=block, **flags)
337 357
338 358 @testdec.skip_doctest
339 359 class DirectView(View):
@@ -355,14 +375,65 b' class DirectView(View):'
355 375
356 376 """
357 377
358 def __init__(self, client=None, targets=None):
359 super(DirectView, self).__init__(client=client, targets=targets)
360 self._balanced = False
378 def __init__(self, client=None, socket=None, targets=None):
379 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
361 380
362 @spin_after
381
382 @sync_results
363 383 @save_ids
384 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None):
385 """calls f(*args, **kwargs) on remote engines, returning the result.
386
387 This method sets all of `apply`'s flags via this View's attributes.
388
389 Parameters
390 ----------
391
392 f : callable
393
394 args : list [default: empty]
395
396 kwargs : dict [default: empty]
397
398 block : bool [default: self.block]
399 whether to block
400 track : bool [default: self.track]
401 whether to ask zmq to track the message, for safe non-copying sends
402
403 Returns
404 -------
405
406 if self.block is False:
407 returns AsyncResult
408 else:
409 returns actual result of f(*args, **kwargs) on the engine(s)
410 This will be a list of self.targets is also a list (even length 1), or
411 the single result if self.targets is an integer engine id
412 """
413 args = [] if args is None else args
414 kwargs = {} if kwargs is None else kwargs
415 block = self.block if block is None else block
416 track = self.track if track is None else track
417 msg_ids = []
418 trackers = []
419 for ident in self._idents:
420 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
421 ident=ident)
422 if track:
423 trackers.append(msg['tracker'])
424 msg_ids.append(msg['msg_id'])
425 tracker = None if track is False else zmq.MessageTracker(*trackers)
426 ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=self._targets, tracker=tracker)
427 if block:
428 try:
429 return ar.get()
430 except KeyboardInterrupt:
431 pass
432 return ar
433
434 @spin_after
364 435 def map(self, f, *sequences, **kwargs):
365 """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
436 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
366 437
367 438 Parallel version of builtin `map`, using this View's `targets`.
368 439
@@ -380,8 +451,6 b' class DirectView(View):'
380 451 the sequences to be distributed and passed to `f`
381 452 block : bool
382 453 whether to wait for the result or not [default self.block]
383 bound : bool
384 whether to pass the client's Namespace as the first argument to `f`
385 454
386 455 Returns
387 456 -------
@@ -396,70 +465,140 b' class DirectView(View):'
396 465 the result of map(f,*sequences)
397 466 """
398 467
399 block = kwargs.get('block', self.block)
400 bound = kwargs.get('bound', self.bound)
468 block = kwargs.pop('block', self.block)
401 469 for k in kwargs.keys():
402 if k not in ['block', 'bound']:
470 if k not in ['block', 'track']:
403 471 raise TypeError("invalid keyword arg, %r"%k)
404 472
405 473 assert len(sequences) > 0, "must have some sequences to map onto!"
406 pf = ParallelFunction(self.client, f, block=block, bound=bound,
407 targets=self._targets, balanced=False)
474 pf = ParallelFunction(self, f, block=block, **kwargs)
408 475 return pf.map(*sequences)
409 476
410 @sync_results
411 @save_ids
412 477 def execute(self, code, block=None):
413 """execute some code on my targets."""
478 """Executes `code` on `targets` in blocking or nonblocking manner.
414 479
415 block = block if block is not None else self.block
480 ``execute`` is always `bound` (affects engine namespace)
416 481
417 return self.client.execute(code, block=block, targets=self._targets)
482 Parameters
483 ----------
418 484
419 @sync_results
420 @save_ids
421 def run(self, fname, block=None):
422 """execute the code in a file on my targets."""
485 code : str
486 the code string to be executed
487 block : bool
488 whether or not to wait until done to return
489 default: self.block
490 """
491 return self._really_apply(util._execute, args=(code,), block=block)
423 492
424 block = block if block is not None else self.block
493 def run(self, filename, block=None):
494 """Execute contents of `filename` on my engine(s).
495
496 This simply reads the contents of the file and calls `execute`.
497
498 Parameters
499 ----------
500
501 filename : str
502 The path to the file
503 targets : int/str/list of ints/strs
504 the engines on which to execute
505 default : all
506 block : bool
507 whether or not to wait until done
508 default: self.block
425 509
426 return self.client.run(fname, block=block, targets=self._targets)
510 """
511 with open(filename, 'r') as f:
512 # add newline in case of trailing indented whitespace
513 # which will cause SyntaxError
514 code = f.read()+'\n'
515 return self.execute(code, block=block)
427 516
428 517 def update(self, ns):
429 """update remote namespace with dict `ns`"""
430 return self.client.push(ns, targets=self._targets, block=self.block)
518 """update remote namespace with dict `ns`
431 519
432 def push(self, ns, block=None):
433 """update remote namespace with dict `ns`"""
520 See `push` for details.
521 """
522 return self.push(ns, block=self.block, track=self.track)
434 523
435 block = block if block is not None else self.block
524 def push(self, ns, block=None, track=None):
525 """update remote namespace with dict `ns`
436 526
437 return self.client.push(ns, targets=self._targets, block=block)
527 Parameters
528 ----------
529
530 ns : dict
531 dict of keys with which to update engine namespace(s)
532 block : bool [default : self.block]
533 whether to wait to be notified of engine receipt
534
535 """
536
537 block = block if block is not None else self.block
538 track = track if track is not None else self.track
539 # applier = self.apply_sync if block else self.apply_async
540 if not isinstance(ns, dict):
541 raise TypeError("Must be a dict, not %s"%type(ns))
542 return self._really_apply(util._push, (ns,),block=block, track=track)
438 543
439 544 def get(self, key_s):
440 545 """get object(s) by `key_s` from remote namespace
441 will return one object if it is a key.
442 It also takes a list of keys, and will return a list of objects."""
546
547 see `pull` for details.
548 """
443 549 # block = block if block is not None else self.block
444 return self.client.pull(key_s, block=True, targets=self._targets)
550 return self.pull(key_s, block=True)
551
552 def pull(self, names, block=True):
553 """get object(s) by `name` from remote namespace
445 554
446 @sync_results
447 @save_ids
448 def pull(self, key_s, block=True):
449 """get object(s) by `key_s` from remote namespace
450 555 will return one object if it is a key.
451 It also takes a list of keys, and will return a list of objects."""
556 can also take a list of keys, in which case it will return a list of objects.
557 """
452 558 block = block if block is not None else self.block
453 return self.client.pull(key_s, block=block, targets=self._targets)
559 applier = self.apply_sync if block else self.apply_async
560 if isinstance(names, basestring):
561 pass
562 elif isinstance(names, (list,tuple,set)):
563 for key in names:
564 if not isinstance(key, basestring):
565 raise TypeError("keys must be str, not type %r"%type(key))
566 else:
567 raise TypeError("names must be strs, not %r"%names)
568 return applier(util._pull, names)
454 569
455 def scatter(self, key, seq, dist='b', flatten=False, block=None):
570 def scatter(self, key, seq, dist='b', flatten=False, block=None, track=None):
456 571 """
457 572 Partition a Python sequence and send the partitions to a set of engines.
458 573 """
459 574 block = block if block is not None else self.block
575 track = track if track is not None else self.track
576 targets = self._targets
577 mapObject = Map.dists[dist]()
578 nparts = len(targets)
579 msg_ids = []
580 trackers = []
581 for index, engineid in enumerate(targets):
582 push = self.client[engineid].push
583 partition = mapObject.getPartition(seq, index, nparts)
584 if flatten and len(partition) == 1:
585 r = push({key: partition[0]}, block=False, track=track)
586 else:
587 r = push({key: partition},block=False, track=track)
588 msg_ids.extend(r.msg_ids)
589 if track:
590 trackers.append(r._tracker)
460 591
461 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
462 targets=self._targets, block=block)
592 if track:
593 tracker = zmq.MessageTracker(*trackers)
594 else:
595 tracker = None
596
597 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
598 if block:
599 r.wait()
600 else:
601 return r
463 602
464 603 @sync_results
465 604 @save_ids
@@ -468,8 +607,20 b' class DirectView(View):'
468 607 Gather a partitioned sequence on a set of engines as a single local seq.
469 608 """
470 609 block = block if block is not None else self.block
610 mapObject = Map.dists[dist]()
611 msg_ids = []
612 for index, engineid in enumerate(self._targets):
471 613
472 return self.client.gather(key, dist=dist, targets=self._targets, block=block)
614 msg_ids.extend(self.client[engineid].pull(key, block=False).msg_ids)
615
616 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
617
618 if block:
619 try:
620 return r.get()
621 except KeyboardInterrupt:
622 pass
623 return r
473 624
474 625 def __getitem__(self, key):
475 626 return self.get(key)
@@ -523,22 +674,25 b' class LoadBalancedView(View):'
523 674
524 675 Load-balanced views can be created with the client's `view` method:
525 676
526 >>> v = client.view(balanced=True)
677 >>> v = client.load_balanced_view()
527 678
528 679 or targets can be specified, to restrict the potential destinations:
529 680
530 >>> v = client.view([1,3],balanced=True)
681 >>> v = client.client.load_balanced_view(([1,3])
531 682
532 683 which would restrict loadbalancing to between engines 1 and 3.
533 684
534 685 """
535 686
536 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
687 _flag_names = ['block', 'track', 'follow', 'after', 'timeout']
537 688
538 def __init__(self, client=None, targets=None):
539 super(LoadBalancedView, self).__init__(client=client, targets=targets)
689 def __init__(self, client=None, socket=None, targets=None):
690 super(LoadBalancedView, self).__init__(client=client, socket=socket, targets=targets)
540 691 self._ntargets = 1
541 self._balanced = True
692 self._task_scheme=client._task_scheme
693 if targets is None:
694 self._targets = None
695 self._idents=[]
542 696
543 697 def _validate_dependency(self, dep):
544 698 """validate a dependency.
@@ -549,7 +703,7 b' class LoadBalancedView(View):'
549 703 return True
550 704 elif isinstance(dep, (list,set, tuple)):
551 705 for d in dep:
552 if not isinstance(d, str, AsyncResult):
706 if not isinstance(d, (str, AsyncResult)):
553 707 return False
554 708 elif isinstance(dep, dict):
555 709 if set(dep.keys()) != set(Dependency().as_dict().keys()):
@@ -562,6 +716,20 b' class LoadBalancedView(View):'
562 716 else:
563 717 return False
564 718
719 return True
720
721 def _render_dependency(self, dep):
722 """helper for building jsonable dependencies from various input forms."""
723 if isinstance(dep, Dependency):
724 return dep.as_dict()
725 elif isinstance(dep, AsyncResult):
726 return dep.msg_ids
727 elif dep is None:
728 return []
729 else:
730 # pass to Dependency constructor
731 return list(Dependency(dep))
732
565 733 def set_flags(self, **kwargs):
566 734 """set my attribute flags by keyword.
567 735
@@ -574,19 +742,28 b' class LoadBalancedView(View):'
574 742
575 743 block : bool
576 744 whether to wait for results
577 bound : bool
578 whether to pass the client's Namespace as the first argument
579 to functions called via `apply`.
580 745 track : bool
581 746 whether to create a MessageTracker to allow the user to
582 747 safely edit after arrays and buffers during non-copying
583 748 sends.
584 follow : Dependency, list, msg_id, AsyncResult
585 the location dependencies of tasks
586 after : Dependency, list, msg_id, AsyncResult
587 the time dependencies of tasks
588 timeout : int,None
589 the timeout to be used for tasks
749 #
750 after : Dependency or collection of msg_ids
751 Only for load-balanced execution (targets=None)
752 Specify a list of msg_ids as a time-based dependency.
753 This job will only be run *after* the dependencies
754 have been met.
755
756 follow : Dependency or collection of msg_ids
757 Only for load-balanced execution (targets=None)
758 Specify a list of msg_ids as a location-based dependency.
759 This job will only be run on an engine where this dependency
760 is met.
761
762 timeout : float/int or None
763 Only for load-balanced execution (targets=None)
764 Specify an amount of time (in seconds) for the scheduler to
765 wait for dependencies to be met before failing with a
766 DependencyTimeout.
590 767 """
591 768
592 769 super(LoadBalancedView, self).set_flags(**kwargs)
@@ -599,23 +776,101 b' class LoadBalancedView(View):'
599 776 raise ValueError("Invalid dependency: %r"%value)
600 777 if 'timeout' in kwargs:
601 778 t = kwargs['timeout']
602 if not isinstance(t, (int, long, float, None)):
779 if not isinstance(t, (int, long, float, type(None))):
603 780 raise TypeError("Invalid type for timeout: %r"%type(t))
604 781 if t is not None:
605 782 if t < 0:
606 783 raise ValueError("Invalid timeout: %s"%t)
607 784 self.timeout = t
608 785
786 @sync_results
787 @save_ids
788 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
789 after=None, follow=None, timeout=None):
790 """calls f(*args, **kwargs) on a remote engine, returning the result.
791
792 This method temporarily sets all of `apply`'s flags for a single call.
793
794 Parameters
795 ----------
796
797 f : callable
798
799 args : list [default: empty]
800
801 kwargs : dict [default: empty]
802
803 block : bool [default: self.block]
804 whether to block
805 track : bool [default: self.track]
806 whether to ask zmq to track the message, for safe non-copying sends
807
808 !!!!!! TODO: THE REST HERE !!!!
809
810 Returns
811 -------
812
813 if self.block is False:
814 returns AsyncResult
815 else:
816 returns actual result of f(*args, **kwargs) on the engine(s)
817 This will be a list of self.targets is also a list (even length 1), or
818 the single result if self.targets is an integer engine id
819 """
820
821 # validate whether we can run
822 if self._socket.closed:
823 msg = "Task farming is disabled"
824 if self._task_scheme == 'pure':
825 msg += " because the pure ZMQ scheduler cannot handle"
826 msg += " disappearing engines."
827 raise RuntimeError(msg)
828
829 if self._task_scheme == 'pure':
830 # pure zmq scheme doesn't support dependencies
831 msg = "Pure ZMQ scheduler doesn't support dependencies"
832 if (follow or after):
833 # hard fail on DAG dependencies
834 raise RuntimeError(msg)
835 if isinstance(f, dependent):
836 # soft warn on functional dependencies
837 warnings.warn(msg, RuntimeWarning)
838
839 # build args
840 args = [] if args is None else args
841 kwargs = {} if kwargs is None else kwargs
842 block = self.block if block is None else block
843 track = self.track if track is None else track
844 after = self.after if after is None else after
845 follow = self.follow if follow is None else follow
846 timeout = self.timeout if timeout is None else timeout
847 after = self._render_dependency(after)
848 follow = self._render_dependency(follow)
849 subheader = dict(after=after, follow=follow, timeout=timeout, targets=self._idents)
850
851 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
852 subheader=subheader)
853 tracker = None if track is False else msg['tracker']
854
855 ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
856
857 if block:
858 try:
859 return ar.get()
860 except KeyboardInterrupt:
861 pass
862 return ar
863
609 864 @spin_after
610 865 @save_ids
611 866 def map(self, f, *sequences, **kwargs):
612 """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
867 """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
613 868
614 869 Parallel version of builtin `map`, load-balanced by this View.
615 870
616 `block`, `bound`, and `chunk_size` can be specified by keyword only.
871 `block`, and `chunksize` can be specified by keyword only.
617 872
618 Each `chunk_size` elements will be a separate task, and will be
873 Each `chunksize` elements will be a separate task, and will be
619 874 load-balanced. This lets individual elements be available for iteration
620 875 as soon as they arrive.
621 876
@@ -628,13 +883,11 b' class LoadBalancedView(View):'
628 883 the sequences to be distributed and passed to `f`
629 884 block : bool
630 885 whether to wait for the result or not [default self.block]
631 bound : bool
632 whether to pass the client's Namespace as the first argument to `f`
633 886 track : bool
634 887 whether to create a MessageTracker to allow the user to
635 888 safely edit after arrays and buffers during non-copying
636 889 sends.
637 chunk_size : int
890 chunksize : int
638 891 how many elements should be in each task [default 1]
639 892
640 893 Returns
@@ -652,19 +905,16 b' class LoadBalancedView(View):'
652 905
653 906 # default
654 907 block = kwargs.get('block', self.block)
655 bound = kwargs.get('bound', self.bound)
656 chunk_size = kwargs.get('chunk_size', 1)
908 chunksize = kwargs.get('chunksize', 1)
657 909
658 910 keyset = set(kwargs.keys())
659 extra_keys = keyset.difference_update(set(['block', 'bound', 'chunk_size']))
911 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
660 912 if extra_keys:
661 913 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
662 914
663 915 assert len(sequences) > 0, "must have some sequences to map onto!"
664 916
665 pf = ParallelFunction(self.client, f, block=block, bound=bound,
666 targets=self._targets, balanced=True,
667 chunk_size=chunk_size)
917 pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
668 918 return pf.map(*sequences)
669 919
670 920 __all__ = ['LoadBalancedView', 'DirectView'] No newline at end of file
@@ -53,12 +53,12 b' def make_bintree(levels):'
53 53 add_children(G, root, levels, 2)
54 54 return G
55 55
56 def submit_jobs(client, G, jobs):
56 def submit_jobs(view, G, jobs):
57 57 """Submit jobs via client where G describes the time dependencies."""
58 58 results = {}
59 59 for node in nx.topological_sort(G):
60 deps = [ results[n] for n in G.predecessors(node) ]
61 results[node] = client.apply(jobs[node], after=deps)
60 with view.temp_flags(after=[ results[n] for n in G.predecessors(node) ]):
61 results[node] = view.apply(jobs[node])
62 62 return results
63 63
64 64 def validate_tree(G, results):
@@ -76,7 +76,7 b' def main(nodes, edges):'
76 76 in-degree on the y (just for spread). All arrows must
77 77 point at least slightly to the right if the graph is valid.
78 78 """
79 import pylab
79 from matplotlib import pyplot as plt
80 80 from matplotlib.dates import date2num
81 81 from matplotlib.cm import gist_rainbow
82 82 print "building DAG"
@@ -88,10 +88,11 b' def main(nodes, edges):'
88 88 jobs[node] = randomwait
89 89
90 90 client = cmod.Client()
91 view = client.load_balanced_view()
91 92 print "submitting %i tasks with %i dependencies"%(nodes,edges)
92 results = submit_jobs(client, G, jobs)
93 results = submit_jobs(view, G, jobs)
93 94 print "waiting for results"
94 client.barrier()
95 view.wait()
95 96 print "done"
96 97 for node in G:
97 98 md = results[node].metadata
@@ -107,13 +108,13 b' def main(nodes, edges):'
107 108 xmax,ymax = map(max, (x,y))
108 109 xscale = xmax-xmin
109 110 yscale = ymax-ymin
110 pylab.xlim(xmin-xscale*.1,xmax+xscale*.1)
111 pylab.ylim(ymin-yscale*.1,ymax+yscale*.1)
111 plt.xlim(xmin-xscale*.1,xmax+xscale*.1)
112 plt.ylim(ymin-yscale*.1,ymax+yscale*.1)
112 113 return G,results
113 114
114 115 if __name__ == '__main__':
115 import pylab
116 from matplotlib import pyplot as plt
116 117 # main(5,10)
117 118 main(32,96)
118 pylab.show()
119 plt.show()
119 120 No newline at end of file
@@ -31,7 +31,7 b' def getpid2():'
31 31 import os
32 32 return os.getpid()
33 33
34 view = client[None]
34 view = client.load_balanced_view()
35 35 view.block=True
36 36
37 37 # will run on anything:
@@ -58,29 +58,41 b' successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids'
58 58 failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]
59 59
60 60 mixed = [failures[0],successes[0]]
61 d1a = Dependency(mixed, mode='any', success_only=False) # yes
62 d1b = Dependency(mixed, mode='any', success_only=True) # yes
63 d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow
64 d2b = Dependency(mixed, mode='all', success_only=True) # no
65 d3 = Dependency(failures, mode='any', success_only=True) # no
66 d4 = Dependency(failures, mode='any', success_only=False) # yes
67 d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow
68 d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow
69
70 client.block = False
71
72 r1a = client.apply(getpid, after=d1a)
73 r1b = client.apply(getpid, follow=d1b)
74 r2a = client.apply(getpid, after=d2b, follow=d2a)
75 r2b = client.apply(getpid, after=d2a, follow=d2b)
76 r3 = client.apply(getpid, after=d3)
77 r4a = client.apply(getpid, after=d4)
78 r4b = client.apply(getpid, follow=d4)
79 r4c = client.apply(getpid, after=d3, follow=d4)
80 r5 = client.apply(getpid, after=d5)
81 r5b = client.apply(getpid, follow=d5, after=d3)
82 r6 = client.apply(getpid, follow=d6)
83 r6b = client.apply(getpid, after=d6, follow=d2b)
61 d1a = Dependency(mixed, all=False, failure=True) # yes
62 d1b = Dependency(mixed, all=False) # yes
63 d2a = Dependency(mixed, all=True, failure=True) # yes after / no follow
64 d2b = Dependency(mixed, all=True) # no
65 d3 = Dependency(failures, all=False) # no
66 d4 = Dependency(failures, all=False, failure=True) # yes
67 d5 = Dependency(failures, all=True, failure=True) # yes after / no follow
68 d6 = Dependency(successes, all=True, failure=True) # yes after / no follow
69
70 view.block = False
71 flags = view.temp_flags
72 with flags(after=d1a):
73 r1a = view.apply(getpid)
74 with flags(follow=d1b):
75 r1b = view.apply(getpid)
76 with flags(after=d2b, follow=d2a):
77 r2a = view.apply(getpid)
78 with flags(after=d2a, follow=d2b):
79 r2b = view.apply(getpid)
80 with flags(after=d3):
81 r3 = view.apply(getpid)
82 with flags(after=d4):
83 r4a = view.apply(getpid)
84 with flags(follow=d4):
85 r4b = view.apply(getpid)
86 with flags(after=d3, follow=d4):
87 r4c = view.apply(getpid)
88 with flags(after=d5):
89 r5 = view.apply(getpid)
90 with flags(follow=d5, after=d3):
91 r5b = view.apply(getpid)
92 with flags(follow=d6):
93 r6 = view.apply(getpid)
94 with flags(after=d6, follow=d2b):
95 r6b = view.apply(getpid)
84 96
85 97 def should_fail(f):
86 98 try:
@@ -1,8 +1,9 b''
1 1 from IPython.zmq.parallel.client import *
2 2
3 3 client = Client()
4 view = client[:]
4 5
5 @remote(client, block=True)
6 @view.remote(block=True)
6 7 def square(a):
7 8 """return square of a number"""
8 9 return a*a
@@ -21,7 +22,7 b' squares2 = [ r.get() for r in arlist ]'
21 22
22 23 # now the more convenient @parallel decorator, which has a map method:
23 24
24 @parallel(client, block=False)
25 @view.parallel(block=False)
25 26 def psquare(a):
26 27 """return square of a number"""
27 28 return a*a
@@ -3,12 +3,12 b' from IPython.zmq.parallel.client import *'
3 3 client = Client()
4 4
5 5 for id in client.ids:
6 client.push(dict(ids=id*id), targets=id)
6 client[id].push(dict(ids=id*id))
7 7
8 rns = client[0]
9 rns['a'] = 5
8 v = client[0]
9 v['a'] = 5
10 10
11 print rns['a']
11 print v['a']
12 12
13 13 remotes = client[:]
14 14
@@ -49,7 +49,7 b' c = client.Client(profile=cluster_profile)'
49 49
50 50 # A LoadBalancedView is an interface to the engines that provides dynamic load
51 51 # balancing at the expense of not knowing which engine will execute the code.
52 view = c.view()
52 view = c.load_balanced_view()
53 53
54 54 # Initialize the common code on the engines. This Python module has the
55 55 # price_options function that prices the options.
@@ -75,7 +75,7 b' print "Submitted tasks: ", len(async_results)'
75 75 sys.stdout.flush()
76 76
77 77 # Block until all tasks are completed.
78 c.barrier(async_results)
78 c.wait(async_results)
79 79 t2 = time.time()
80 80 t = t2-t1
81 81
@@ -27,14 +27,14 b" filestring = 'pi200m.ascii.%(i)02dof20'"
27 27 files = [filestring % {'i':i} for i in range(1,16)]
28 28
29 29 # Connect to the IPython cluster
30 c = client.Client(profile='edison')
31 c.run('pidigits.py')
30 c = client.Client()
31 c[:].run('pidigits.py')
32 32
33 33 # the number of engines
34 34 n = len(c)
35 35 id0 = c.ids[0]
36 36 v = c[:]
37 v.set_flags(bound=True,block=True)
37 v.block=True
38 38 # fetch the pi-files
39 39 print "downloading %i files of pi"%n
40 40 v.map(fetch_pi_file, files[:n])
@@ -30,17 +30,19 b' from numpy import exp, zeros, newaxis, sqrt'
30 30 from IPython.external import argparse
31 31 from IPython.zmq.parallel.client import Client, Reference
32 32
33 def setup_partitioner(ns, index, num_procs, gnum_cells, parts):
33 def setup_partitioner(index, num_procs, gnum_cells, parts):
34 34 """create a partitioner in the engine namespace"""
35 global partitioner
35 36 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
36 37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 38 p.prepare_communication()
38 39 # put the partitioner into the global namespace:
39 ns.partitioner=p
40 partitioner=p
40 41
41 def setup_solver(ns, *args, **kwargs):
42 def setup_solver(*args, **kwargs):
42 43 """create a WaveSolver in the engine namespace"""
43 ns.solver = WaveSolver(*args, **kwargs)
44 global solver
45 solver = WaveSolver(*args, **kwargs)
44 46
45 47 def wave_saver(u, x, y, t):
46 48 """save the wave log"""
@@ -146,11 +148,11 b" if __name__ == '__main__':"
146 148 # setup remote partitioner
147 149 # note that Reference means that the argument passed to setup_partitioner will be the
148 150 # object named 'my_id' in the engine's namespace
149 view.apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
151 view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
150 152 # wait for initial communication to complete
151 153 view.execute('mpi.barrier()')
152 154 # setup remote solvers
153 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
155 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
154 156
155 157 # lambda for calling solver.solve:
156 158 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
@@ -172,7 +174,7 b" if __name__ == '__main__':"
172 174
173 175 impl['inner'] = 'vectorized'
174 176 # setup new solvers
175 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
177 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
176 178 view.execute('mpi.barrier()')
177 179
178 180 # run again with numpy vectorized inner-implementation
@@ -30,17 +30,19 b' from numpy import exp, zeros, newaxis, sqrt'
30 30 from IPython.external import argparse
31 31 from IPython.zmq.parallel.client import Client, Reference
32 32
33 def setup_partitioner(ns, comm, addrs, index, num_procs, gnum_cells, parts):
33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
34 34 """create a partitioner in the engine namespace"""
35 global partitioner
35 36 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
36 37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 38 p.prepare_communication()
38 39 # put the partitioner into the global namespace:
39 ns.partitioner=p
40 partitioner=p
40 41
41 def setup_solver(ns, *args, **kwargs):
42 def setup_solver(*args, **kwargs):
42 43 """create a WaveSolver in the engine namespace."""
43 ns.solver = WaveSolver(*args, **kwargs)
44 global solver
45 solver = WaveSolver(*args, **kwargs)
44 46
45 47 def wave_saver(u, x, y, t):
46 48 """save the wave state for each timestep."""
@@ -156,7 +158,7 b" if __name__ == '__main__':"
156 158 # setup remote partitioner
157 159 # note that Reference means that the argument passed to setup_partitioner will be the
158 160 # object named 'com' in the engine's namespace
159 view.apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
161 view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
160 162 time.sleep(1)
161 163 # convenience lambda to call solver.solve:
162 164 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
@@ -164,7 +166,7 b" if __name__ == '__main__':"
164 166 if ns.scalar:
165 167 impl['inner'] = 'scalar'
166 168 # setup remote solvers
167 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
169 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
168 170
169 171 # run first with element-wise Python operations for each cell
170 172 t0 = time.time()
@@ -182,7 +184,7 b" if __name__ == '__main__':"
182 184 # run again with faster numpy-vectorized inner implementation:
183 185 impl['inner'] = 'vectorized'
184 186 # setup remote solvers
185 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
187 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
186 188
187 189 t0 = time.time()
188 190
@@ -19,7 +19,7 b' Contents'
19 19 whatsnew/index.txt
20 20 install/index.txt
21 21 interactive/index.txt
22 parallel/index.txt
22 .. parallel/index.txt
23 23 parallelz/index.txt
24 24 config/index.txt
25 25 development/index.txt
@@ -45,7 +45,7 b' A possible sequence of events for this workflow:'
45 45
46 46
47 47 Further, taking failures into account, assuming all dependencies are run with the default
48 `success_only=True`, the following cases would occur for each node's failure:
48 `success=True,failure=False`, the following cases would occur for each node's failure:
49 49
50 50 0. fails: all other tasks fail as Impossible
51 51 1. 2 can still succeed, but 3,4 are unreachable
@@ -111,7 +111,8 b' on which it depends:'
111 111
112 112 .. sourcecode:: ipython
113 113
114 In [5]: c = client.Client()
114 In [5]: rc = client.Client()
115 In [5]: view = rc.load_balanced_view()
115 116
116 117 In [6]: results = {}
117 118
@@ -120,13 +121,13 b' on which it depends:'
120 121 ...: # leading into this one as dependencies
121 122 ...: deps = [ results[n] for n in G.predecessors(node) ]
122 123 ...: # submit and store AsyncResult object
123 ...: results[node] = client.apply(jobs[node], after=deps, block=False)
124 ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
124 125
125 126 Now that we have submitted all the jobs, we can wait for the results:
126 127
127 128 .. sourcecode:: ipython
128 129
129 In [8]: [ r.get() for r in results.values() ]
130 In [8]: view.wait(results.values())
130 131
131 132 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
132 133 raised an error if a task failed). But we don't know that the ordering was properly
@@ -17,5 +17,6 b' Using IPython for parallel computing (ZMQ)'
17 17 parallel_demos.txt
18 18 dag_dependencies.txt
19 19 parallel_details.txt
20 parallel_transition.txt
20 21
21 22
@@ -135,7 +135,7 b' calculation can also be run by simply typing the commands from'
135 135 # We simply pass Client the name of the cluster profile we
136 136 # are using.
137 137 In [2]: c = client.Client(profile='mycluster')
138 In [3]: view = c.view(balanced=True)
138 In [3]: view = c.load_balanced_view()
139 139
140 140 In [3]: c.ids
141 141 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
@@ -166,11 +166,11 b' calculation can also be run by simply typing the commands from'
166 166 'pi200m.ascii.15of20']
167 167
168 168 # download the data files if they don't already exist:
169 In [8]: c.map(fetch_pi_file, files)
169 In [8]: v.map(fetch_pi_file, files)
170 170
171 171 # This is the parallel calculation using the Client.map method
172 172 # which applies compute_two_digit_freqs to each file in files in parallel.
173 In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
173 In [9]: freqs_all = v.map(compute_two_digit_freqs, files)
174 174
175 175 # Add up the frequencies from each engine.
176 176 In [10]: freqs = reduce_freqs(freqs_all)
@@ -18,13 +18,14 b' Non-copying sends and numpy arrays'
18 18 ----------------------------------
19 19
20 20 When numpy arrays are passed as arguments to apply or via data-movement methods, they are not
21 copied. This means that you must be careful if you are sending an array that you intend to work on.
22 PyZMQ does allow you to track when a message has been sent so you can know when it is safe to edit the buffer, but
23 IPython only allows for this.
21 copied. This means that you must be careful if you are sending an array that you intend to work
22 on. PyZMQ does allow you to track when a message has been sent so you can know when it is safe
23 to edit the buffer, but IPython only allows for this.
24 24
25 25 It is also important to note that the non-copying receive of a message is *read-only*. That
26 means that if you intend to work in-place on an array that you have sent or received, you must copy
27 it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as results.
26 means that if you intend to work in-place on an array that you have sent or received, you must
27 copy it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as
28 results.
28 29
29 30 The following will fail:
30 31
@@ -69,6 +70,24 b' The :attr:`ndarray.flags.writeable` flag will tell you if you can write to an ar'
69 70 In [6]: _.flags.writeable
70 71 Out[6]: False
71 72
73 If you want to safely edit an array in-place after *sending* it, you must use the `track=True` flag. IPython always performs non-copying sends of arrays, which return immediately. You
74 must instruct IPython to track those messages *at send time* in order to know for sure that the send has completed. AsyncResults have a :attr:`sent` property, and :meth:`wait_on_send` method
75 for checking and waiting for 0MQ to finish with a buffer.
76
77 .. sourcecode:: ipython
78
79 In [5]: A = numpy.random.random((1024,1024))
80
81 In [6]: view.track=True
82
83 In [7]: ar = view.apply_async(lambda x: 2*x, A)
84
85 In [8]: ar.sent
86 Out[8]: False
87
88 In [9]: ar.wait_on_send() # blocks until sent is True
89
90
72 91 What is sendable?
73 92 -----------------
74 93
@@ -83,6 +102,61 b' buffer without copying - and reconstruct the object on the other side in your ow'
83 102 possible that the object reconstruction will become extensible, so you can add your own
84 103 non-copying types, but this does not yet exist.
85 104
105 Closures
106 ********
107
108 Just about anything in Python is pickleable. The one notable exception is objects (generally
109 functions) with *closures*. Closures can be a complicated topic, but the basic principle is that
110 functions that refer to variables in their parent scope have closures.
111
112 An example of a function that uses a closure:
113
114 .. sourcecode:: python
115
116 def f(a):
117 def inner():
118 # inner will have a closure
119 return a
120 return inner
121
122 f1 = f(1)
123 f2 = f(2)
124 f1() # returns 1
125 f2() # returns 2
126
127 f1 and f2 will have closures referring to the scope in which `inner` was defined, because they
128 use the variable 'a'. As a result, you would not be able to send ``f1`` or ``f2`` with IPython.
129 Note that you *would* be able to send `f`. This is only true for interactively defined
130 functions (as are often used in decorators), and only when there are variables used inside the
131 inner function, that are defined in the outer function. If the names are *not* in the outer
132 function, then there will not be a closure, and the generated function will look in
133 ``globals()`` for the name:
134
135 .. sourcecode:: python
136
137 def g(b):
138 # note that `b` is not referenced in inner's scope
139 def inner():
140 # this inner will *not* have a closure
141 return a
142 return inner
143 g1 = g(1)
144 g2 = g(2)
145 g1() # raises NameError on 'a'
146 a=5
147 g2() # returns 5
148
149 `g1` and `g2` *will* be sendable with IPython, and will treat the engine's namespace as
150 globals(). The :meth:`pull` method is implemented based on this principal. If we did not
151 provide pull, you could implement it yourself with `apply`, by simply returning objects out
152 of the global namespace:
153
154 .. sourcecode:: ipython
155
156 In [10]: view.apply(lambda : a)
157
158 # is equivalent to
159 In [11]: view.pull('a')
86 160
87 161 Running Code
88 162 ============
@@ -94,7 +168,9 b' Client method, called `apply`.'
94 168 Apply
95 169 -----
96 170
97 The principal method of remote execution is :meth:`apply`, of Client and View objects. The Client provides the full execution and communication API for engines via its apply method.
171 The principal method of remote execution is :meth:`apply`, of View objects. The Client provides
172 the full execution and communication API for engines via its low-level
173 :meth:`send_apply_message` method.
98 174
99 175 f : function
100 176 The function to be called remotely
@@ -102,8 +178,6 b' args : tuple/list'
102 178 The positional arguments passed to `f`
103 179 kwargs : dict
104 180 The keyword arguments passed to `f`
105 bound : bool (default: False)
106 Whether to pass the Engine(s) Namespace as the first argument to `f`.
107 181 block : bool (default: self.block)
108 182 Whether to wait for the result, or return immediately.
109 183 False:
@@ -135,8 +209,6 b' balanced : bool, default None'
135 209 If `balanced` and `targets` are both specified, the task will
136 210 be assigned to *one* of the targets by the scheduler.
137 211
138 The following arguments are only used when balanced is True:
139
140 212 after : Dependency or collection of msg_ids
141 213 Only for load-balanced execution (targets=None)
142 214 Specify a list of msg_ids as a time-based dependency.
@@ -158,11 +230,11 b' timeout : float/int or None'
158 230 execute and run
159 231 ---------------
160 232
161 For executing strings of Python code, Clients also provide an :meth:`execute` and a :meth:`run`
162 method, which rather than take functions and arguments, take simple strings. `execute` simply
163 takes a string of Python code to execute, and sends it to the Engine(s). `run` is the same as
164 `execute`, but for a *file*, rather than a string. It is simply a wrapper that does something
165 very similar to ``execute(open(f).read())``.
233 For executing strings of Python code, :class:`DirectView`s also provide an :meth:`execute` and a
234 :meth:`run` method, which rather than take functions and arguments, take simple strings.
235 `execute` simply takes a string of Python code to execute, and sends it to the Engine(s). `run`
236 is the same as `execute`, but for a *file*, rather than a string. It is simply a wrapper that
237 does something very similar to ``execute(open(f).read())``.
166 238
167 239 .. note::
168 240
@@ -172,44 +244,25 b' Views'
172 244 =====
173 245
174 246 The principal extension of the :class:`~parallel.client.Client` is the
175 :class:`~parallel.view.View` class. The client is a fairly stateless object with respect to
176 execution patterns, where you must specify everything about the execution as keywords to each
177 call to :meth:`apply`. For users who want to more conveniently specify various options for
178 several similar calls, we have the :class:`~parallel.view.View` objects. The basic principle of
179 the views is to encapsulate the keyword arguments to :meth:`client.apply` as attributes,
180 allowing users to specify them once and apply to any subsequent calls until the attribute is
181 changed.
247 :class:`~parallel.view.View` class. The client
182 248
183 249 Two of apply's keyword arguments are set at the construction of the View, and are immutable for
184 250 a given View: `balanced` and `targets`. `balanced` determines whether the View will be a
185 251 :class:`.LoadBalancedView` or a :class:`.DirectView`, and `targets` will be the View's `targets`
186 252 attribute. Attempts to change this will raise errors.
187 253
188 Views are cached by targets+balanced combinations, so requesting a view multiple times will always return the *same object*, not create a new one:
254 Views are cached by targets/class, so requesting a view multiple times will always return the
255 *same object*, not create a new one:
189 256
190 257 .. sourcecode:: ipython
191 258
192 In [3]: v1 = rc.view([1,2,3], balanced=True)
193 In [4]: v2 = rc.view([1,2,3], balanced=True)
259 In [3]: v1 = rc.load_balanced_view([1,2,3])
260 In [4]: v2 = rc.load_balanced_view([1,2,3])
194 261
195 262 In [5]: v2 is v1
196 263 Out[5]: True
197 264
198 265
199 A :class:`View` always uses its `targets` attribute, and it will use its `bound`
200 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
201 methods allow overriding `bound` and `block` for a single call.
202
203 ================== ========== ==========
204 method block bound
205 ================== ========== ==========
206 apply self.block self.bound
207 apply_sync True False
208 apply_async False False
209 apply_sync_bound True True
210 apply_async_bound False True
211 ================== ========== ==========
212
213 266 DirectView
214 267 ----------
215 268
@@ -379,24 +432,26 b' interactive session - you must poll the 0MQ sockets for incoming messages. Note'
379 432 this polling *does not* actually make any network requests. It simply performs a `select`
380 433 operation, to check if messages are already in local memory, waiting to be handled.
381 434
382 The method that handles incoming messages is :meth:`spin`. This method flushes any waiting messages on the various incoming sockets, and updates the state of the Client.
435 The method that handles incoming messages is :meth:`spin`. This method flushes any waiting
436 messages on the various incoming sockets, and updates the state of the Client.
383 437
384 If you need to wait for particular results to finish, you can use the :meth:`barrier` method,
438 If you need to wait for particular results to finish, you can use the :meth:`wait` method,
385 439 which will call :meth:`spin` until the messages are no longer outstanding. Anything that
386 440 represents a collection of messages, such as a list of msg_ids or one or more AsyncResult
387 objects, can be passed as argument to barrier. A timeout can be specified, which will prevent
388 the barrier from blocking for more than a specified time, but the default behavior is to wait
441 objects, can be passed as argument to wait. A timeout can be specified, which will prevent
442 the call from blocking for more than a specified time, but the default behavior is to wait
389 443 forever.
390 444
391 445
392 446
393 447 The client also has an `outstanding` attribute - a ``set`` of msg_ids that are awaiting replies.
394 This is the default if barrier is called with no arguments - i.e. barrier on *all* outstanding messages.
448 This is the default if wait is called with no arguments - i.e. wait on *all* outstanding
449 messages.
395 450
396 451
397 452 .. note::
398 453
399 TODO barrier example
454 TODO wait example
400 455
401 456 Map
402 457 ===
@@ -89,7 +89,7 b' same machine as the Hub, but can be run anywhere from local threads or on remote'
89 89 The controller also provides a single point of contact for users who wish to
90 90 utilize the engines connected to the controller. There are different ways of
91 91 working with a controller. In IPython, all of these models are implemented via
92 the client's :meth:`.Client.apply` method, with various arguments, or
92 the client's :meth:`.View.apply` method, with various arguments, or
93 93 constructing :class:`.View` objects to represent subsets of engines. The two
94 94 primary models for interacting with engines are:
95 95
@@ -124,12 +124,13 b' themselves block when user code is run, the schedulers hide that from the user t'
124 124 a fully asynchronous interface to a set of engines.
125 125
126 126
127 IPython client
128 --------------
127 IPython client and views
128 ------------------------
129 129
130 There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a
131 controller. For each model, there is a corresponding view. These views allow users to
132 interact with a set of engines through the interface. Here are the two default views:
130 There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a cluster.
131 For each execution model, there is a corresponding :class:`~.parallel.view.View`. These views
132 allow users to interact with a set of engines through the interface. Here are the two default
133 views:
133 134
134 135 * The :class:`DirectView` class for explicit addressing.
135 136 * The :class:`LoadBalancedView` class for destination-agnostic scheduling.
@@ -212,7 +213,7 b' everything is working correctly, try the following commands:'
212 213 In [4]: c.ids
213 214 Out[4]: set([0, 1, 2, 3])
214 215
215 In [5]: c.apply(lambda : "Hello, World", targets='all', block=True)
216 In [5]: c[:].apply_sync(lambda : "Hello, World")
216 217 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
217 218
218 219
@@ -234,10 +235,10 b' then you would connect to it with:'
234 235 In [2]: c = client.Client(sshserver='myhub.example.com')
235 236
236 237 Where 'myhub.example.com' is the url or IP address of the machine on
237 which the Hub process is running.
238 which the Hub process is running (or another machine that has direct access to the Hub's ports).
238 239
239 240 You are now ready to learn more about the :ref:`Direct
240 <parallelmultiengine>` and :ref:`LoadBalanced <paralleltask>` interfaces to the
241 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
241 242 controller.
242 243
243 244 .. [ZeroMQ] ZeroMQ. http://www.zeromq.org
@@ -1,4 +1,4 b''
1 .. _parallelmultiengine:
1 .. _parallel_multiengine:
2 2
3 3 ==========================
4 4 IPython's Direct interface
@@ -9,7 +9,7 b' IPython engines. The basic idea behind the multiengine interface is that the'
9 9 capabilities of each engine are directly and explicitly exposed to the user.
10 10 Thus, in the multiengine interface, each engine is given an id that is used to
11 11 identify the engine and give it work to do. This interface is very intuitive
12 and is designed with interactive usage in mind, and is thus the best place for
12 and is designed with interactive usage in mind, and is the best place for
13 13 new users of IPython to begin.
14 14
15 15 Starting the IPython controller and engines
@@ -91,9 +91,7 b" DirectView's :meth:`map` method:"
91 91
92 92 In [62]: serial_result = map(lambda x:x**10, range(32))
93 93
94 In [63]: dview.block = True
95
96 In [66]: parallel_result = dview.map(lambda x: x**10, range(32))
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
97 95
98 96 In [67]: serial_result==parallel_result
99 97 Out[67]: True
@@ -103,8 +101,7 b" DirectView's :meth:`map` method:"
103 101
104 102 The :class:`DirectView`'s version of :meth:`map` does
105 103 not do dynamic load balancing. For a load balanced version, use a
106 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
107 `balanced=True`.
104 :class:`LoadBalancedView`.
108 105
109 106 .. seealso::
110 107
@@ -119,7 +116,7 b' two decorators:'
119 116
120 117 .. sourcecode:: ipython
121 118
122 In [10]: @rc.remote(block=True, targets='all')
119 In [10]: @dview.remote(block=True)
123 120 ...: def getpid():
124 121 ...: import os
125 122 ...: return os.getpid()
@@ -128,7 +125,7 b' two decorators:'
128 125 In [11]: getpid()
129 126 Out[11]: [12345, 12346, 12347, 12348]
130 127
131 A ``@parallel`` decorator creates parallel functions, that break up an element-wise
128 The ``@parallel`` decorator creates parallel functions, that break up an element-wise
132 129 operations and distribute them, reconstructing the result.
133 130
134 131 .. sourcecode:: ipython
@@ -137,13 +134,13 b' operations and distribute them, reconstructing the result.'
137 134
138 135 In [13]: A = np.random.random((64,48))
139 136
140 In [14]: @rc.parallel(block=True, targets='all')
137 In [14]: @dview.parallel(block=True)
141 138 ...: def pmul(A,B):
142 139 ...: return A*B
143 140
144 141 In [15]: C_local = A*A
145 142
146 In [16]: C_remote_partial = pmul(A,A)
143 In [16]: C_remote = pmul(A,A)
147 144
148 145 In [17]: (C_local == C_remote).all()
149 146 Out[17]: True
@@ -159,38 +156,36 b' Calling Python functions'
159 156 The most basic type of operation that can be performed on the engines is to
160 157 execute Python code or call Python functions. Executing Python code can be
161 158 done in blocking or non-blocking mode (non-blocking is default) using the
162 :meth:`execute` method, and calling functions can be done via the
159 :meth:`.View.execute` method, and calling functions can be done via the
163 160 :meth:`.View.apply` method.
164 161
165 162 apply
166 163 -----
167 164
168 165 The main method for doing remote execution (in fact, all methods that
169 communicate with the engines are built on top of it), is :meth:`Client.apply`.
170 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
171 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
172 require some more options, they cannot easily provide this interface.
173 Instead, they provide the signature:
174
175 .. sourcecode:: python
166 communicate with the engines are built on top of it), is :meth:`View.apply`.
176 167
177 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
178 after=None, follow=None, timeout=None)
168 We strive to provide the cleanest interface we can, so `apply` has the following
169 signature:
179 170
180 Where various behavior is controlled via keyword arguments. This means that in the client,
181 you must pass `args` as a tuple, and `kwargs` as a dict.
171 .. sourcecode:: python
182 172
183 In order to provide the nicer interface, we have :class:`View` classes, which wrap
184 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
185 the extra keyword arguments. This means that the views can have the desired pattern:
173 view.apply(f, *args, **kwargs)
186 174
187 .. sourcecode:: python
175 There are various ways to call functions with IPython, and these flags are set as
176 attributes of the View. The ``DirectView`` has just two of these flags:
188 177
189 v.apply(f, *args, **kwargs)
178 dv.block : bool
179 whether to wait for the result, or return an :class:`AsyncResult` object
180 immediately
181 dv.track : bool
182 whether to instruct pyzmq to track when
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 edit in-place. You need to know when it becomes safe to edit the buffer
185 without corrupting the message.
190 186
191 187
192 For instance, performing index-access on a client creates a
193 :class:`.DirectView`.
188 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
194 189
195 190 .. sourcecode:: ipython
196 191
@@ -198,23 +193,9 b' For instance, performing index-access on a client creates a'
198 193 Out[4]: <DirectView [1, 2]>
199 194
200 195 In [5]: view.apply<tab>
201 view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound
202
203 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
204 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
205 methods allow specifying `bound` and `block` via the different methods.
196 view.apply view.apply_async view.apply_sync view.apply_with_flags
206 197
207 ================== ========== ==========
208 method block bound
209 ================== ========== ==========
210 apply self.block self.bound
211 apply_sync True False
212 apply_async False False
213 apply_sync_bound True True
214 apply_async_bound False True
215 ================== ========== ==========
216
217 For explanation of these values, read on.
198 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
218 199
219 200 Blocking execution
220 201 ------------------
@@ -232,63 +213,29 b' blocks until the engines are done executing the command:'
232 213
233 214 In [5]: dview['b'] = 10
234 215
235 In [6]: dview.apply_sync(lambda x: a+b+x, 27)
216 In [6]: dview.apply(lambda x: a+b+x, 27)
236 217 Out[6]: [42, 42, 42, 42]
237 218
238 Python commands can be executed on specific engines by calling execute using the ``targets``
239 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
240 index-access to the client:
241
242 .. sourcecode:: ipython
243
244 In [6]: rc.execute('c=a+b', targets=[0,2])
245
246 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
247
248 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
249 Out[8]: [15, -5, 15, -5]
250
251 .. note::
219 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
220 method:
252 221
253 Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
254 ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
255 where this differs in in :meth:`apply`. The :class:`Client` takes many
256 arguments to apply, so it requires `args` and `kwargs` to be passed as
257 individual arguments. Extended options such as `bound`,`targets`, and
258 `block` are controlled by the attributes of the :class:`View` objects, so
259 they can provide the much more convenient
260 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
261 ``f(*args,**kwargs)`` remotely.
222 In [7]: dview.block=False
262 223
263 Bound and unbound execution
264 ---------------------------
224 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
225 Out[8]: [42, 42, 42, 42]
265 226
266 The previous example also shows one of the most important things about the IPython
267 engines: they have a persistent user namespaces. The :meth:`apply` method can
268 be run in either a bound or unbound manner.
269
270 When applying a function in a `bound` manner, the first argument to that function
271 will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary
272 also providing attribute-access to keys.
273
274 In all (unbound and bound) execution
227 Python commands can be executed as strings on specific engines by using a View's ``execute``
228 method:
275 229
276 230 .. sourcecode:: ipython
277 231
278 In [9]: dview['b'] = 5 # assign b to 5 everywhere
279
280 In [10]: v0 = rc[0]
232 In [6]: rc[::2].execute('c=a+b')
281 233
282 # multiply b*2 inplace
283 In [12]: v0.apply_sync_bound(lambda ns: ns.b*=2)
234 In [7]: rc[1::2].execute('c=a-b')
284 235
285 # b is still available in globals during unbound execution
286 In [13]: v0.apply_sync(lambda a: a*b, 3)
287 Out[13]: 30
236 In [8]: rc[:]['c'] # shorthand for rc[:].pull('c', block=True)
237 Out[8]: [15, -5, 15, -5]
288 238
289 `bound=True` specifies that the engine's namespace is to be passed as the first argument when
290 the function is called, and the default `bound=False` specifies that the normal behavior, but
291 the engine's namespace will be available as the globals() when the function is called.
292 239
293 240 Non-blocking execution
294 241 ----------------------
@@ -351,22 +298,24 b' local Python/IPython session:'
351 298 .. Note::
352 299
353 300 Note the import inside the function. This is a common model, to ensure
354 that the appropriate modules are imported where the task is run.
301 that the appropriate modules are imported where the task is run. You can
302 also manually import modules into the engine(s) namespace(s) via
303 :meth:`view.execute('import numpy')`.
355 304
356 305 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
357 are done. For this, there is a the method :meth:`barrier`. This method takes a
306 are done. For this, there is the method :meth:`wait`. This method takes a
358 307 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
359 308 and blocks until all of the associated results are ready:
360 309
361 310 .. sourcecode:: ipython
362 311
363 In [72]: rc.block=False
312 In [72]: dview.block=False
364 313
365 314 # A trivial list of AsyncResults objects
366 315 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
367 316
368 317 # Wait until all of them are done
369 In [74]: rc.barrier(pr_list)
318 In [74]: dview.wait(pr_list)
370 319
371 320 # Then, their results are ready using get() or the `.r` attribute
372 321 In [75]: pr_list[0].get()
@@ -374,12 +323,12 b' and blocks until all of the associated results are ready:'
374 323
375 324
376 325
377 The ``block`` keyword argument and attributes
378 ---------------------------------------------
326 The ``block`` attribute
327 -----------------------
379 328
380 Most client methods(like :meth:`apply`) accept
329 Many View methods (excluding :meth:`apply`) accept
381 330 ``block`` as a keyword argument. As we have seen above, these
382 keyword arguments control the blocking mode. The :class:`Client` class also has
331 keyword arguments control the blocking mode. The :class:`View` class also has
383 332 a :attr:`block` attribute that controls the default behavior when the keyword
384 333 argument is not provided. Thus the following logic is used for :attr:`block`:
385 334
@@ -387,37 +336,33 b' argument is not provided. Thus the following logic is used for :attr:`block`:'
387 336 * Keyword argument, if provided override the instance attributes for
388 337 the duration of a single call.
389 338
390 DirectView objects also have a ``bound`` attribute, which is used in the same way.
391
392 339 The following examples demonstrate how to use the instance attributes:
393 340
394 341 .. sourcecode:: ipython
395 342
396 In [17]: rc.block = False
343 In [17]: dview.block = False
397 344
398 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
345 In [18]: ar = dview.apply(lambda : 10)
399 346
400 347 In [19]: ar.get()
401 Out[19]: [10,10]
348 Out[19]: [10, 10, 10, 10]
402 349
403 In [21]: rc.block = True
350 In [21]: dview.block = True
404 351
405 352 # Note targets='all' means all engines
406 In [22]: rc.apply(lambda : 42, targets='all')
353 In [22]: dview.apply(lambda : 42)
407 354 Out[22]: [42, 42, 42, 42]
408 355
409 The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the
356 The :attr:`block` and :attr:`targets` instance attributes of the
410 357 :class:`.DirectView` also determine the behavior of the parallel magic commands.
411 358
412
413 359 Parallel magic commands
414 360 -----------------------
415 361
416 362 .. warning::
417 363
418 The magics have not been changed to work with the zeromq system. ``%px``
419 and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do
420 not* print stdin/out.
364 The magics have not been changed to work with the zeromq system. The
365 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
421 366
422 367 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
423 368 that make it more pleasant to execute Python commands on the engines
@@ -428,6 +373,9 b' Python command on the engines specified by the :attr:`targets` attribute of the'
428 373
429 374 .. sourcecode:: ipython
430 375
376 # load the parallel magic extension:
377 In [21]: %load_ext parallelmagic
378
431 379 # Create a DirectView for all targets
432 380 In [22]: dv = rc[:]
433 381
@@ -521,23 +469,19 b' Here are some examples of how you use :meth:`push` and :meth:`pull`:'
521 469
522 470 .. sourcecode:: ipython
523 471
524 In [38]: rc.push(dict(a=1.03234,b=3453))
472 In [38]: dview.push(dict(a=1.03234,b=3453))
525 473 Out[38]: [None,None,None,None]
526 474
527 In [39]: rc.pull('a')
475 In [39]: dview.pull('a')
528 476 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
529 477
530 In [40]: rc.pull('b',targets=0)
478 In [40]: rc[0].pull('b')
531 479 Out[40]: 3453
532 480
533 In [41]: rc.pull(('a','b'))
481 In [41]: dview.pull(('a','b'))
534 482 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
535 483
536 # zmq client does not have zip_pull
537 In [42]: rc.zip_pull(('a','b'))
538 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
539
540 In [43]: rc.push(dict(c='speed'))
484 In [43]: dview.push(dict(c='speed'))
541 485 Out[43]: [None,None,None,None]
542 486
543 487 In non-blocking mode :meth:`push` and :meth:`pull` also return
@@ -545,9 +489,7 b' In non-blocking mode :meth:`push` and :meth:`pull` also return'
545 489
546 490 .. sourcecode:: ipython
547 491
548 In [47]: rc.block=False
549
550 In [48]: ar = rc.pull('a')
492 In [48]: ar = dview.pull('a', block=False)
551 493
552 494 In [49]: ar.get()
553 495 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
@@ -563,8 +505,6 b' appear as a local dictionary. Underneath, these methods call :meth:`apply`:'
563 505
564 506 .. sourcecode:: ipython
565 507
566 In [50]: dview.block=True
567
568 508 In [51]: dview['a']=['foo','bar']
569 509
570 510 In [52]: dview['a']
@@ -606,7 +546,7 b' basic effect using :meth:`scatter` and :meth:`gather`:'
606 546
607 547 In [66]: dview.scatter('x',range(64))
608 548
609 In [67]: px y = [i**10 for i in x]
549 In [67]: %px y = [i**10 for i in x]
610 550 Parallel execution on engines: [0, 1, 2, 3]
611 551 Out[67]:
612 552
@@ -633,31 +573,47 b' more other types of exceptions. Here is how it works:'
633 573 In [77]: dview.execute('1/0')
634 574 ---------------------------------------------------------------------------
635 575 CompositeError Traceback (most recent call last)
636 /Users/minrk/<ipython-input-10-5d56b303a66c> in <module>()
637 ----> 1 dview.execute('1/0')
638
639 ...
576 /home/you/<ipython-input-10-15c2c22dec39> in <module>()
577 ----> 1 dview.execute('1/0', block=True)
578
579 /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block)
580 460 default: self.block
581 461 """
582 --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block)
583 463
584 464 def run(self, filename, block=None):
585
586 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
587
588 /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs)
589 46 def sync_results(f, self, *args, **kwargs):
590 47 """sync relevant results from self.client to our results attribute."""
591 ---> 48 ret = f(self, *args, **kwargs)
592 49 delta = self.outstanding.difference(self.client.outstanding)
593 50 completed = self.outstanding.intersection(delta)
594
595 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
596
597 /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs)
598 35 n_previous = len(self.client.history)
599 36 try:
600 ---> 37 ret = f(self, *args, **kwargs)
601 38 finally:
602 39 nmsgs = len(self.client.history) - n_previous
603
604 /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track)
605 398 if block:
606 399 try:
607 --> 400 return ar.get()
608 401 except KeyboardInterrupt:
609 402 pass
640 610
641 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
642 1012 raise ValueError(msg)
643 1013 else:
644 -> 1014 return self._apply_direct(f, args, kwargs, **options)
645 1015
646 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
647
648 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets)
649 1100 if block:
650 1101 try:
651 -> 1102 return ar.get()
652 1103 except KeyboardInterrupt:
653 1104 return ar
654
655 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
656 78 return self._result
657 79 else:
658 ---> 80 raise self._exception
659 81 else:
660 82 raise error.TimeoutError("Result not ready.")
611 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
612 87 return self._result
613 88 else:
614 ---> 89 raise self._exception
615 90 else:
616 91 raise error.TimeoutError("Result not ready.")
661 617
662 618 CompositeError: one or more exceptions from call to method: _execute
663 619 [0:apply]: ZeroDivisionError: integer division or modulo by zero
@@ -665,6 +621,7 b' more other types of exceptions. Here is how it works:'
665 621 [2:apply]: ZeroDivisionError: integer division or modulo by zero
666 622 [3:apply]: ZeroDivisionError: integer division or modulo by zero
667 623
624
668 625 Notice how the error message printed when :exc:`CompositeError` is raised has
669 626 information about the individual exceptions that were raised on each engine.
670 627 If you want, you can even raise one of these original exceptions:
@@ -672,7 +629,7 b' If you want, you can even raise one of these original exceptions:'
672 629 .. sourcecode:: ipython
673 630
674 631 In [80]: try:
675 ....: rc.execute('1/0')
632 ....: dview.execute('1/0')
676 633 ....: except client.CompositeError, e:
677 634 ....: e.raise_exception()
678 635 ....:
@@ -697,57 +654,50 b' instance:'
697 654
698 655 .. sourcecode:: ipython
699 656
700 In [81]: rc.execute('1/0')
657 In [81]: dview.execute('1/0')
701 658 ---------------------------------------------------------------------------
702 659 CompositeError Traceback (most recent call last)
703 /Users/minrk/<ipython-input-5-b0c7a2b62c52> in <module>()
704 ----> 1 rc.execute('1/0')
705
706 /Users/minrk/<string> in execute(self, code, targets, block)
707
708 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs)
709 88 self.block = block
710 89 try:
711 ---> 90 ret = f(self, *args, **kwargs)
712 91 finally:
713 92 self.block = saveblock
714
715 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in execute(self, code, targets, block)
716 855 default: self.block
717 856 """
718 --> 857 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
719 858 if not block:
720 859 return result
721
722 /Users/minrk/<string> in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
723
724 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs)
725 88 self.block = block
726 89 try:
727 ---> 90 ret = f(self, *args, **kwargs)
728 91 finally:
729 92 self.block = saveblock
730
731 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
732 1012 raise ValueError(msg)
733 1013 else:
734 -> 1014 return self._apply_direct(f, args, kwargs, **options)
735 1015
736 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
737
738 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets)
739 1100 if block:
740 1101 try:
741 -> 1102 return ar.get()
742 1103 except KeyboardInterrupt:
743 1104 return ar
660 /home/you/<ipython-input-10-15c2c22dec39> in <module>()
661 ----> 1 dview.execute('1/0', block=True)
662
663 /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block)
664 460 default: self.block
665 461 """
666 --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block)
667 463
668 464 def run(self, filename, block=None):
669
670 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
671
672 /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs)
673 46 def sync_results(f, self, *args, **kwargs):
674 47 """sync relevant results from self.client to our results attribute."""
675 ---> 48 ret = f(self, *args, **kwargs)
676 49 delta = self.outstanding.difference(self.client.outstanding)
677 50 completed = self.outstanding.intersection(delta)
678
679 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
680
681 /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs)
682 35 n_previous = len(self.client.history)
683 36 try:
684 ---> 37 ret = f(self, *args, **kwargs)
685 38 finally:
686 39 nmsgs = len(self.client.history) - n_previous
687
688 /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track)
689 398 if block:
690 399 try:
691 --> 400 return ar.get()
692 401 except KeyboardInterrupt:
693 402 pass
744 694
745 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
746 78 return self._result
747 79 else:
748 ---> 80 raise self._exception
749 81 else:
750 82 raise error.TimeoutError("Result not ready.")
695 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
696 87 return self._result
697 88 else:
698 ---> 89 raise self._exception
699 90 else:
700 91 raise error.TimeoutError("Result not ready.")
751 701
752 702 CompositeError: one or more exceptions from call to method: _execute
753 703 [0:apply]: ZeroDivisionError: integer division or modulo by zero
@@ -815,14 +765,18 b' instance:'
815 765 ZeroDivisionError: integer division or modulo by zero
816 766
817 767
768 .. note::
769
770 TODO: The above tracebacks are not up to date
771
818 772
819 773 All of this same error handling magic even works in non-blocking mode:
820 774
821 775 .. sourcecode:: ipython
822 776
823 In [83]: rc.block=False
777 In [83]: dview.block=False
824 778
825 In [84]: ar = rc.execute('1/0')
779 In [84]: ar = dview.execute('1/0')
826 780
827 781 In [85]: ar.get()
828 782 ---------------------------------------------------------------------------
@@ -1,4 +1,4 b''
1 .. _paralleltask:
1 .. _parallel_task:
2 2
3 3 ==========================
4 4 The IPython task interface
@@ -54,11 +54,12 b' argument to the constructor:'
54 54 # or to connect with a specific profile you have set up:
55 55 In [3]: rc = client.Client(profile='mpi')
56 56
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can be constructed via the client's :meth:`view` method:
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 be constructed via the client's :meth:`load_balanced_view` method:
58 59
59 60 .. sourcecode:: ipython
60 61
61 In [4]: lview = rc.view() # default load-balanced view
62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
62 63
63 64 .. seealso::
64 65
@@ -110,6 +111,8 b' that turns any Python function into a parallel function:'
110 111 In [11]: f.map(range(32)) # this is done in parallel
111 112 Out[11]: [0.0,10.0,160.0,...]
112 113
114 .. _parallel_dependencies:
115
113 116 Dependencies
114 117 ============
115 118
@@ -230,12 +233,18 b' any|all'
230 233 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
231 234 boolean attribute, which defaults to ``True``.
232 235
233 success_only
234 Whether to consider only tasks that did not raise an error as being fulfilled.
235 Sometimes you want to run a task after another, but only if that task succeeded. In
236 this case, ``success_only`` should be ``True``. However sometimes you may not care
237 whether the task succeeds, and always want the second task to run, in which case
238 you should use `success_only=False`. The default behavior is to only use successes.
236 success [default: True]
237 Whether to consider tasks that succeeded as fulfilling dependencies.
238
239 failure [default : False]
240 Whether to consider tasks that failed as fulfilling dependencies.
241 using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
242 only when tasks have failed.
243
244 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
245 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
246 not care whether the task succeeds, and always want the second task to run, in which case you
247 should use `success=failure=True`. The default behavior is to only use successes.
239 248
240 249 There are other switches for interpretation that are made at the *task* level. These are
241 250 specified via keyword arguments to the client's :meth:`apply` method.
@@ -258,7 +267,7 b' timeout'
258 267 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
259 268 task to run after a job submitted via the MUX interface.
260 269
261 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
270 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
262 271 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
263 272 `follow` and `after` keywords to :meth:`client.apply`:
264 273
@@ -266,13 +275,13 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
266 275
267 276 In [14]: client.block=False
268 277
269 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
278 In [15]: ar = lview.apply(f, args, kwargs)
270 279
271 In [16]: ar2 = client.apply(f2, balanced=True)
280 In [16]: ar2 = lview.apply(f2)
272 281
273 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
282 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
274 283
275 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
284 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
276 285
277 286
278 287 .. seealso::
@@ -297,8 +306,8 b' The basic cases that are checked:'
297 306
298 307 * depending on nonexistent messages
299 308 * `follow` dependencies were run on more than one machine and `all=True`
300 * any dependencies failed and `all=True,success_only=True`
301 * all dependencies failed and `all=False,success_only=True`
309 * any dependencies failed and `all=True,success=True,failure=False`
310 * all dependencies failed and `all=False,success=True,failure=False`
302 311
303 312 .. warning::
304 313
@@ -386,27 +395,25 b' Disabled features when using the ZMQ Scheduler:'
386 395 More details
387 396 ============
388 397
389 The :class:`Client` has many more powerful features that allow quite a bit
398 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
390 399 of flexibility in how tasks are defined and run. The next places to look are
391 400 in the following classes:
392 401
393 * :class:`IPython.zmq.parallel.client.Client`
402 * :class:`IPython.zmq.parallel.view.LoadBalancedView`
394 403 * :class:`IPython.zmq.parallel.client.AsyncResult`
395 * :meth:`IPython.zmq.parallel.client.Client.apply`
404 * :meth:`IPython.zmq.parallel.view.LoadBalancedView.apply`
396 405 * :mod:`IPython.zmq.parallel.dependency`
397 406
398 407 The following is an overview of how to use these classes together:
399 408
400 1. Create a :class:`Client`.
409 1. Create a :class:`Client` and :class:`LoadBalancedView`
401 410 2. Define some functions to be run as tasks
402 411 3. Submit your tasks using the :meth:`apply` method of your
403 :class:`Client` instance, specifying `balanced=True`. This signals
404 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
405 4. Use :meth:`Client.get_results` to get the results of the
412 :class:`LoadBalancedView` instance.
413 4. Use :meth:`Client.get_result` to get the results of the
406 414 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
407 415 for and then receive the results.
408 416
409
410 417 .. seealso::
411 418
412 419 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now