##// END OF EJS Templates
update API after sagedays29...
MinRK -
Show More
@@ -0,0 +1,69 b''
1 """Tests for asyncresult.py"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14
15 from IPython.zmq.parallel.error import TimeoutError
16
17 from IPython.zmq.parallel.tests import add_engines
18 from .clienttest import ClusterTestCase
19
20 def setup():
21 add_engines(2)
22
23 def wait(n):
24 import time
25 time.sleep(n)
26 return n
27
28 class AsyncResultTest(ClusterTestCase):
29
30 def test_single_result(self):
31 eid = self.client.ids[-1]
32 ar = self.client[eid].apply_async(lambda : 42)
33 self.assertEquals(ar.get(), 42)
34 ar = self.client[[eid]].apply_async(lambda : 42)
35 self.assertEquals(ar.get(), [42])
36 ar = self.client[-1:].apply_async(lambda : 42)
37 self.assertEquals(ar.get(), [42])
38
39 def test_get_after_done(self):
40 ar = self.client[-1].apply_async(lambda : 42)
41 self.assertFalse(ar.ready())
42 ar.wait()
43 self.assertTrue(ar.ready())
44 self.assertEquals(ar.get(), 42)
45 self.assertEquals(ar.get(), 42)
46
47 def test_get_before_done(self):
48 ar = self.client[-1].apply_async(wait, 0.1)
49 self.assertRaises(TimeoutError, ar.get, 0)
50 ar.wait(0)
51 self.assertFalse(ar.ready())
52 self.assertEquals(ar.get(), 0.1)
53
54 def test_get_after_error(self):
55 ar = self.client[-1].apply_async(lambda : 1/0)
56 ar.wait()
57 self.assertRaisesRemote(ZeroDivisionError, ar.get)
58 self.assertRaisesRemote(ZeroDivisionError, ar.get)
59 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
60
61 def test_get_dict(self):
62 n = len(self.client)
63 ar = self.client[:].apply_async(lambda : 5)
64 self.assertEquals(ar.get(), [5]*n)
65 d = ar.get_dict()
66 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
67 for eid,r in d.iteritems():
68 self.assertEquals(r, 5)
69
@@ -0,0 +1,101 b''
1 """Tests for dependency.py"""
2
3 __docformat__ = "restructuredtext en"
4
5 #-------------------------------------------------------------------------------
6 # Copyright (C) 2011 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
10 #-------------------------------------------------------------------------------
11
12 #-------------------------------------------------------------------------------
13 # Imports
14 #-------------------------------------------------------------------------------
15
16 # import
17 import os
18
19 from IPython.utils.pickleutil import can, uncan
20
21 from IPython.zmq.parallel import dependency as dmod
22 from IPython.zmq.parallel.util import interactive
23
24 from IPython.zmq.parallel.tests import add_engines
25 from .clienttest import ClusterTestCase
26
27 def setup():
28 add_engines(1)
29
30 @dmod.require('time')
31 def wait(n):
32 time.sleep(n)
33 return n
34
35 mixed = map(str, range(10))
36 completed = map(str, range(0,10,2))
37 failed = map(str, range(1,10,2))
38
39 class DependencyTest(ClusterTestCase):
40
41 def setUp(self):
42 ClusterTestCase.setUp(self)
43 self.user_ns = {'__builtins__' : __builtins__}
44 self.view = self.client.load_balanced_view()
45 self.dview = self.client[-1]
46 self.succeeded = set(map(str, range(0,25,2)))
47 self.failed = set(map(str, range(1,25,2)))
48
49 def assertMet(self, dep):
50 self.assertTrue(dep.check(self.succeeded, self.failed), "Dependency should be met")
51
52 def assertUnmet(self, dep):
53 self.assertFalse(dep.check(self.succeeded, self.failed), "Dependency should not be met")
54
55 def assertUnreachable(self, dep):
56 self.assertTrue(dep.unreachable(self.succeeded, self.failed), "Dependency should be unreachable")
57
58 def assertReachable(self, dep):
59 self.assertFalse(dep.unreachable(self.succeeded, self.failed), "Dependency should be reachable")
60
61 def cancan(self, f):
62 """decorator to pass through canning into self.user_ns"""
63 return uncan(can(f), self.user_ns)
64
65 def test_require_imports(self):
66 """test that @require imports names"""
67 @self.cancan
68 @dmod.require('urllib')
69 @interactive
70 def encode(dikt):
71 return urllib.urlencode(dikt)
72 # must pass through canning to properly connect namespaces
73 self.assertEquals(encode(dict(a=5)), 'a=5')
74
75 def test_success_only(self):
76 dep = dmod.Dependency(mixed, success=True, failure=False)
77 self.assertUnmet(dep)
78 self.assertUnreachable(dep)
79 dep.all=False
80 self.assertMet(dep)
81 self.assertReachable(dep)
82 dep = dmod.Dependency(completed, success=True, failure=False)
83 self.assertMet(dep)
84 self.assertReachable(dep)
85 dep.all=False
86 self.assertMet(dep)
87 self.assertReachable(dep)
88
89 def test_failure_only(self):
90 dep = dmod.Dependency(mixed, success=False, failure=True)
91 self.assertUnmet(dep)
92 self.assertUnreachable(dep)
93 dep.all=False
94 self.assertMet(dep)
95 self.assertReachable(dep)
96 dep = dmod.Dependency(completed, success=False, failure=True)
97 self.assertUnmet(dep)
98 self.assertUnreachable(dep)
99 dep.all=False
100 self.assertUnmet(dep)
101 self.assertUnreachable(dep)
@@ -0,0 +1,287 b''
1 """test View objects"""
2 #-------------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-------------------------------------------------------------------------------
8
9 #-------------------------------------------------------------------------------
10 # Imports
11 #-------------------------------------------------------------------------------
12
13 import time
14 from tempfile import mktemp
15
16 import zmq
17
18 from IPython.zmq.parallel import client as clientmod
19 from IPython.zmq.parallel import error
20 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult, AsyncMapResult
21 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
22 from IPython.zmq.parallel.util import interactive
23
24 from IPython.zmq.parallel.tests import add_engines
25
26 from .clienttest import ClusterTestCase, segfault, wait, skip_without
27
28 def setup():
29 add_engines(3)
30
31 class TestView(ClusterTestCase):
32
33 def test_segfault_task(self):
34 """test graceful handling of engine death (balanced)"""
35 # self.add_engines(1)
36 ar = self.client[-1].apply_async(segfault)
37 self.assertRaisesRemote(error.EngineError, ar.get)
38 eid = ar.engine_id
39 while eid in self.client.ids:
40 time.sleep(.01)
41 self.client.spin()
42
43 def test_segfault_mux(self):
44 """test graceful handling of engine death (direct)"""
45 # self.add_engines(1)
46 eid = self.client.ids[-1]
47 ar = self.client[eid].apply_async(segfault)
48 self.assertRaisesRemote(error.EngineError, ar.get)
49 eid = ar.engine_id
50 while eid in self.client.ids:
51 time.sleep(.01)
52 self.client.spin()
53
54 def test_push_pull(self):
55 """test pushing and pulling"""
56 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
57 t = self.client.ids[-1]
58 v = self.client[t]
59 push = v.push
60 pull = v.pull
61 v.block=True
62 nengines = len(self.client)
63 push({'data':data})
64 d = pull('data')
65 self.assertEquals(d, data)
66 self.client[:].push({'data':data})
67 d = self.client[:].pull('data', block=True)
68 self.assertEquals(d, nengines*[data])
69 ar = push({'data':data}, block=False)
70 self.assertTrue(isinstance(ar, AsyncResult))
71 r = ar.get()
72 ar = self.client[:].pull('data', block=False)
73 self.assertTrue(isinstance(ar, AsyncResult))
74 r = ar.get()
75 self.assertEquals(r, nengines*[data])
76 self.client[:].push(dict(a=10,b=20))
77 r = self.client[:].pull(('a','b'))
78 self.assertEquals(r, nengines*[[10,20]])
79
80 def test_push_pull_function(self):
81 "test pushing and pulling functions"
82 def testf(x):
83 return 2.0*x
84
85 t = self.client.ids[-1]
86 self.client[t].block=True
87 push = self.client[t].push
88 pull = self.client[t].pull
89 execute = self.client[t].execute
90 push({'testf':testf})
91 r = pull('testf')
92 self.assertEqual(r(1.0), testf(1.0))
93 execute('r = testf(10)')
94 r = pull('r')
95 self.assertEquals(r, testf(10))
96 ar = self.client[:].push({'testf':testf}, block=False)
97 ar.get()
98 ar = self.client[:].pull('testf', block=False)
99 rlist = ar.get()
100 for r in rlist:
101 self.assertEqual(r(1.0), testf(1.0))
102 execute("def g(x): return x*x")
103 r = pull(('testf','g'))
104 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
105
106 def test_push_function_globals(self):
107 """test that pushed functions have access to globals"""
108 @interactive
109 def geta():
110 return a
111 # self.add_engines(1)
112 v = self.client[-1]
113 v.block=True
114 v['f'] = geta
115 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
116 v.execute('a=5')
117 v.execute('b=f()')
118 self.assertEquals(v['b'], 5)
119
120 def test_push_function_defaults(self):
121 """test that pushed functions preserve default args"""
122 def echo(a=10):
123 return a
124 v = self.client[-1]
125 v.block=True
126 v['f'] = echo
127 v.execute('b=f()')
128 self.assertEquals(v['b'], 10)
129
130 def test_get_result(self):
131 """test getting results from the Hub."""
132 c = clientmod.Client(profile='iptest')
133 # self.add_engines(1)
134 t = c.ids[-1]
135 v = c[t]
136 v2 = self.client[t]
137 ar = v.apply_async(wait, 1)
138 # give the monitor time to notice the message
139 time.sleep(.25)
140 ahr = v2.get_result(ar.msg_ids)
141 self.assertTrue(isinstance(ahr, AsyncHubResult))
142 self.assertEquals(ahr.get(), ar.get())
143 ar2 = v2.get_result(ar.msg_ids)
144 self.assertFalse(isinstance(ar2, AsyncHubResult))
145 c.spin()
146 c.close()
147
148 def test_run_newline(self):
149 """test that run appends newline to files"""
150 tmpfile = mktemp()
151 with open(tmpfile, 'w') as f:
152 f.write("""def g():
153 return 5
154 """)
155 v = self.client[-1]
156 v.run(tmpfile, block=True)
157 self.assertEquals(v.apply_sync(lambda f: f(), clientmod.Reference('g')), 5)
158
159 def test_apply_tracked(self):
160 """test tracking for apply"""
161 # self.add_engines(1)
162 t = self.client.ids[-1]
163 v = self.client[t]
164 v.block=False
165 def echo(n=1024*1024, **kwargs):
166 with v.temp_flags(**kwargs):
167 return v.apply(lambda x: x, 'x'*n)
168 ar = echo(1, track=False)
169 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
170 self.assertTrue(ar.sent)
171 ar = echo(track=True)
172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
173 self.assertEquals(ar.sent, ar._tracker.done)
174 ar._tracker.wait()
175 self.assertTrue(ar.sent)
176
177 def test_push_tracked(self):
178 t = self.client.ids[-1]
179 ns = dict(x='x'*1024*1024)
180 v = self.client[t]
181 ar = v.push(ns, block=False, track=False)
182 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
183 self.assertTrue(ar.sent)
184
185 ar = v.push(ns, block=False, track=True)
186 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertEquals(ar.sent, ar._tracker.done)
188 ar._tracker.wait()
189 self.assertTrue(ar.sent)
190 ar.get()
191
192 def test_scatter_tracked(self):
193 t = self.client.ids
194 x='x'*1024*1024
195 ar = self.client[t].scatter('x', x, block=False, track=False)
196 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 self.assertTrue(ar.sent)
198
199 ar = self.client[t].scatter('x', x, block=False, track=True)
200 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertEquals(ar.sent, ar._tracker.done)
202 ar._tracker.wait()
203 self.assertTrue(ar.sent)
204 ar.get()
205
206 def test_remote_reference(self):
207 v = self.client[-1]
208 v['a'] = 123
209 ra = clientmod.Reference('a')
210 b = v.apply_sync(lambda x: x, ra)
211 self.assertEquals(b, 123)
212
213
214 def test_scatter_gather(self):
215 view = self.client[:]
216 seq1 = range(16)
217 view.scatter('a', seq1)
218 seq2 = view.gather('a', block=True)
219 self.assertEquals(seq2, seq1)
220 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
221
222 @skip_without('numpy')
223 def test_scatter_gather_numpy(self):
224 import numpy
225 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
226 view = self.client[:]
227 a = numpy.arange(64)
228 view.scatter('a', a)
229 b = view.gather('a', block=True)
230 assert_array_equal(b, a)
231
232 def test_map(self):
233 view = self.client[:]
234 def f(x):
235 return x**2
236 data = range(16)
237 r = view.map_sync(f, data)
238 self.assertEquals(r, map(f, data))
239
240 def test_scatterGatherNonblocking(self):
241 data = range(16)
242 view = self.client[:]
243 view.scatter('a', data, block=False)
244 ar = view.gather('a', block=False)
245 self.assertEquals(ar.get(), data)
246
247 @skip_without('numpy')
248 def test_scatter_gather_numpy_nonblocking(self):
249 import numpy
250 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
251 a = numpy.arange(64)
252 view = self.client[:]
253 ar = view.scatter('a', a, block=False)
254 self.assertTrue(isinstance(ar, AsyncResult))
255 amr = view.gather('a', block=False)
256 self.assertTrue(isinstance(amr, AsyncMapResult))
257 assert_array_equal(amr.get(), a)
258
259 def test_execute(self):
260 view = self.client[:]
261 # self.client.debug=True
262 execute = view.execute
263 ar = execute('c=30', block=False)
264 self.assertTrue(isinstance(ar, AsyncResult))
265 ar = execute('d=[0,1,2]', block=False)
266 self.client.wait(ar, 1)
267 self.assertEquals(len(ar.get()), len(self.client))
268 for c in view['c']:
269 self.assertEquals(c, 30)
270
271 def test_abort(self):
272 view = self.client[-1]
273 ar = view.execute('import time; time.sleep(0.25)', block=False)
274 ar2 = view.apply_async(lambda : 2)
275 ar3 = view.apply_async(lambda : 3)
276 view.abort(ar2)
277 view.abort(ar3.msg_ids)
278 self.assertRaises(error.TaskAborted, ar2.get)
279 self.assertRaises(error.TaskAborted, ar3.get)
280
281 def test_temp_flags(self):
282 view = self.client[-1]
283 view.block=True
284 with view.temp_flags(block=False):
285 self.assertFalse(view.block)
286 self.assertTrue(view.block)
287
@@ -0,0 +1,97 b''
1 """
2 An exceptionally lousy site spider
3 Ken Kinder <ken@kenkinder.com>
4
5 Updated for newparallel by Min Ragan-Kelley <benjaminrk@gmail.com>
6
7 This module gives an example of how the task interface to the
8 IPython controller works. Before running this script start the IPython controller
9 and some engines using something like::
10
11 ipclusterz start -n 4
12 """
13 import sys
14 from IPython.zmq.parallel import client, error
15 import time
16 import BeautifulSoup # this isn't necessary, but it helps throw the dependency error earlier
17
18 def fetchAndParse(url, data=None):
19 import urllib2
20 import urlparse
21 import BeautifulSoup
22 links = []
23 try:
24 page = urllib2.urlopen(url, data=data)
25 except Exception:
26 return links
27 else:
28 if page.headers.type == 'text/html':
29 doc = BeautifulSoup.BeautifulSoup(page.read())
30 for node in doc.findAll('a'):
31 href = node.get('href', None)
32 if href:
33 links.append(urlparse.urljoin(url, href))
34 return links
35
36 class DistributedSpider(object):
37
38 # Time to wait between polling for task results.
39 pollingDelay = 0.5
40
41 def __init__(self, site):
42 self.client = client.Client()
43 self.view = self.client.load_balanced_view()
44 self.mux = self.client[:]
45
46 self.allLinks = []
47 self.linksWorking = {}
48 self.linksDone = {}
49
50 self.site = site
51
52 def visitLink(self, url):
53 if url not in self.allLinks:
54 self.allLinks.append(url)
55 if url.startswith(self.site):
56 print ' ', url
57 self.linksWorking[url] = self.view.apply(fetchAndParse, url)
58
59 def onVisitDone(self, links, url):
60 print url, ':'
61 self.linksDone[url] = None
62 del self.linksWorking[url]
63 for link in links:
64 self.visitLink(link)
65
66 def run(self):
67 self.visitLink(self.site)
68 while self.linksWorking:
69 print len(self.linksWorking), 'pending...'
70 self.synchronize()
71 time.sleep(self.pollingDelay)
72
73 def synchronize(self):
74 for url, ar in self.linksWorking.items():
75 # Calling get_task_result with block=False will return None if the
76 # task is not done yet. This provides a simple way of polling.
77 try:
78 links = ar.get(0)
79 except error.TimeoutError:
80 continue
81 except Exception as e:
82 self.linksDone[url] = None
83 del self.linksWorking[url]
84 print url, ':', e.traceback
85 else:
86 self.onVisitDone(links, url)
87
88 def main():
89 if len(sys.argv) > 1:
90 site = sys.argv[1]
91 else:
92 site = raw_input('Enter site to crawl: ')
93 distributedSpider = DistributedSpider(site)
94 distributedSpider.run()
95
96 if __name__ == '__main__':
97 main()
@@ -0,0 +1,19 b''
1 """
2 A Distributed Hello world
3 Ken Kinder <ken@kenkinder.com>
4 """
5 from IPython.zmq.parallel import client
6
7 rc = client.Client()
8
9 def sleep_and_echo(t, msg):
10 import time
11 time.sleep(t)
12 return msg
13
14 view = rc.load_balanced_view()
15
16 world = view.apply_async(sleep_and_echo, 3, 'World!')
17 hello = view.apply_async(sleep_and_echo, 2, 'Hello')
18 print "Submitted tasks:", hello.msg_ids, world.msg_ids
19 print hello.get(), world.get()
@@ -15,10 +15,9 b' __docformat__ = "restructuredtext en"'
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 from types import FunctionType
19 import copy
18 import copy
20
19 import sys
21 from IPython.zmq.parallel.dependency import dependent
20 from types import FunctionType
22
21
23 import codeutil
22 import codeutil
24
23
@@ -67,12 +66,22 b' class CannedFunction(CannedObject):'
67 self._checkType(f)
66 self._checkType(f)
68 self.code = f.func_code
67 self.code = f.func_code
69 self.defaults = f.func_defaults
68 self.defaults = f.func_defaults
69 self.module = f.__module__ or '__main__'
70 self.__name__ = f.__name__
70 self.__name__ = f.__name__
71
71
72 def _checkType(self, obj):
72 def _checkType(self, obj):
73 assert isinstance(obj, FunctionType), "Not a function type"
73 assert isinstance(obj, FunctionType), "Not a function type"
74
74
75 def getObject(self, g=None):
75 def getObject(self, g=None):
76 # try to load function back into its module:
77 if not self.module.startswith('__'):
78 try:
79 __import__(self.module)
80 except ImportError:
81 pass
82 else:
83 g = sys.modules[self.module].__dict__
84
76 if g is None:
85 if g is None:
77 g = globals()
86 g = globals()
78 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
87 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
@@ -82,8 +91,9 b' class CannedFunction(CannedObject):'
82 # Functions
91 # Functions
83 #-------------------------------------------------------------------------------
92 #-------------------------------------------------------------------------------
84
93
85
86 def can(obj):
94 def can(obj):
95 # import here to prevent module-level circular imports
96 from IPython.zmq.parallel.dependency import dependent
87 if isinstance(obj, dependent):
97 if isinstance(obj, dependent):
88 keys = ('f','df')
98 keys = ('f','df')
89 return CannedObject(obj, keys=keys)
99 return CannedObject(obj, keys=keys)
@@ -12,6 +12,8 b''
12
12
13 import time
13 import time
14
14
15 from zmq import MessageTracker
16
15 from IPython.external.decorator import decorator
17 from IPython.external.decorator import decorator
16 from . import error
18 from . import error
17
19
@@ -19,6 +21,9 b' from . import error'
19 # Classes
21 # Classes
20 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
21
23
24 # global empty tracker that's always done:
25 finished_tracker = MessageTracker()
26
22 @decorator
27 @decorator
23 def check_ready(f, self, *args, **kwargs):
28 def check_ready(f, self, *args, **kwargs):
24 """Call spin() to sync state prior to calling the method."""
29 """Call spin() to sync state prior to calling the method."""
@@ -36,18 +41,26 b' class AsyncResult(object):'
36 msg_ids = None
41 msg_ids = None
37 _targets = None
42 _targets = None
38 _tracker = None
43 _tracker = None
44 _single_result = False
39
45
40 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
46 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
41 self._client = client
42 if isinstance(msg_ids, basestring):
47 if isinstance(msg_ids, basestring):
48 # always a list
43 msg_ids = [msg_ids]
49 msg_ids = [msg_ids]
50 if tracker is None:
51 # default to always done
52 tracker = finished_tracker
53 self._client = client
44 self.msg_ids = msg_ids
54 self.msg_ids = msg_ids
45 self._fname=fname
55 self._fname=fname
46 self._targets = targets
56 self._targets = targets
47 self._tracker = tracker
57 self._tracker = tracker
48 self._ready = False
58 self._ready = False
49 self._success = None
59 self._success = None
50 self._single_result = len(msg_ids) == 1
60 if len(msg_ids) == 1:
61 self._single_result = not isinstance(targets, (list, tuple))
62 else:
63 self._single_result = False
51
64
52 def __repr__(self):
65 def __repr__(self):
53 if self._ready:
66 if self._ready:
@@ -99,7 +112,7 b' class AsyncResult(object):'
99 """
112 """
100 if self._ready:
113 if self._ready:
101 return
114 return
102 self._ready = self._client.barrier(self.msg_ids, timeout)
115 self._ready = self._client.wait(self.msg_ids, timeout)
103 if self._ready:
116 if self._ready:
104 try:
117 try:
105 results = map(self._client.results.get, self.msg_ids)
118 results = map(self._client.results.get, self.msg_ids)
@@ -149,10 +162,9 b' class AsyncResult(object):'
149 return dict(zip(engine_ids,results))
162 return dict(zip(engine_ids,results))
150
163
151 @property
164 @property
152 @check_ready
153 def result(self):
165 def result(self):
154 """result property wrapper for `get(timeout=0)`."""
166 """result property wrapper for `get(timeout=0)`."""
155 return self._result
167 return self.get()
156
168
157 # abbreviated alias:
169 # abbreviated alias:
158 r = result
170 r = result
@@ -169,7 +181,7 b' class AsyncResult(object):'
169 @property
181 @property
170 def result_dict(self):
182 def result_dict(self):
171 """result property as a dict."""
183 """result property as a dict."""
172 return self.get_dict(0)
184 return self.get_dict()
173
185
174 def __dict__(self):
186 def __dict__(self):
175 return self.get_dict(0)
187 return self.get_dict(0)
@@ -181,11 +193,17 b' class AsyncResult(object):'
181
193
182 @property
194 @property
183 def sent(self):
195 def sent(self):
184 """check whether my messages have been sent"""
196 """check whether my messages have been sent."""
185 if self._tracker is None:
197 return self._tracker.done
186 return True
198
187 else:
199 def wait_for_send(self, timeout=-1):
188 return self._tracker.done
200 """wait for pyzmq send to complete.
201
202 This is necessary when sending arrays that you intend to edit in-place.
203 `timeout` is in seconds, and will raise TimeoutError if it is reached
204 before the send completes.
205 """
206 return self._tracker.wait(timeout)
189
207
190 #-------------------------------------
208 #-------------------------------------
191 # dict-access
209 # dict-access
@@ -285,7 +303,7 b' class AsyncHubResult(AsyncResult):'
285 if self._ready:
303 if self._ready:
286 return
304 return
287 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
305 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
288 local_ready = self._client.barrier(local_ids, timeout)
306 local_ready = self._client.wait(local_ids, timeout)
289 if local_ready:
307 if local_ready:
290 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
308 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
291 if not remote_ids:
309 if not remote_ids:
This diff has been collapsed as it changes many lines, (638 lines changed) Show them Hide them
@@ -1,4 +1,4 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ cluster"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
@@ -31,57 +31,26 b' from IPython.external.decorator import decorator'
31 from IPython.external.ssh import tunnel
31 from IPython.external.ssh import tunnel
32
32
33 from . import error
33 from . import error
34 from . import map as Map
35 from . import util
34 from . import util
36 from . import streamsession as ss
35 from . import streamsession as ss
37 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
36 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
38 from .clusterdir import ClusterDir, ClusterDirError
37 from .clusterdir import ClusterDir, ClusterDirError
39 from .dependency import Dependency, depend, require, dependent
38 from .dependency import Dependency, depend, require, dependent
40 from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction
39 from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction
41 from .util import ReverseDict, validate_url, disambiguate_url
42 from .view import DirectView, LoadBalancedView
40 from .view import DirectView, LoadBalancedView
43
41
44 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
45 # helpers for implementing old MEC API via client.apply
46 #--------------------------------------------------------------------------
47
48 def _push(user_ns, **ns):
49 """helper method for implementing `client.push` via `client.apply`"""
50 user_ns.update(ns)
51
52 def _pull(user_ns, keys):
53 """helper method for implementing `client.pull` via `client.apply`"""
54 if isinstance(keys, (list,tuple, set)):
55 for key in keys:
56 if not user_ns.has_key(key):
57 raise NameError("name '%s' is not defined"%key)
58 return map(user_ns.get, keys)
59 else:
60 if not user_ns.has_key(keys):
61 raise NameError("name '%s' is not defined"%keys)
62 return user_ns.get(keys)
63
64 def _clear(user_ns):
65 """helper method for implementing `client.clear` via `client.apply`"""
66 user_ns.clear()
67
68 def _execute(user_ns, code):
69 """helper method for implementing `client.execute` via `client.apply`"""
70 exec code in user_ns
71
72
73 #--------------------------------------------------------------------------
74 # Decorators for Client methods
43 # Decorators for Client methods
75 #--------------------------------------------------------------------------
44 #--------------------------------------------------------------------------
76
45
77 @decorator
46 @decorator
78 def spinfirst(f, self, *args, **kwargs):
47 def spin_first(f, self, *args, **kwargs):
79 """Call spin() to sync state prior to calling the method."""
48 """Call spin() to sync state prior to calling the method."""
80 self.spin()
49 self.spin()
81 return f(self, *args, **kwargs)
50 return f(self, *args, **kwargs)
82
51
83 @decorator
52 @decorator
84 def defaultblock(f, self, *args, **kwargs):
53 def default_block(f, self, *args, **kwargs):
85 """Default to self.block; preserve self.block."""
54 """Default to self.block; preserve self.block."""
86 block = kwargs.get('block',None)
55 block = kwargs.get('block',None)
87 block = self.block if block is None else block
56 block = self.block if block is None else block
@@ -151,7 +120,7 b' class Metadata(dict):'
151
120
152
121
153 class Client(HasTraits):
122 class Client(HasTraits):
154 """A semi-synchronous client to the IPython ZMQ controller
123 """A semi-synchronous client to the IPython ZMQ cluster
155
124
156 Parameters
125 Parameters
157 ----------
126 ----------
@@ -193,11 +162,11 b' class Client(HasTraits):'
193 flag for whether to use paramiko instead of shell ssh for tunneling.
162 flag for whether to use paramiko instead of shell ssh for tunneling.
194 [default: True on win32, False else]
163 [default: True on win32, False else]
195
164
196 #------- exec authentication args -------
165 ------- exec authentication args -------
197 # If even localhost is untrusted, you can have some protection against
166 If even localhost is untrusted, you can have some protection against
198 # unauthorized execution by using a key. Messages are still sent
167 unauthorized execution by using a key. Messages are still sent
199 # as cleartext, so if someone can snoop your loopback traffic this will
168 as cleartext, so if someone can snoop your loopback traffic this will
200 # not help against malicious attacks.
169 not help against malicious attacks.
201
170
202 exec_key : str
171 exec_key : str
203 an authentication key or file containing a key
172 an authentication key or file containing a key
@@ -207,7 +176,7 b' class Client(HasTraits):'
207 Attributes
176 Attributes
208 ----------
177 ----------
209
178
210 ids : set of int engine IDs
179 ids : list of int engine IDs
211 requesting the ids attribute always synchronizes
180 requesting the ids attribute always synchronizes
212 the registration state. To request ids without synchronization,
181 the registration state. To request ids without synchronization,
213 use semi-private _ids attributes.
182 use semi-private _ids attributes.
@@ -234,15 +203,18 b' class Client(HasTraits):'
234 flushes incoming results and registration state changes
203 flushes incoming results and registration state changes
235 control methods spin, and requesting `ids` also ensures up to date
204 control methods spin, and requesting `ids` also ensures up to date
236
205
237 barrier
206 wait
238 wait on one or more msg_ids
207 wait on one or more msg_ids
239
208
240 execution methods
209 execution methods
241 apply
210 apply
242 legacy: execute, run
211 legacy: execute, run
243
212
213 data movement
214 push, pull, scatter, gather
215
244 query methods
216 query methods
245 queue_status, get_result, purge
217 queue_status, get_result, purge, result_status
246
218
247 control methods
219 control methods
248 abort, shutdown
220 abort, shutdown
@@ -264,23 +236,25 b' class Client(HasTraits):'
264 _ssh=Bool(False)
236 _ssh=Bool(False)
265 _context = Instance('zmq.Context')
237 _context = Instance('zmq.Context')
266 _config = Dict()
238 _config = Dict()
267 _engines=Instance(ReverseDict, (), {})
239 _engines=Instance(util.ReverseDict, (), {})
268 # _hub_socket=Instance('zmq.Socket')
240 # _hub_socket=Instance('zmq.Socket')
269 _query_socket=Instance('zmq.Socket')
241 _query_socket=Instance('zmq.Socket')
270 _control_socket=Instance('zmq.Socket')
242 _control_socket=Instance('zmq.Socket')
271 _iopub_socket=Instance('zmq.Socket')
243 _iopub_socket=Instance('zmq.Socket')
272 _notification_socket=Instance('zmq.Socket')
244 _notification_socket=Instance('zmq.Socket')
273 _apply_socket=Instance('zmq.Socket')
245 _mux_socket=Instance('zmq.Socket')
274 _mux_ident=Str()
246 _task_socket=Instance('zmq.Socket')
275 _task_ident=Str()
276 _task_scheme=Str()
247 _task_scheme=Str()
277 _balanced_views=Dict()
248 _balanced_views=Dict()
278 _direct_views=Dict()
249 _direct_views=Dict()
279 _closed = False
250 _closed = False
251 _ignored_control_replies=Int(0)
252 _ignored_hub_replies=Int(0)
280
253
281 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
254 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
282 context=None, username=None, debug=False, exec_key=None,
255 context=None, username=None, debug=False, exec_key=None,
283 sshserver=None, sshkey=None, password=None, paramiko=None,
256 sshserver=None, sshkey=None, password=None, paramiko=None,
257 timeout=10
284 ):
258 ):
285 super(Client, self).__init__(debug=debug, profile=profile)
259 super(Client, self).__init__(debug=debug, profile=profile)
286 if context is None:
260 if context is None:
@@ -292,11 +266,11 b' class Client(HasTraits):'
292 if self._cd is not None:
266 if self._cd is not None:
293 if url_or_file is None:
267 if url_or_file is None:
294 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
268 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
295 assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
269 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
296 " Please specify at least one of url_or_file or profile."
270 " Please specify at least one of url_or_file or profile."
297
271
298 try:
272 try:
299 validate_url(url_or_file)
273 util.validate_url(url_or_file)
300 except AssertionError:
274 except AssertionError:
301 if not os.path.exists(url_or_file):
275 if not os.path.exists(url_or_file):
302 if self._cd:
276 if self._cd:
@@ -316,7 +290,7 b' class Client(HasTraits):'
316 sshserver=cfg['ssh']
290 sshserver=cfg['ssh']
317 url = cfg['url']
291 url = cfg['url']
318 location = cfg.setdefault('location', None)
292 location = cfg.setdefault('location', None)
319 cfg['url'] = disambiguate_url(cfg['url'], location)
293 cfg['url'] = util.disambiguate_url(cfg['url'], location)
320 url = cfg['url']
294 url = cfg['url']
321
295
322 self._config = cfg
296 self._config = cfg
@@ -351,10 +325,11 b' class Client(HasTraits):'
351
325
352 self._notification_handlers = {'registration_notification' : self._register_engine,
326 self._notification_handlers = {'registration_notification' : self._register_engine,
353 'unregistration_notification' : self._unregister_engine,
327 'unregistration_notification' : self._unregister_engine,
328 'shutdown_notification' : lambda msg: self.close(),
354 }
329 }
355 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
330 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
356 'apply_reply' : self._handle_apply_reply}
331 'apply_reply' : self._handle_apply_reply}
357 self._connect(sshserver, ssh_kwargs)
332 self._connect(sshserver, ssh_kwargs, timeout)
358
333
359 def __del__(self):
334 def __del__(self):
360 """cleanup sockets, but _not_ context."""
335 """cleanup sockets, but _not_ context."""
@@ -378,22 +353,6 b' class Client(HasTraits):'
378 pass
353 pass
379 self._cd = None
354 self._cd = None
380
355
381 @property
382 def ids(self):
383 """Always up-to-date ids property."""
384 self._flush_notifications()
385 # always copy:
386 return list(self._ids)
387
388 def close(self):
389 if self._closed:
390 return
391 snames = filter(lambda n: n.endswith('socket'), dir(self))
392 for socket in map(lambda name: getattr(self, name), snames):
393 if isinstance(socket, zmq.Socket) and not socket.closed:
394 socket.close()
395 self._closed = True
396
397 def _update_engines(self, engines):
356 def _update_engines(self, engines):
398 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
357 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
399 for k,v in engines.iteritems():
358 for k,v in engines.iteritems():
@@ -402,16 +361,15 b' class Client(HasTraits):'
402 self._ids.append(eid)
361 self._ids.append(eid)
403 self._ids = sorted(self._ids)
362 self._ids = sorted(self._ids)
404 if sorted(self._engines.keys()) != range(len(self._engines)) and \
363 if sorted(self._engines.keys()) != range(len(self._engines)) and \
405 self._task_scheme == 'pure' and self._task_ident:
364 self._task_scheme == 'pure' and self._task_socket:
406 self._stop_scheduling_tasks()
365 self._stop_scheduling_tasks()
407
366
408 def _stop_scheduling_tasks(self):
367 def _stop_scheduling_tasks(self):
409 """Stop scheduling tasks because an engine has been unregistered
368 """Stop scheduling tasks because an engine has been unregistered
410 from a pure ZMQ scheduler.
369 from a pure ZMQ scheduler.
411 """
370 """
412 self._task_ident = ''
371 self._task_socket.close()
413 # self._task_socket.close()
372 self._task_socket = None
414 # self._task_socket = None
415 msg = "An engine has been unregistered, and we are using pure " +\
373 msg = "An engine has been unregistered, and we are using pure " +\
416 "ZMQ task scheduling. Task farming will be disabled."
374 "ZMQ task scheduling. Task farming will be disabled."
417 if self.outstanding:
375 if self.outstanding:
@@ -434,8 +392,8 b' class Client(HasTraits):'
434 targets = [targets]
392 targets = [targets]
435 return [self._engines[t] for t in targets], list(targets)
393 return [self._engines[t] for t in targets], list(targets)
436
394
437 def _connect(self, sshserver, ssh_kwargs):
395 def _connect(self, sshserver, ssh_kwargs, timeout):
438 """setup all our socket connections to the controller. This is called from
396 """setup all our socket connections to the cluster. This is called from
439 __init__."""
397 __init__."""
440
398
441 # Maybe allow reconnecting?
399 # Maybe allow reconnecting?
@@ -444,13 +402,16 b' class Client(HasTraits):'
444 self._connected=True
402 self._connected=True
445
403
446 def connect_socket(s, url):
404 def connect_socket(s, url):
447 url = disambiguate_url(url, self._config['location'])
405 url = util.disambiguate_url(url, self._config['location'])
448 if self._ssh:
406 if self._ssh:
449 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
407 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
450 else:
408 else:
451 return s.connect(url)
409 return s.connect(url)
452
410
453 self.session.send(self._query_socket, 'connection_request')
411 self.session.send(self._query_socket, 'connection_request')
412 r,w,x = zmq.select([self._query_socket],[],[], timeout)
413 if not r:
414 raise error.TimeoutError("Hub connection request timed out")
454 idents,msg = self.session.recv(self._query_socket,mode=0)
415 idents,msg = self.session.recv(self._query_socket,mode=0)
455 if self.debug:
416 if self.debug:
456 pprint(msg)
417 pprint(msg)
@@ -458,18 +419,15 b' class Client(HasTraits):'
458 content = msg.content
419 content = msg.content
459 self._config['registration'] = dict(content)
420 self._config['registration'] = dict(content)
460 if content.status == 'ok':
421 if content.status == 'ok':
461 self._apply_socket = self._context.socket(zmq.XREP)
462 self._apply_socket.setsockopt(zmq.IDENTITY, self.session.session)
463 if content.mux:
422 if content.mux:
464 # self._mux_socket = self._context.socket(zmq.XREQ)
423 self._mux_socket = self._context.socket(zmq.XREQ)
465 self._mux_ident = 'mux'
424 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
466 connect_socket(self._apply_socket, content.mux)
425 connect_socket(self._mux_socket, content.mux)
467 if content.task:
426 if content.task:
468 self._task_scheme, task_addr = content.task
427 self._task_scheme, task_addr = content.task
469 # self._task_socket = self._context.socket(zmq.XREQ)
428 self._task_socket = self._context.socket(zmq.XREQ)
470 # self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
429 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
471 connect_socket(self._apply_socket, task_addr)
430 connect_socket(self._task_socket, task_addr)
472 self._task_ident = 'task'
473 if content.notification:
431 if content.notification:
474 self._notification_socket = self._context.socket(zmq.SUB)
432 self._notification_socket = self._context.socket(zmq.SUB)
475 connect_socket(self._notification_socket, content.notification)
433 connect_socket(self._notification_socket, content.notification)
@@ -488,8 +446,6 b' class Client(HasTraits):'
488 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
446 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
489 connect_socket(self._iopub_socket, content.iopub)
447 connect_socket(self._iopub_socket, content.iopub)
490 self._update_engines(dict(content.engines))
448 self._update_engines(dict(content.engines))
491 # give XREP apply_socket some time to connect
492 time.sleep(0.25)
493 else:
449 else:
494 self._connected = False
450 self._connected = False
495 raise Exception("Failed to connect!")
451 raise Exception("Failed to connect!")
@@ -499,7 +455,7 b' class Client(HasTraits):'
499 #--------------------------------------------------------------------------
455 #--------------------------------------------------------------------------
500
456
501 def _unwrap_exception(self, content):
457 def _unwrap_exception(self, content):
502 """unwrap exception, and remap engineid to int."""
458 """unwrap exception, and remap engine_id to int."""
503 e = error.unwrap_exception(content)
459 e = error.unwrap_exception(content)
504 # print e.traceback
460 # print e.traceback
505 if e.engine_info:
461 if e.engine_info:
@@ -545,7 +501,7 b' class Client(HasTraits):'
545
501
546 self._handle_stranded_msgs(eid, uuid)
502 self._handle_stranded_msgs(eid, uuid)
547
503
548 if self._task_ident and self._task_scheme == 'pure':
504 if self._task_socket and self._task_scheme == 'pure':
549 self._stop_scheduling_tasks()
505 self._stop_scheduling_tasks()
550
506
551 def _handle_stranded_msgs(self, eid, uuid):
507 def _handle_stranded_msgs(self, eid, uuid):
@@ -622,7 +578,7 b' class Client(HasTraits):'
622 if content['status'] == 'ok':
578 if content['status'] == 'ok':
623 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
579 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
624 elif content['status'] == 'aborted':
580 elif content['status'] == 'aborted':
625 self.results[msg_id] = error.AbortedTask(msg_id)
581 self.results[msg_id] = error.TaskAborted(msg_id)
626 elif content['status'] == 'resubmitted':
582 elif content['status'] == 'resubmitted':
627 # TODO: handle resubmission
583 # TODO: handle resubmission
628 pass
584 pass
@@ -665,12 +621,26 b' class Client(HasTraits):'
665 in the ZMQ queue.
621 in the ZMQ queue.
666
622
667 Currently: ignore them."""
623 Currently: ignore them."""
624 if self._ignored_control_replies <= 0:
625 return
668 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
626 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
669 while msg is not None:
627 while msg is not None:
628 self._ignored_control_replies -= 1
670 if self.debug:
629 if self.debug:
671 pprint(msg)
630 pprint(msg)
672 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
631 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
673
632
633 def _flush_ignored_control(self):
634 """flush ignored control replies"""
635 while self._ignored_control_replies > 0:
636 self.session.recv(self._control_socket)
637 self._ignored_control_replies -= 1
638
639 def _flush_ignored_hub_replies(self):
640 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
641 while msg is not None:
642 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
643
674 def _flush_iopub(self, sock):
644 def _flush_iopub(self, sock):
675 """Flush replies from the iopub channel waiting
645 """Flush replies from the iopub channel waiting
676 in the ZMQ queue.
646 in the ZMQ queue.
@@ -718,26 +688,46 b' class Client(HasTraits):'
718 if not isinstance(key, (int, slice, tuple, list, xrange)):
688 if not isinstance(key, (int, slice, tuple, list, xrange)):
719 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
689 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
720 else:
690 else:
721 return self.view(key, balanced=False)
691 return self._get_view(key, balanced=False)
722
692
723 #--------------------------------------------------------------------------
693 #--------------------------------------------------------------------------
724 # Begin public methods
694 # Begin public methods
725 #--------------------------------------------------------------------------
695 #--------------------------------------------------------------------------
726
696
697 @property
698 def ids(self):
699 """Always up-to-date ids property."""
700 self._flush_notifications()
701 # always copy:
702 return list(self._ids)
703
704 def close(self):
705 if self._closed:
706 return
707 snames = filter(lambda n: n.endswith('socket'), dir(self))
708 for socket in map(lambda name: getattr(self, name), snames):
709 if isinstance(socket, zmq.Socket) and not socket.closed:
710 socket.close()
711 self._closed = True
712
727 def spin(self):
713 def spin(self):
728 """Flush any registration notifications and execution results
714 """Flush any registration notifications and execution results
729 waiting in the ZMQ queue.
715 waiting in the ZMQ queue.
730 """
716 """
731 if self._notification_socket:
717 if self._notification_socket:
732 self._flush_notifications()
718 self._flush_notifications()
733 if self._apply_socket:
719 if self._mux_socket:
734 self._flush_results(self._apply_socket)
720 self._flush_results(self._mux_socket)
721 if self._task_socket:
722 self._flush_results(self._task_socket)
735 if self._control_socket:
723 if self._control_socket:
736 self._flush_control(self._control_socket)
724 self._flush_control(self._control_socket)
737 if self._iopub_socket:
725 if self._iopub_socket:
738 self._flush_iopub(self._iopub_socket)
726 self._flush_iopub(self._iopub_socket)
727 if self._query_socket:
728 self._flush_ignored_hub_replies()
739
729
740 def barrier(self, jobs=None, timeout=-1):
730 def wait(self, jobs=None, timeout=-1):
741 """waits on one or more `jobs`, for up to `timeout` seconds.
731 """waits on one or more `jobs`, for up to `timeout` seconds.
742
732
743 Parameters
733 Parameters
@@ -786,8 +776,8 b' class Client(HasTraits):'
786 # Control methods
776 # Control methods
787 #--------------------------------------------------------------------------
777 #--------------------------------------------------------------------------
788
778
789 @spinfirst
779 @spin_first
790 @defaultblock
780 @default_block
791 def clear(self, targets=None, block=None):
781 def clear(self, targets=None, block=None):
792 """Clear the namespace in target(s)."""
782 """Clear the namespace in target(s)."""
793 targets = self._build_targets(targets)[0]
783 targets = self._build_targets(targets)[0]
@@ -795,18 +785,21 b' class Client(HasTraits):'
795 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
785 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
796 error = False
786 error = False
797 if self.block:
787 if self.block:
788 self._flush_ignored_control()
798 for i in range(len(targets)):
789 for i in range(len(targets)):
799 idents,msg = self.session.recv(self._control_socket,0)
790 idents,msg = self.session.recv(self._control_socket,0)
800 if self.debug:
791 if self.debug:
801 pprint(msg)
792 pprint(msg)
802 if msg['content']['status'] != 'ok':
793 if msg['content']['status'] != 'ok':
803 error = self._unwrap_exception(msg['content'])
794 error = self._unwrap_exception(msg['content'])
795 else:
796 self._ignored_control_replies += len(targets)
804 if error:
797 if error:
805 raise error
798 raise error
806
799
807
800
808 @spinfirst
801 @spin_first
809 @defaultblock
802 @default_block
810 def abort(self, jobs=None, targets=None, block=None):
803 def abort(self, jobs=None, targets=None, block=None):
811 """Abort specific jobs from the execution queues of target(s).
804 """Abort specific jobs from the execution queues of target(s).
812
805
@@ -839,35 +832,41 b' class Client(HasTraits):'
839 content=content, ident=t)
832 content=content, ident=t)
840 error = False
833 error = False
841 if self.block:
834 if self.block:
835 self._flush_ignored_control()
842 for i in range(len(targets)):
836 for i in range(len(targets)):
843 idents,msg = self.session.recv(self._control_socket,0)
837 idents,msg = self.session.recv(self._control_socket,0)
844 if self.debug:
838 if self.debug:
845 pprint(msg)
839 pprint(msg)
846 if msg['content']['status'] != 'ok':
840 if msg['content']['status'] != 'ok':
847 error = self._unwrap_exception(msg['content'])
841 error = self._unwrap_exception(msg['content'])
842 else:
843 self._ignored_control_replies += len(targets)
848 if error:
844 if error:
849 raise error
845 raise error
850
846
851 @spinfirst
847 @spin_first
852 @defaultblock
848 @default_block
853 def shutdown(self, targets=None, restart=False, controller=False, block=None):
849 def shutdown(self, targets=None, restart=False, hub=False, block=None):
854 """Terminates one or more engine processes, optionally including the controller."""
850 """Terminates one or more engine processes, optionally including the hub."""
855 if controller:
851 if hub:
856 targets = 'all'
852 targets = 'all'
857 targets = self._build_targets(targets)[0]
853 targets = self._build_targets(targets)[0]
858 for t in targets:
854 for t in targets:
859 self.session.send(self._control_socket, 'shutdown_request',
855 self.session.send(self._control_socket, 'shutdown_request',
860 content={'restart':restart},ident=t)
856 content={'restart':restart},ident=t)
861 error = False
857 error = False
862 if block or controller:
858 if block or hub:
859 self._flush_ignored_control()
863 for i in range(len(targets)):
860 for i in range(len(targets)):
864 idents,msg = self.session.recv(self._control_socket,0)
861 idents,msg = self.session.recv(self._control_socket, 0)
865 if self.debug:
862 if self.debug:
866 pprint(msg)
863 pprint(msg)
867 if msg['content']['status'] != 'ok':
864 if msg['content']['status'] != 'ok':
868 error = self._unwrap_exception(msg['content'])
865 error = self._unwrap_exception(msg['content'])
866 else:
867 self._ignored_control_replies += len(targets)
869
868
870 if controller:
869 if hub:
871 time.sleep(0.25)
870 time.sleep(0.25)
872 self.session.send(self._query_socket, 'shutdown_request')
871 self.session.send(self._query_socket, 'shutdown_request')
873 idents,msg = self.session.recv(self._query_socket, 0)
872 idents,msg = self.session.recv(self._query_socket, 0)
@@ -883,8 +882,8 b' class Client(HasTraits):'
883 # Execution methods
882 # Execution methods
884 #--------------------------------------------------------------------------
883 #--------------------------------------------------------------------------
885
884
886 @defaultblock
885 @default_block
887 def execute(self, code, targets='all', block=None):
886 def _execute(self, code, targets='all', block=None):
888 """Executes `code` on `targets` in blocking or nonblocking manner.
887 """Executes `code` on `targets` in blocking or nonblocking manner.
889
888
890 ``execute`` is always `bound` (affects engine namespace)
889 ``execute`` is always `bound` (affects engine namespace)
@@ -901,33 +900,7 b' class Client(HasTraits):'
901 whether or not to wait until done to return
900 whether or not to wait until done to return
902 default: self.block
901 default: self.block
903 """
902 """
904 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
903 return self[targets].execute(code, block=block)
905 if not block:
906 return result
907
908 def run(self, filename, targets='all', block=None):
909 """Execute contents of `filename` on engine(s).
910
911 This simply reads the contents of the file and calls `execute`.
912
913 Parameters
914 ----------
915
916 filename : str
917 The path to the file
918 targets : int/str/list of ints/strs
919 the engines on which to execute
920 default : all
921 block : bool
922 whether or not to wait until done
923 default: self.block
924
925 """
926 with open(filename, 'r') as f:
927 # add newline in case of trailing indented whitespace
928 # which will cause SyntaxError
929 code = f.read()+'\n'
930 return self.execute(code, targets=targets, block=block)
931
904
932 def _maybe_raise(self, result):
905 def _maybe_raise(self, result):
933 """wrapper for maybe raising an exception if apply failed."""
906 """wrapper for maybe raising an exception if apply failed."""
@@ -936,287 +909,113 b' class Client(HasTraits):'
936
909
937 return result
910 return result
938
911
939 def _build_dependency(self, dep):
912 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
940 """helper for building jsonable dependencies from various input forms"""
913 ident=None):
941 if isinstance(dep, Dependency):
914 """construct and send an apply message via a socket.
942 return dep.as_dict()
943 elif isinstance(dep, AsyncResult):
944 return dep.msg_ids
945 elif dep is None:
946 return []
947 else:
948 # pass to Dependency constructor
949 return list(Dependency(dep))
950
951 @defaultblock
952 def apply(self, f, args=None, kwargs=None, bound=False, block=None,
953 targets=None, balanced=None,
954 after=None, follow=None, timeout=None,
955 track=False):
956 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
957
915
958 This is the central execution command for the client.
916 This is the principal method with which all engine execution is performed by views.
959
960 Parameters
961 ----------
962
963 f : function
964 The fuction to be called remotely
965 args : tuple/list
966 The positional arguments passed to `f`
967 kwargs : dict
968 The keyword arguments passed to `f`
969 bound : bool (default: False)
970 Whether to pass the Engine(s) Namespace as the first argument to `f`.
971 block : bool (default: self.block)
972 Whether to wait for the result, or return immediately.
973 False:
974 returns AsyncResult
975 True:
976 returns actual result(s) of f(*args, **kwargs)
977 if multiple targets:
978 list of results, matching `targets`
979 track : bool
980 whether to track non-copying sends.
981 [default False]
982
983 targets : int,list of ints, 'all', None
984 Specify the destination of the job.
985 if None:
986 Submit via Task queue for load-balancing.
987 if 'all':
988 Run on all active engines
989 if list:
990 Run on each specified engine
991 if int:
992 Run on single engine
993 Note:
994 that if `balanced=True`, and `targets` is specified,
995 then the load-balancing will be limited to balancing
996 among `targets`.
997
998 balanced : bool, default None
999 whether to load-balance. This will default to True
1000 if targets is unspecified, or False if targets is specified.
1001
1002 If `balanced` and `targets` are both specified, the task will
1003 be assigne to *one* of the targets by the scheduler.
1004
1005 The following arguments are only used when balanced is True:
1006
1007 after : Dependency or collection of msg_ids
1008 Only for load-balanced execution (targets=None)
1009 Specify a list of msg_ids as a time-based dependency.
1010 This job will only be run *after* the dependencies
1011 have been met.
1012
1013 follow : Dependency or collection of msg_ids
1014 Only for load-balanced execution (targets=None)
1015 Specify a list of msg_ids as a location-based dependency.
1016 This job will only be run on an engine where this dependency
1017 is met.
1018
1019 timeout : float/int or None
1020 Only for load-balanced execution (targets=None)
1021 Specify an amount of time (in seconds) for the scheduler to
1022 wait for dependencies to be met before failing with a
1023 DependencyTimeout.
1024
1025 Returns
1026 -------
1027
1028 if block is False:
1029 return AsyncResult wrapping msg_ids
1030 output of AsyncResult.get() is identical to that of `apply(...block=True)`
1031 else:
1032 if single target (or balanced):
1033 return result of `f(*args, **kwargs)`
1034 else:
1035 return list of results, matching `targets`
1036 """
917 """
918
1037 assert not self._closed, "cannot use me anymore, I'm closed!"
919 assert not self._closed, "cannot use me anymore, I'm closed!"
1038 # defaults:
920 # defaults:
1039 block = block if block is not None else self.block
1040 args = args if args is not None else []
921 args = args if args is not None else []
1041 kwargs = kwargs if kwargs is not None else {}
922 kwargs = kwargs if kwargs is not None else {}
923 subheader = subheader if subheader is not None else {}
1042
924
1043 if not self._ids:
925 # validate arguments
1044 # flush notification socket if no engines yet
1045 any_ids = self.ids
1046 if not any_ids:
1047 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
1048
1049 if balanced is None:
1050 if targets is None:
1051 # default to balanced if targets unspecified
1052 balanced = True
1053 else:
1054 # otherwise default to multiplexing
1055 balanced = False
1056
1057 if targets is None and balanced is False:
1058 # default to all if *not* balanced, and targets is unspecified
1059 targets = 'all'
1060
1061 # enforce types of f,args,kwrags
1062 if not callable(f):
926 if not callable(f):
1063 raise TypeError("f must be callable, not %s"%type(f))
927 raise TypeError("f must be callable, not %s"%type(f))
1064 if not isinstance(args, (tuple, list)):
928 if not isinstance(args, (tuple, list)):
1065 raise TypeError("args must be tuple or list, not %s"%type(args))
929 raise TypeError("args must be tuple or list, not %s"%type(args))
1066 if not isinstance(kwargs, dict):
930 if not isinstance(kwargs, dict):
1067 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
931 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
932 if not isinstance(subheader, dict):
933 raise TypeError("subheader must be dict, not %s"%type(subheader))
1068
934
1069 options = dict(bound=bound, block=block, targets=targets, track=track)
935 if not self._ids:
1070
936 # flush notification socket if no engines yet
1071 if balanced:
937 any_ids = self.ids
1072 return self._apply_balanced(f, args, kwargs, timeout=timeout,
938 if not any_ids:
1073 after=after, follow=follow, **options)
939 raise error.NoEnginesRegistered("Can't execute without any connected engines.")
1074 elif follow or after or timeout:
940 # enforce types of f,args,kwargs
1075 msg = "follow, after, and timeout args are only used for"
1076 msg += " load-balanced execution."
1077 raise ValueError(msg)
1078 else:
1079 return self._apply_direct(f, args, kwargs, **options)
1080
1081 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
1082 after=None, follow=None, timeout=None, track=None):
1083 """call f(*args, **kwargs) remotely in a load-balanced manner.
1084
1085 This is a private method, see `apply` for details.
1086 Not to be called directly!
1087 """
1088
1089 loc = locals()
1090 for name in ('bound', 'block', 'track'):
1091 assert loc[name] is not None, "kwarg %r must be specified!"%name
1092
1093 if not self._task_ident:
1094 msg = "Task farming is disabled"
1095 if self._task_scheme == 'pure':
1096 msg += " because the pure ZMQ scheduler cannot handle"
1097 msg += " disappearing engines."
1098 raise RuntimeError(msg)
1099
1100 if self._task_scheme == 'pure':
1101 # pure zmq scheme doesn't support dependencies
1102 msg = "Pure ZMQ scheduler doesn't support dependencies"
1103 if (follow or after):
1104 # hard fail on DAG dependencies
1105 raise RuntimeError(msg)
1106 if isinstance(f, dependent):
1107 # soft warn on functional dependencies
1108 warnings.warn(msg, RuntimeWarning)
1109
1110 # defaults:
1111 args = args if args is not None else []
1112 kwargs = kwargs if kwargs is not None else {}
1113
1114 if targets:
1115 idents,_ = self._build_targets(targets)
1116 else:
1117 idents = []
1118
941
1119 after = self._build_dependency(after)
1120 follow = self._build_dependency(follow)
1121 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1122 bufs = util.pack_apply_message(f,args,kwargs)
942 bufs = util.pack_apply_message(f,args,kwargs)
1123 content = dict(bound=bound)
1124
943
1125 msg = self.session.send(self._apply_socket, "apply_request", ident=self._task_ident,
944 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1126 content=content, buffers=bufs, subheader=subheader, track=track)
945 subheader=subheader, track=track)
946
1127 msg_id = msg['msg_id']
947 msg_id = msg['msg_id']
1128 self.outstanding.add(msg_id)
948 self.outstanding.add(msg_id)
949 if ident:
950 # possibly routed to a specific engine
951 if isinstance(ident, list):
952 ident = ident[-1]
953 if ident in self._engines.values():
954 # save for later, in case of engine death
955 self._outstanding_dict[ident].add(msg_id)
1129 self.history.append(msg_id)
956 self.history.append(msg_id)
1130 self.metadata[msg_id]['submitted'] = datetime.now()
957 self.metadata[msg_id]['submitted'] = datetime.now()
1131 tracker = None if track is False else msg['tracker']
1132 ar = AsyncResult(self, [msg_id], fname=f.__name__, targets=targets, tracker=tracker)
1133 if block:
1134 try:
1135 return ar.get()
1136 except KeyboardInterrupt:
1137 return ar
1138 else:
1139 return ar
1140
1141 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None,
1142 track=None):
1143 """Then underlying method for applying functions to specific engines
1144 via the MUX queue.
1145
1146 This is a private method, see `apply` for details.
1147 Not to be called directly!
1148 """
1149
1150 if not self._mux_ident:
1151 msg = "Multiplexing is disabled"
1152 raise RuntimeError(msg)
1153
958
1154 loc = locals()
959 return msg
1155 for name in ('bound', 'block', 'targets', 'track'):
960
1156 assert loc[name] is not None, "kwarg %r must be specified!"%name
1157
1158 idents,targets = self._build_targets(targets)
1159
1160 subheader = {}
1161 content = dict(bound=bound)
1162 bufs = util.pack_apply_message(f,args,kwargs)
1163
1164 msg_ids = []
1165 trackers = []
1166 for ident in idents:
1167 msg = self.session.send(self._apply_socket, "apply_request",
1168 content=content, buffers=bufs, ident=[self._mux_ident, ident], subheader=subheader,
1169 track=track)
1170 if track:
1171 trackers.append(msg['tracker'])
1172 msg_id = msg['msg_id']
1173 self.outstanding.add(msg_id)
1174 self._outstanding_dict[ident].add(msg_id)
1175 self.history.append(msg_id)
1176 msg_ids.append(msg_id)
1177
1178 tracker = None if track is False else zmq.MessageTracker(*trackers)
1179 ar = AsyncResult(self, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
1180
1181 if block:
1182 try:
1183 return ar.get()
1184 except KeyboardInterrupt:
1185 return ar
1186 else:
1187 return ar
1188
1189 #--------------------------------------------------------------------------
961 #--------------------------------------------------------------------------
1190 # construct a View object
962 # construct a View object
1191 #--------------------------------------------------------------------------
963 #--------------------------------------------------------------------------
1192
964
1193 @defaultblock
1194 def remote(self, bound=False, block=None, targets=None, balanced=None):
1195 """Decorator for making a RemoteFunction"""
1196 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1197
1198 @defaultblock
1199 def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None):
1200 """Decorator for making a ParallelFunction"""
1201 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1202
1203 def _cache_view(self, targets, balanced):
965 def _cache_view(self, targets, balanced):
1204 """save views, so subsequent requests don't create new objects."""
966 """save views, so subsequent requests don't create new objects."""
1205 if balanced:
967 if balanced:
968 # validate whether we can run
969 if not self._task_socket:
970 msg = "Task farming is disabled"
971 if self._task_scheme == 'pure':
972 msg += " because the pure ZMQ scheduler cannot handle"
973 msg += " disappearing engines."
974 raise RuntimeError(msg)
975 socket = self._task_socket
1206 view_class = LoadBalancedView
976 view_class = LoadBalancedView
1207 view_cache = self._balanced_views
977 view_cache = self._balanced_views
1208 else:
978 else:
979 socket = self._mux_socket
1209 view_class = DirectView
980 view_class = DirectView
1210 view_cache = self._direct_views
981 view_cache = self._direct_views
1211
982
1212 # use str, since often targets will be a list
983 # use str, since often targets will be a list
1213 key = str(targets)
984 key = str(targets)
1214 if key not in view_cache:
985 if key not in view_cache:
1215 view_cache[key] = view_class(client=self, targets=targets)
986 view_cache[key] = view_class(client=self, socket=socket, targets=targets)
1216
987
1217 return view_cache[key]
988 return view_cache[key]
1218
989
1219 def view(self, targets=None, balanced=None):
990 def load_balanced_view(self, targets=None):
991 """construct a DirectView object.
992
993 If no arguments are specified, create a LoadBalancedView
994 using all engines.
995
996 Parameters
997 ----------
998
999 targets: list,slice,int,etc. [default: use all engines]
1000 The subset of engines across which to load-balance
1001 """
1002 return self._get_view(targets, balanced=True)
1003
1004 def direct_view(self, targets='all'):
1005 """construct a DirectView object.
1006
1007 If no targets are specified, create a DirectView
1008 using all engines.
1009
1010 Parameters
1011 ----------
1012
1013 targets: list,slice,int,etc. [default: use all engines]
1014 The engines to use for the View
1015 """
1016 return self._get_view(targets, balanced=False)
1017
1018 def _get_view(self, targets, balanced):
1220 """Method for constructing View objects.
1019 """Method for constructing View objects.
1221
1020
1222 If no arguments are specified, create a LoadBalancedView
1021 If no arguments are specified, create a LoadBalancedView
@@ -1234,9 +1033,7 b' class Client(HasTraits):'
1234
1033
1235 """
1034 """
1236
1035
1237 balanced = (targets is None) if balanced is None else balanced
1036 if targets in (None,'all'):
1238
1239 if targets is None:
1240 if balanced:
1037 if balanced:
1241 return self._cache_view(None,True)
1038 return self._cache_view(None,True)
1242 else:
1039 else:
@@ -1261,20 +1058,20 b' class Client(HasTraits):'
1261 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1058 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1262
1059
1263 #--------------------------------------------------------------------------
1060 #--------------------------------------------------------------------------
1264 # Data movement
1061 # Data movement (TO BE REMOVED)
1265 #--------------------------------------------------------------------------
1062 #--------------------------------------------------------------------------
1266
1063
1267 @defaultblock
1064 @default_block
1268 def push(self, ns, targets='all', block=None, track=False):
1065 def _push(self, ns, targets='all', block=None, track=False):
1269 """Push the contents of `ns` into the namespace on `target`"""
1066 """Push the contents of `ns` into the namespace on `target`"""
1270 if not isinstance(ns, dict):
1067 if not isinstance(ns, dict):
1271 raise TypeError("Must be a dict, not %s"%type(ns))
1068 raise TypeError("Must be a dict, not %s"%type(ns))
1272 result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1069 result = self.apply(util._push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1273 if not block:
1070 if not block:
1274 return result
1071 return result
1275
1072
1276 @defaultblock
1073 @default_block
1277 def pull(self, keys, targets='all', block=None):
1074 def _pull(self, keys, targets='all', block=None):
1278 """Pull objects from `target`'s namespace by `keys`"""
1075 """Pull objects from `target`'s namespace by `keys`"""
1279 if isinstance(keys, basestring):
1076 if isinstance(keys, basestring):
1280 pass
1077 pass
@@ -1284,64 +1081,15 b' class Client(HasTraits):'
1284 raise TypeError("keys must be str, not type %r"%type(key))
1081 raise TypeError("keys must be str, not type %r"%type(key))
1285 else:
1082 else:
1286 raise TypeError("keys must be strs, not %r"%keys)
1083 raise TypeError("keys must be strs, not %r"%keys)
1287 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1084 result = self.apply(util._pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1288 return result
1085 return result
1289
1086
1290 @defaultblock
1291 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None, track=False):
1292 """
1293 Partition a Python sequence and send the partitions to a set of engines.
1294 """
1295 targets = self._build_targets(targets)[-1]
1296 mapObject = Map.dists[dist]()
1297 nparts = len(targets)
1298 msg_ids = []
1299 trackers = []
1300 for index, engineid in enumerate(targets):
1301 partition = mapObject.getPartition(seq, index, nparts)
1302 if flatten and len(partition) == 1:
1303 r = self.push({key: partition[0]}, targets=engineid, block=False, track=track)
1304 else:
1305 r = self.push({key: partition}, targets=engineid, block=False, track=track)
1306 msg_ids.extend(r.msg_ids)
1307 if track:
1308 trackers.append(r._tracker)
1309
1310 if track:
1311 tracker = zmq.MessageTracker(*trackers)
1312 else:
1313 tracker = None
1314
1315 r = AsyncResult(self, msg_ids, fname='scatter', targets=targets, tracker=tracker)
1316 if block:
1317 r.wait()
1318 else:
1319 return r
1320
1321 @defaultblock
1322 def gather(self, key, dist='b', targets='all', block=None):
1323 """
1324 Gather a partitioned sequence on a set of engines as a single local seq.
1325 """
1326
1327 targets = self._build_targets(targets)[-1]
1328 mapObject = Map.dists[dist]()
1329 msg_ids = []
1330 for index, engineid in enumerate(targets):
1331 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1332
1333 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1334 if block:
1335 return r.get()
1336 else:
1337 return r
1338
1339 #--------------------------------------------------------------------------
1087 #--------------------------------------------------------------------------
1340 # Query methods
1088 # Query methods
1341 #--------------------------------------------------------------------------
1089 #--------------------------------------------------------------------------
1342
1090
1343 @spinfirst
1091 @spin_first
1344 @defaultblock
1092 @default_block
1345 def get_result(self, indices_or_msg_ids=None, block=None):
1093 def get_result(self, indices_or_msg_ids=None, block=None):
1346 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1094 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1347
1095
@@ -1406,7 +1154,7 b' class Client(HasTraits):'
1406
1154
1407 return ar
1155 return ar
1408
1156
1409 @spinfirst
1157 @spin_first
1410 def result_status(self, msg_ids, status_only=True):
1158 def result_status(self, msg_ids, status_only=True):
1411 """Check on the status of the result(s) of the apply request with `msg_ids`.
1159 """Check on the status of the result(s) of the apply request with `msg_ids`.
1412
1160
@@ -1505,7 +1253,7 b' class Client(HasTraits):'
1505 error.collect_exceptions(failures, "result_status")
1253 error.collect_exceptions(failures, "result_status")
1506 return content
1254 return content
1507
1255
1508 @spinfirst
1256 @spin_first
1509 def queue_status(self, targets='all', verbose=False):
1257 def queue_status(self, targets='all', verbose=False):
1510 """Fetch the status of engine queues.
1258 """Fetch the status of engine queues.
1511
1259
@@ -1518,8 +1266,8 b' class Client(HasTraits):'
1518 verbose : bool
1266 verbose : bool
1519 Whether to return lengths only, or lists of ids for each element
1267 Whether to return lengths only, or lists of ids for each element
1520 """
1268 """
1521 targets = self._build_targets(targets)[1]
1269 engine_ids = self._build_targets(targets)[1]
1522 content = dict(targets=targets, verbose=verbose)
1270 content = dict(targets=engine_ids, verbose=verbose)
1523 self.session.send(self._query_socket, "queue_request", content=content)
1271 self.session.send(self._query_socket, "queue_request", content=content)
1524 idents,msg = self.session.recv(self._query_socket, 0)
1272 idents,msg = self.session.recv(self._query_socket, 0)
1525 if self.debug:
1273 if self.debug:
@@ -1528,11 +1276,15 b' class Client(HasTraits):'
1528 status = content.pop('status')
1276 status = content.pop('status')
1529 if status != 'ok':
1277 if status != 'ok':
1530 raise self._unwrap_exception(content)
1278 raise self._unwrap_exception(content)
1531 return util.rekey(content)
1279 content = util.rekey(content)
1280 if isinstance(targets, int):
1281 return content[targets]
1282 else:
1283 return content
1532
1284
1533 @spinfirst
1285 @spin_first
1534 def purge_results(self, jobs=[], targets=[]):
1286 def purge_results(self, jobs=[], targets=[]):
1535 """Tell the controller to forget results.
1287 """Tell the Hub to forget results.
1536
1288
1537 Individual results can be purged by msg_id, or the entire
1289 Individual results can be purged by msg_id, or the entire
1538 history of specific targets can be purged.
1290 history of specific targets can be purged.
@@ -1540,11 +1292,11 b' class Client(HasTraits):'
1540 Parameters
1292 Parameters
1541 ----------
1293 ----------
1542
1294
1543 jobs : str or list of strs or AsyncResult objects
1295 jobs : str or list of str or AsyncResult objects
1544 the msg_ids whose results should be forgotten.
1296 the msg_ids whose results should be forgotten.
1545 targets : int/str/list of ints/strs
1297 targets : int/str/list of ints/strs
1546 The targets, by uuid or int_id, whose entire history is to be purged.
1298 The targets, by uuid or int_id, whose entire history is to be purged.
1547 Use `targets='all'` to scrub everything from the controller's memory.
1299 Use `targets='all'` to scrub everything from the Hub's memory.
1548
1300
1549 default : None
1301 default : None
1550 """
1302 """
@@ -6,11 +6,9 b''
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 from IPython.external.decorator import decorator
10
11 from .asyncresult import AsyncResult
9 from .asyncresult import AsyncResult
12 from .error import UnmetDependency
10 from .error import UnmetDependency
13
11 from .util import interactive
14
12
15 class depend(object):
13 class depend(object):
16 """Dependency decorator, for use with tasks.
14 """Dependency decorator, for use with tasks.
@@ -54,6 +52,8 b' class dependent(object):'
54 self.dkwargs = dkwargs
52 self.dkwargs = dkwargs
55
53
56 def __call__(self, *args, **kwargs):
54 def __call__(self, *args, **kwargs):
55 # if hasattr(self.f, 'func_globals') and hasattr(self.df, 'func_globals'):
56 # self.df.func_globals = self.f.func_globals
57 if self.df(*self.dargs, **self.dkwargs) is False:
57 if self.df(*self.dargs, **self.dkwargs) is False:
58 raise UnmetDependency()
58 raise UnmetDependency()
59 return self.f(*args, **kwargs)
59 return self.f(*args, **kwargs)
@@ -62,13 +62,18 b' class dependent(object):'
62 def __name__(self):
62 def __name__(self):
63 return self.func_name
63 return self.func_name
64
64
65 @interactive
65 def _require(*names):
66 def _require(*names):
66 """Helper for @require decorator."""
67 """Helper for @require decorator."""
68 from IPython.zmq.parallel.error import UnmetDependency
69 user_ns = globals()
67 for name in names:
70 for name in names:
71 if name in user_ns:
72 continue
68 try:
73 try:
69 __import__(name)
74 exec 'import %s'%name in user_ns
70 except ImportError:
75 except ImportError:
71 return False
76 raise UnmetDependency(name)
72 return True
77 return True
73
78
74 def require(*names):
79 def require(*names):
@@ -96,54 +101,73 b' class Dependency(set):'
96 all : bool [default True]
101 all : bool [default True]
97 Whether the dependency should be considered met when *all* depending tasks have completed
102 Whether the dependency should be considered met when *all* depending tasks have completed
98 or only when *any* have been completed.
103 or only when *any* have been completed.
99 success_only : bool [default True]
104 success : bool [default True]
100 Whether to consider only successes for Dependencies, or consider failures as well.
105 Whether to consider successes as fulfilling dependencies.
101 If `all=success_only=True`, then this task will fail with an ImpossibleDependency
106 failure : bool [default False]
107 Whether to consider failures as fulfilling dependencies.
108
109 If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
102 as soon as the first depended-upon task fails.
110 as soon as the first depended-upon task fails.
103 """
111 """
104
112
105 all=True
113 all=True
106 success_only=True
114 success=True
115 failure=True
107
116
108 def __init__(self, dependencies=[], all=True, success_only=True):
117 def __init__(self, dependencies=[], all=True, success=True, failure=False):
109 if isinstance(dependencies, dict):
118 if isinstance(dependencies, dict):
110 # load from dict
119 # load from dict
111 all = dependencies.get('all', True)
120 all = dependencies.get('all', True)
112 success_only = dependencies.get('success_only', success_only)
121 success = dependencies.get('success', success)
122 failure = dependencies.get('failure', failure)
113 dependencies = dependencies.get('dependencies', [])
123 dependencies = dependencies.get('dependencies', [])
114 ids = []
124 ids = []
115 if isinstance(dependencies, AsyncResult):
125
116 ids.extend(AsyncResult.msg_ids)
126 # extract ids from various sources:
117 else:
127 if isinstance(dependencies, (basestring, AsyncResult)):
118 for d in dependencies:
128 dependencies = [dependencies]
119 if isinstance(d, basestring):
129 for d in dependencies:
120 ids.append(d)
130 if isinstance(d, basestring):
121 elif isinstance(d, AsyncResult):
131 ids.append(d)
122 ids.extend(d.msg_ids)
132 elif isinstance(d, AsyncResult):
123 else:
133 ids.extend(d.msg_ids)
124 raise TypeError("invalid dependency type: %r"%type(d))
134 else:
135 raise TypeError("invalid dependency type: %r"%type(d))
136
125 set.__init__(self, ids)
137 set.__init__(self, ids)
126 self.all = all
138 self.all = all
127 self.success_only=success_only
139 if not (success or failure):
140 raise ValueError("Must depend on at least one of successes or failures!")
141 self.success=success
142 self.failure = failure
128
143
129 def check(self, completed, failed=None):
144 def check(self, completed, failed=None):
130 if failed is not None and not self.success_only:
145 """check whether our dependencies have been met."""
131 completed = completed.union(failed)
132 if len(self) == 0:
146 if len(self) == 0:
133 return True
147 return True
148 against = set()
149 if self.success:
150 against = completed
151 if failed is not None and self.failure:
152 against = against.union(failed)
134 if self.all:
153 if self.all:
135 return self.issubset(completed)
154 return self.issubset(against)
136 else:
155 else:
137 return not self.isdisjoint(completed)
156 return not self.isdisjoint(against)
138
157
139 def unreachable(self, failed):
158 def unreachable(self, completed, failed=None):
140 if len(self) == 0 or len(failed) == 0 or not self.success_only:
159 """return whether this dependency has become impossible."""
160 if len(self) == 0:
141 return False
161 return False
142 # print self, self.success_only, self.all, failed
162 against = set()
163 if not self.success:
164 against = completed
165 if failed is not None and not self.failure:
166 against = against.union(failed)
143 if self.all:
167 if self.all:
144 return not self.isdisjoint(failed)
168 return not self.isdisjoint(against)
145 else:
169 else:
146 return self.issubset(failed)
170 return self.issubset(against)
147
171
148
172
149 def as_dict(self):
173 def as_dict(self):
@@ -151,9 +175,10 b' class Dependency(set):'
151 return dict(
175 return dict(
152 dependencies=list(self),
176 dependencies=list(self),
153 all=self.all,
177 all=self.all,
154 success_only=self.success_only,
178 success=self.success,
179 failure=self.failure
155 )
180 )
156
181
157
182
158 __all__ = ['depend', 'require', 'dependent', 'Dependency']
183 __all__ = ['depend', 'require', 'dependent', 'Dependency']
159
184
@@ -890,13 +890,9 b' class Hub(LoggingFactory):'
890
890
891 def shutdown_request(self, client_id, msg):
891 def shutdown_request(self, client_id, msg):
892 """handle shutdown request."""
892 """handle shutdown request."""
893 # s = self.context.socket(zmq.XREQ)
894 # s.connect(self.client_connections['mux'])
895 # time.sleep(0.1)
896 # for eid,ec in self.engines.iteritems():
897 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
898 # time.sleep(1)
899 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
893 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
894 # also notify other clients of shutdown
895 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
900 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
896 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
901 dc.start()
897 dc.start()
902
898
@@ -1,4 +1,4 b''
1 """Remote Functions and decorators for the client."""
1 """Remote Functions and decorators for Views."""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
@@ -22,33 +22,33 b' from .asyncresult import AsyncMapResult'
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 @testdec.skip_doctest
24 @testdec.skip_doctest
25 def remote(client, bound=False, block=None, targets=None, balanced=None):
25 def remote(view, block=None, **flags):
26 """Turn a function into a remote function.
26 """Turn a function into a remote function.
27
27
28 This method can be used for map:
28 This method can be used for map:
29
29
30 In [1]: @remote(client,block=True)
30 In [1]: @remote(view,block=True)
31 ...: def func(a):
31 ...: def func(a):
32 ...: pass
32 ...: pass
33 """
33 """
34
34
35 def remote_function(f):
35 def remote_function(f):
36 return RemoteFunction(client, f, bound, block, targets, balanced)
36 return RemoteFunction(view, f, block=block, **flags)
37 return remote_function
37 return remote_function
38
38
39 @testdec.skip_doctest
39 @testdec.skip_doctest
40 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
40 def parallel(view, dist='b', block=None, **flags):
41 """Turn a function into a parallel remote function.
41 """Turn a function into a parallel remote function.
42
42
43 This method can be used for map:
43 This method can be used for map:
44
44
45 In [1]: @parallel(client,block=True)
45 In [1]: @parallel(view, block=True)
46 ...: def func(a):
46 ...: def func(a):
47 ...: pass
47 ...: pass
48 """
48 """
49
49
50 def parallel_function(f):
50 def parallel_function(f):
51 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
51 return ParallelFunction(view, f, dist=dist, block=block, **flags)
52 return parallel_function
52 return parallel_function
53
53
54 #--------------------------------------------------------------------------
54 #--------------------------------------------------------------------------
@@ -61,44 +61,32 b' class RemoteFunction(object):'
61 Parameters
61 Parameters
62 ----------
62 ----------
63
63
64 client : Client instance
64 view : View instance
65 The client to be used to connect to engines
65 The view to be used for execution
66 f : callable
66 f : callable
67 The function to be wrapped into a remote function
67 The function to be wrapped into a remote function
68 bound : bool [default: False]
69 Whether the affect the remote namespace when called
70 block : bool [default: None]
68 block : bool [default: None]
71 Whether to wait for results or not. The default behavior is
69 Whether to wait for results or not. The default behavior is
72 to use the current `block` attribute of `client`
70 to use the current `block` attribute of `view`
73 targets : valid target list [default: all]
71
74 The targets on which to execute.
72 **flags : remaining kwargs are passed to View.temp_flags
75 balanced : bool
76 Whether to load-balance with the Task scheduler or not
77 """
73 """
78
74
79 client = None # the remote connection
75 view = None # the remote connection
80 func = None # the wrapped function
76 func = None # the wrapped function
81 block = None # whether to block
77 block = None # whether to block
82 bound = None # whether to affect the namespace
78 flags = None # dict of extra kwargs for temp_flags
83 targets = None # where to execute
84 balanced = None # whether to load-balance
85
79
86 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
80 def __init__(self, view, f, block=None, **flags):
87 self.client = client
81 self.view = view
88 self.func = f
82 self.func = f
89 self.block=block
83 self.block=block
90 self.bound=bound
84 self.flags=flags
91 self.targets=targets
92 if balanced is None:
93 if targets is None:
94 balanced = True
95 else:
96 balanced = False
97 self.balanced = balanced
98
85
99 def __call__(self, *args, **kwargs):
86 def __call__(self, *args, **kwargs):
100 return self.client.apply(self.func, args=args, kwargs=kwargs,
87 block = self.view.block if self.block is None else self.block
101 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
88 with self.view.temp_flags(block=block, **self.flags):
89 return self.view.apply(self.func, *args, **kwargs)
102
90
103
91
104 class ParallelFunction(RemoteFunction):
92 class ParallelFunction(RemoteFunction):
@@ -111,51 +99,57 b' class ParallelFunction(RemoteFunction):'
111 Parameters
99 Parameters
112 ----------
100 ----------
113
101
114 client : Client instance
102 view : View instance
115 The client to be used to connect to engines
103 The view to be used for execution
116 f : callable
104 f : callable
117 The function to be wrapped into a remote function
105 The function to be wrapped into a remote function
118 bound : bool [default: False]
106 dist : str [default: 'b']
119 Whether the affect the remote namespace when called
107 The key for which mapObject to use to distribute sequences
108 options are:
109 * 'b' : use contiguous chunks in order
110 * 'r' : use round-robin striping
120 block : bool [default: None]
111 block : bool [default: None]
121 Whether to wait for results or not. The default behavior is
112 Whether to wait for results or not. The default behavior is
122 to use the current `block` attribute of `client`
113 to use the current `block` attribute of `view`
123 targets : valid target list [default: all]
114 chunksize : int or None
124 The targets on which to execute.
125 balanced : bool
126 Whether to load-balance with the Task scheduler or not
127 chunk_size : int or None
128 The size of chunk to use when breaking up sequences in a load-balanced manner
115 The size of chunk to use when breaking up sequences in a load-balanced manner
116 **flags : remaining kwargs are passed to View.temp_flags
129 """
117 """
130 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None):
118
131 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
119 chunksize=None
132 self.chunk_size = chunk_size
120 mapObject=None
121
122 def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
123 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
124 self.chunksize = chunksize
133
125
134 mapClass = Map.dists[dist]
126 mapClass = Map.dists[dist]
135 self.mapObject = mapClass()
127 self.mapObject = mapClass()
136
128
137 def __call__(self, *sequences):
129 def __call__(self, *sequences):
130 # check that the length of sequences match
138 len_0 = len(sequences[0])
131 len_0 = len(sequences[0])
139 for s in sequences:
132 for s in sequences:
140 if len(s)!=len_0:
133 if len(s)!=len_0:
141 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
134 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
142 raise ValueError(msg)
135 raise ValueError(msg)
143
136 balanced = 'Balanced' in self.view.__class__.__name__
144 if self.balanced:
137 if balanced:
145 if self.chunk_size:
138 if self.chunksize:
146 nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0)
139 nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0)
147 else:
140 else:
148 nparts = len_0
141 nparts = len_0
149 targets = [self.targets]*nparts
142 targets = [None]*nparts
150 else:
143 else:
151 if self.chunk_size:
144 if self.chunksize:
152 warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning)
145 warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
153 # multiplexed:
146 # multiplexed:
154 targets = self.client._build_targets(self.targets)[-1]
147 targets = self.view.targets
155 nparts = len(targets)
148 nparts = len(targets)
156
149
157 msg_ids = []
150 msg_ids = []
158 # my_f = lambda *a: map(self.func, *a)
151 # my_f = lambda *a: map(self.func, *a)
152 client = self.view.client
159 for index, t in enumerate(targets):
153 for index, t in enumerate(targets):
160 args = []
154 args = []
161 for seq in sequences:
155 for seq in sequences:
@@ -173,12 +167,15 b' class ParallelFunction(RemoteFunction):'
173 args = [self.func]+args
167 args = [self.func]+args
174 else:
168 else:
175 f=self.func
169 f=self.func
176 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
170
177 targets=t, balanced=self.balanced)
171 view = self.view if balanced else client[t]
172 with view.temp_flags(block=False, **self.flags):
173 ar = view.apply(f, *args)
178
174
179 msg_ids.append(ar.msg_ids[0])
175 msg_ids.append(ar.msg_ids[0])
180
176
181 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
177 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)
178
182 if self.block:
179 if self.block:
183 try:
180 try:
184 return r.get()
181 return r.get()
@@ -238,7 +238,7 b' class TaskScheduler(SessionFactory):'
238 msg = self.session.unpack_message(msg, copy=False, content=False)
238 msg = self.session.unpack_message(msg, copy=False, content=False)
239 parent = msg['header']
239 parent = msg['header']
240 idents = [idents[0],engine]+idents[1:]
240 idents = [idents[0],engine]+idents[1:]
241 print (idents)
241 # print (idents)
242 try:
242 try:
243 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
243 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
244 except:
244 except:
@@ -277,8 +277,9 b' class TaskScheduler(SessionFactory):'
277 # time dependencies
277 # time dependencies
278 after = Dependency(header.get('after', []))
278 after = Dependency(header.get('after', []))
279 if after.all:
279 if after.all:
280 after.difference_update(self.all_completed)
280 if after.success:
281 if not after.success_only:
281 after.difference_update(self.all_completed)
282 if after.failure:
282 after.difference_update(self.all_failed)
283 after.difference_update(self.all_failed)
283 if after.check(self.all_completed, self.all_failed):
284 if after.check(self.all_completed, self.all_failed):
284 # recast as empty set, if `after` already met,
285 # recast as empty set, if `after` already met,
@@ -302,7 +303,7 b' class TaskScheduler(SessionFactory):'
302 self.depending[msg_id] = args
303 self.depending[msg_id] = args
303 return self.fail_unreachable(msg_id, error.InvalidDependency)
304 return self.fail_unreachable(msg_id, error.InvalidDependency)
304 # check if unreachable:
305 # check if unreachable:
305 if dep.unreachable(self.all_failed):
306 if dep.unreachable(self.all_completed, self.all_failed):
306 self.depending[msg_id] = args
307 self.depending[msg_id] = args
307 return self.fail_unreachable(msg_id)
308 return self.fail_unreachable(msg_id)
308
309
@@ -379,7 +380,11 b' class TaskScheduler(SessionFactory):'
379 if follow.all:
380 if follow.all:
380 # check follow for impossibility
381 # check follow for impossibility
381 dests = set()
382 dests = set()
382 relevant = self.all_completed if follow.success_only else self.all_done
383 relevant = set()
384 if follow.success:
385 relevant = self.all_completed
386 if follow.failure:
387 relevant = relevant.union(self.all_failed)
383 for m in follow.intersection(relevant):
388 for m in follow.intersection(relevant):
384 dests.add(self.destinations[m])
389 dests.add(self.destinations[m])
385 if len(dests) > 1:
390 if len(dests) > 1:
@@ -514,11 +519,8 b' class TaskScheduler(SessionFactory):'
514
519
515 for msg_id in jobs:
520 for msg_id in jobs:
516 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
521 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
517 # if dep_id in after:
518 # if after.all and (success or not after.success_only):
519 # after.remove(dep_id)
520
522
521 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
523 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
522 self.fail_unreachable(msg_id)
524 self.fail_unreachable(msg_id)
523
525
524 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
526 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
@@ -170,7 +170,7 b' class Kernel(SessionFactory):'
170
170
171 content = dict(status='ok')
171 content = dict(status='ok')
172 reply_msg = self.session.send(stream, 'abort_reply', content=content,
172 reply_msg = self.session.send(stream, 'abort_reply', content=content,
173 parent=parent, ident=ident)[0]
173 parent=parent, ident=ident)
174 self.log.debug(str(reply_msg))
174 self.log.debug(str(reply_msg))
175
175
176 def shutdown_request(self, stream, ident, parent):
176 def shutdown_request(self, stream, ident, parent):
@@ -184,10 +184,7 b' class Kernel(SessionFactory):'
184 content['status'] = 'ok'
184 content['status'] = 'ok'
185 msg = self.session.send(stream, 'shutdown_reply',
185 msg = self.session.send(stream, 'shutdown_reply',
186 content=content, parent=parent, ident=ident)
186 content=content, parent=parent, ident=ident)
187 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
187 self.log.debug(str(msg))
188 # content, parent, ident)
189 # print >> sys.__stdout__, msg
190 # time.sleep(0.2)
191 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
188 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 dc.start()
189 dc.start()
193
190
@@ -295,7 +292,7 b' class Kernel(SessionFactory):'
295 content = parent[u'content']
292 content = parent[u'content']
296 bufs = parent[u'buffers']
293 bufs = parent[u'buffers']
297 msg_id = parent['header']['msg_id']
294 msg_id = parent['header']['msg_id']
298 bound = content.get('bound', False)
295 # bound = parent['header'].get('bound', False)
299 except:
296 except:
300 self.log.error("Got bad msg: %s"%parent, exc_info=True)
297 self.log.error("Got bad msg: %s"%parent, exc_info=True)
301 return
298 return
@@ -314,16 +311,12 b' class Kernel(SessionFactory):'
314 working = self.user_ns
311 working = self.user_ns
315 # suffix =
312 # suffix =
316 prefix = "_"+str(msg_id).replace("-","")+"_"
313 prefix = "_"+str(msg_id).replace("-","")+"_"
317 # if bound:
314
318 #
319 # else:
320 # working = dict()
321 # suffix = prefix = "_" # prevent keyword collisions with lambda
322 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
323 if bound:
316 # if bound:
324 bound_ns = Namespace(working)
317 # bound_ns = Namespace(working)
325 args = [bound_ns]+list(args)
318 # args = [bound_ns]+list(args)
326 # if f.fun
319
327 fname = getattr(f, '__name__', 'f')
320 fname = getattr(f, '__name__', 'f')
328
321
329 fname = prefix+"f"
322 fname = prefix+"f"
@@ -341,8 +334,8 b' class Kernel(SessionFactory):'
341 finally:
334 finally:
342 for key in ns.iterkeys():
335 for key in ns.iterkeys():
343 working.pop(key)
336 working.pop(key)
344 if bound:
337 # if bound:
345 working.update(bound_ns)
338 # working.update(bound_ns)
346
339
347 packed_result,buf = serialize_object(result)
340 packed_result,buf = serialize_object(result)
348 result_buf = [packed_result]+buf
341 result_buf = [packed_result]+buf
@@ -364,9 +357,12 b' class Kernel(SessionFactory):'
364
357
365 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
358 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
366 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
359 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
367
360
368 # if reply_msg['content']['status'] == u'error':
361 # flush i/o
369 # self.abort_queues()
362 # should this be before reply_msg is sent, like in the single-kernel code,
363 # or should nothing get in the way of real results?
364 sys.stdout.flush()
365 sys.stderr.flush()
370
366
371 def dispatch_queue(self, stream, msg):
367 def dispatch_queue(self, stream, msg):
372 self.control_stream.flush()
368 self.control_stream.flush()
@@ -1,5 +1,16 b''
1 """toplevel setup/teardown for parallel tests."""
1 """toplevel setup/teardown for parallel tests."""
2
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
3 import tempfile
14 import tempfile
4 import time
15 import time
5 from subprocess import Popen, PIPE, STDOUT
16 from subprocess import Popen, PIPE, STDOUT
@@ -15,17 +26,27 b' def setup():'
15 cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT)
26 cp = Popen('ipcontrollerz --profile iptest -r --log-level 10 --log-to-file'.split(), stdout=blackhole, stderr=STDOUT)
16 processes.append(cp)
27 processes.append(cp)
17 time.sleep(.5)
28 time.sleep(.5)
18 add_engine()
29 add_engines(1)
19 c = client.Client(profile='iptest')
30 c = client.Client(profile='iptest')
20 while not c.ids:
31 while not c.ids:
21 time.sleep(.1)
32 time.sleep(.1)
22 c.spin()
33 c.spin()
34 c.close()
23
35
24 def add_engine(profile='iptest'):
36 def add_engines(n=1, profile='iptest'):
25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
37 rc = client.Client(profile=profile)
26 # ep.start()
38 base = len(rc)
27 processes.append(ep)
39 eps = []
28 return ep
40 for i in range(n):
41 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
42 # ep.start()
43 processes.append(ep)
44 eps.append(ep)
45 while len(rc) < base+n:
46 time.sleep(.1)
47 rc.spin()
48 rc.close()
49 return eps
29
50
30 def teardown():
51 def teardown():
31 time.sleep(1)
52 time.sleep(1)
@@ -1,3 +1,12 b''
1 """base class for parallel client tests"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
1 import sys
10 import sys
2 import tempfile
11 import tempfile
3 import time
12 import time
@@ -6,6 +15,7 b' from multiprocessing import Process'
6
15
7 from nose import SkipTest
16 from nose import SkipTest
8
17
18 import zmq
9 from zmq.tests import BaseZMQTestCase
19 from zmq.tests import BaseZMQTestCase
10
20
11 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
@@ -14,7 +24,7 b' from IPython.zmq.parallel import error'
14 from IPython.zmq.parallel.client import Client
24 from IPython.zmq.parallel.client import Client
15 from IPython.zmq.parallel.ipcluster import launch_process
25 from IPython.zmq.parallel.ipcluster import launch_process
16 from IPython.zmq.parallel.entry_point import select_random_ports
26 from IPython.zmq.parallel.entry_point import select_random_ports
17 from IPython.zmq.parallel.tests import processes,add_engine
27 from IPython.zmq.parallel.tests import processes,add_engines
18
28
19 # simple tasks for use in apply tests
29 # simple tasks for use in apply tests
20
30
@@ -47,13 +57,11 b' def skip_without(*names):'
47 return f(*args, **kwargs)
57 return f(*args, **kwargs)
48 return skip_without_names
58 return skip_without_names
49
59
50
51 class ClusterTestCase(BaseZMQTestCase):
60 class ClusterTestCase(BaseZMQTestCase):
52
61
53 def add_engines(self, n=1, block=True):
62 def add_engines(self, n=1, block=True):
54 """add multiple engines to our cluster"""
63 """add multiple engines to our cluster"""
55 for i in range(n):
64 self.engines.extend(add_engines(n))
56 self.engines.append(add_engine())
57 if block:
65 if block:
58 self.wait_on_engines()
66 self.wait_on_engines()
59
67
@@ -68,10 +76,11 b' class ClusterTestCase(BaseZMQTestCase):'
68
76
69 def connect_client(self):
77 def connect_client(self):
70 """connect a client with my Context, and track its sockets for cleanup"""
78 """connect a client with my Context, and track its sockets for cleanup"""
71 c = Client(profile='iptest',context=self.context)
79 c = Client(profile='iptest', context=self.context)
72
80 for name in filter(lambda n:n.endswith('socket'), dir(c)):
73 # for name in filter(lambda n:n.endswith('socket'), dir(c)):
81 s = getattr(c, name)
74 # self.sockets.append(getattr(c, name))
82 s.setsockopt(zmq.LINGER, 0)
83 self.sockets.append(s)
75 return c
84 return c
76
85
77 def assertRaisesRemote(self, etype, f, *args, **kwargs):
86 def assertRaisesRemote(self, etype, f, *args, **kwargs):
@@ -92,15 +101,19 b' class ClusterTestCase(BaseZMQTestCase):'
92 self.engines=[]
101 self.engines=[]
93
102
94 def tearDown(self):
103 def tearDown(self):
95
104 # self.client.clear(block=True)
96 # close fds:
105 # close fds:
97 for e in filter(lambda e: e.poll() is not None, processes):
106 for e in filter(lambda e: e.poll() is not None, processes):
98 processes.remove(e)
107 processes.remove(e)
99
108
109 # allow flushing of incoming messages to prevent crash on socket close
110 self.client.wait(timeout=2)
111 # time.sleep(2)
112 self.client.spin()
100 self.client.close()
113 self.client.close()
101 BaseZMQTestCase.tearDown(self)
114 BaseZMQTestCase.tearDown(self)
102 # this will be superfluous when pyzmq merges PR #88
115 # this will be redundant when pyzmq merges PR #88
103 self.context.term()
116 # self.context.term()
104 # print tempfile.TemporaryFile().fileno(),
117 # print tempfile.TemporaryFile().fileno(),
105 # sys.stdout.flush()
118 # sys.stdout.flush()
106 No newline at end of file
119
@@ -1,3 +1,16 b''
1 """Tests for parallel client.py"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
1 import time
14 import time
2 from tempfile import mktemp
15 from tempfile import mktemp
3
16
@@ -8,7 +21,10 b' from IPython.zmq.parallel import error'
8 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
21 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
9 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
22 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
10
23
11 from clienttest import ClusterTestCase, segfault, wait
24 from clienttest import ClusterTestCase, segfault, wait, add_engines
25
26 def setup():
27 add_engines(4)
12
28
13 class TestClient(ClusterTestCase):
29 class TestClient(ClusterTestCase):
14
30
@@ -17,27 +33,6 b' class TestClient(ClusterTestCase):'
17 self.add_engines(3)
33 self.add_engines(3)
18 self.assertEquals(len(self.client.ids), n+3)
34 self.assertEquals(len(self.client.ids), n+3)
19
35
20 def test_segfault_task(self):
21 """test graceful handling of engine death (balanced)"""
22 self.add_engines(1)
23 ar = self.client.apply(segfault, block=False)
24 self.assertRaisesRemote(error.EngineError, ar.get)
25 eid = ar.engine_id
26 while eid in self.client.ids:
27 time.sleep(.01)
28 self.client.spin()
29
30 def test_segfault_mux(self):
31 """test graceful handling of engine death (direct)"""
32 self.add_engines(1)
33 eid = self.client.ids[-1]
34 ar = self.client[eid].apply_async(segfault)
35 self.assertRaisesRemote(error.EngineError, ar.get)
36 eid = ar.engine_id
37 while eid in self.client.ids:
38 time.sleep(.01)
39 self.client.spin()
40
41 def test_view_indexing(self):
36 def test_view_indexing(self):
42 """test index access for views"""
37 """test index access for views"""
43 self.add_engines(2)
38 self.add_engines(2)
@@ -71,8 +66,8 b' class TestClient(ClusterTestCase):'
71 v = self.client[:2]
66 v = self.client[:2]
72 v2 =self.client[:2]
67 v2 =self.client[:2]
73 self.assertTrue(v is v2)
68 self.assertTrue(v is v2)
74 v = self.client.view()
69 v = self.client.load_balanced_view()
75 v2 = self.client.view(balanced=True)
70 v2 = self.client.load_balanced_view(targets=None)
76 self.assertTrue(v is v2)
71 self.assertTrue(v is v2)
77
72
78 def test_targets(self):
73 def test_targets(self):
@@ -84,102 +79,26 b' class TestClient(ClusterTestCase):'
84
79
85 def test_clear(self):
80 def test_clear(self):
86 """test clear behavior"""
81 """test clear behavior"""
87 self.add_engines(2)
82 # self.add_engines(2)
88 self.client.block=True
83 v = self.client[:]
89 self.client.push(dict(a=5))
84 v.block=True
90 self.client.pull('a')
85 v.push(dict(a=5))
86 v.pull('a')
91 id0 = self.client.ids[-1]
87 id0 = self.client.ids[-1]
92 self.client.clear(targets=id0)
88 self.client.clear(targets=id0)
93 self.client.pull('a', targets=self.client.ids[:-1])
89 self.client[:-1].pull('a')
94 self.assertRaisesRemote(NameError, self.client.pull, 'a')
90 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
95 self.client.clear()
91 self.client.clear(block=True)
96 for i in self.client.ids:
92 for i in self.client.ids:
97 self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
93 # print i
98
94 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
99
100 def test_push_pull(self):
101 """test pushing and pulling"""
102 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
103 t = self.client.ids[-1]
104 self.add_engines(2)
105 push = self.client.push
106 pull = self.client.pull
107 self.client.block=True
108 nengines = len(self.client)
109 push({'data':data}, targets=t)
110 d = pull('data', targets=t)
111 self.assertEquals(d, data)
112 push({'data':data})
113 d = pull('data')
114 self.assertEquals(d, nengines*[data])
115 ar = push({'data':data}, block=False)
116 self.assertTrue(isinstance(ar, AsyncResult))
117 r = ar.get()
118 ar = pull('data', block=False)
119 self.assertTrue(isinstance(ar, AsyncResult))
120 r = ar.get()
121 self.assertEquals(r, nengines*[data])
122 push(dict(a=10,b=20))
123 r = pull(('a','b'))
124 self.assertEquals(r, nengines*[[10,20]])
125
126 def test_push_pull_function(self):
127 "test pushing and pulling functions"
128 def testf(x):
129 return 2.0*x
130
131 self.add_engines(4)
132 t = self.client.ids[-1]
133 self.client.block=True
134 push = self.client.push
135 pull = self.client.pull
136 execute = self.client.execute
137 push({'testf':testf}, targets=t)
138 r = pull('testf', targets=t)
139 self.assertEqual(r(1.0), testf(1.0))
140 execute('r = testf(10)', targets=t)
141 r = pull('r', targets=t)
142 self.assertEquals(r, testf(10))
143 ar = push({'testf':testf}, block=False)
144 ar.get()
145 ar = pull('testf', block=False)
146 rlist = ar.get()
147 for r in rlist:
148 self.assertEqual(r(1.0), testf(1.0))
149 execute("def g(x): return x*x", targets=t)
150 r = pull(('testf','g'),targets=t)
151 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
152
153 def test_push_function_globals(self):
154 """test that pushed functions have access to globals"""
155 def geta():
156 return a
157 self.add_engines(1)
158 v = self.client[-1]
159 v.block=True
160 v['f'] = geta
161 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
162 v.execute('a=5')
163 v.execute('b=f()')
164 self.assertEquals(v['b'], 5)
165
95
166 def test_push_function_defaults(self):
167 """test that pushed functions preserve default args"""
168 def echo(a=10):
169 return a
170 self.add_engines(1)
171 v = self.client[-1]
172 v.block=True
173 v['f'] = echo
174 v.execute('b=f()')
175 self.assertEquals(v['b'], 10)
176
177 def test_get_result(self):
96 def test_get_result(self):
178 """test getting results from the Hub."""
97 """test getting results from the Hub."""
179 c = clientmod.Client(profile='iptest')
98 c = clientmod.Client(profile='iptest')
180 self.add_engines(1)
99 # self.add_engines(1)
181 t = c.ids[-1]
100 t = c.ids[-1]
182 ar = c.apply(wait, (1,), block=False, targets=t)
101 ar = c[t].apply_async(wait, 1)
183 # give the monitor time to notice the message
102 # give the monitor time to notice the message
184 time.sleep(.25)
103 time.sleep(.25)
185 ahr = self.client.get_result(ar.msg_ids)
104 ahr = self.client.get_result(ar.msg_ids)
@@ -187,76 +106,42 b' class TestClient(ClusterTestCase):'
187 self.assertEquals(ahr.get(), ar.get())
106 self.assertEquals(ahr.get(), ar.get())
188 ar2 = self.client.get_result(ar.msg_ids)
107 ar2 = self.client.get_result(ar.msg_ids)
189 self.assertFalse(isinstance(ar2, AsyncHubResult))
108 self.assertFalse(isinstance(ar2, AsyncHubResult))
109 c.close()
190
110
191 def test_ids_list(self):
111 def test_ids_list(self):
192 """test client.ids"""
112 """test client.ids"""
193 self.add_engines(2)
113 # self.add_engines(2)
194 ids = self.client.ids
114 ids = self.client.ids
195 self.assertEquals(ids, self.client._ids)
115 self.assertEquals(ids, self.client._ids)
196 self.assertFalse(ids is self.client._ids)
116 self.assertFalse(ids is self.client._ids)
197 ids.remove(ids[-1])
117 ids.remove(ids[-1])
198 self.assertNotEquals(ids, self.client._ids)
118 self.assertNotEquals(ids, self.client._ids)
199
119
200 def test_run_newline(self):
120 def test_queue_status(self):
201 """test that run appends newline to files"""
121 # self.addEngine(4)
202 tmpfile = mktemp()
122 ids = self.client.ids
203 with open(tmpfile, 'w') as f:
123 id0 = ids[0]
204 f.write("""def g():
124 qs = self.client.queue_status(targets=id0)
205 return 5
125 self.assertTrue(isinstance(qs, dict))
206 """)
126 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
207 v = self.client[-1]
127 allqs = self.client.queue_status()
208 v.run(tmpfile, block=True)
128 self.assertTrue(isinstance(allqs, dict))
209 self.assertEquals(v.apply_sync(lambda : g()), 5)
129 self.assertEquals(sorted(allqs.keys()), self.client.ids)
130 for eid,qs in allqs.items():
131 self.assertTrue(isinstance(qs, dict))
132 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
210
133
211 def test_apply_tracked(self):
134 def test_shutdown(self):
212 """test tracking for apply"""
135 # self.addEngine(4)
213 # self.add_engines(1)
136 ids = self.client.ids
214 t = self.client.ids[-1]
137 id0 = ids[0]
215 self.client.block=False
138 self.client.shutdown(id0, block=True)
216 def echo(n=1024*1024, **kwargs):
139 while id0 in self.client.ids:
217 return self.client.apply(lambda x: x, args=('x'*n,), targets=t, **kwargs)
140 time.sleep(0.1)
218 ar = echo(1)
141 self.client.spin()
219 self.assertTrue(ar._tracker is None)
220 self.assertTrue(ar.sent)
221 ar = echo(track=True)
222 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
223 self.assertEquals(ar.sent, ar._tracker.done)
224 ar._tracker.wait()
225 self.assertTrue(ar.sent)
226
227 def test_push_tracked(self):
228 t = self.client.ids[-1]
229 ns = dict(x='x'*1024*1024)
230 ar = self.client.push(ns, targets=t, block=False)
231 self.assertTrue(ar._tracker is None)
232 self.assertTrue(ar.sent)
233
234 ar = self.client.push(ns, targets=t, block=False, track=True)
235 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
236 self.assertEquals(ar.sent, ar._tracker.done)
237 ar._tracker.wait()
238 self.assertTrue(ar.sent)
239 ar.get()
240
142
241 def test_scatter_tracked(self):
143 self.assertRaises(IndexError, lambda : self.client[id0])
242 t = self.client.ids
243 x='x'*1024*1024
244 ar = self.client.scatter('x', x, targets=t, block=False)
245 self.assertTrue(ar._tracker is None)
246 self.assertTrue(ar.sent)
247
144
248 ar = self.client.scatter('x', x, targets=t, block=False, track=True)
145 def test_result_status(self):
249 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
146 pass
250 self.assertEquals(ar.sent, ar._tracker.done)
147 # to be written
251 ar._tracker.wait()
252 self.assertTrue(ar.sent)
253 ar.get()
254
255 def test_remote_reference(self):
256 v = self.client[-1]
257 v['a'] = 123
258 ra = clientmod.Reference('a')
259 b = v.apply_sync(lambda x: x, ra)
260 self.assertEquals(b, 123)
261
262
@@ -1,5 +1,16 b''
1 """test serialization with newserialized"""
1 """test serialization with newserialized"""
2
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
3 from unittest import TestCase
14 from unittest import TestCase
4
15
5 from IPython.testing.parametric import parametric
16 from IPython.testing.parametric import parametric
@@ -82,6 +93,16 b' class CanningTestCase(TestCase):'
82 # test non-copying:
93 # test non-copying:
83 a[2] = 1e9
94 a[2] = 1e9
84 self.assertTrue((a==final).all())
95 self.assertTrue((a==final).all())
85
96
97 def test_uncan_function_globals(self):
98 """test that uncanning a module function restores it into its module"""
99 from re import search
100 cf = can(search)
101 csearch = uncan(cf)
102 self.assertEqual(csearch.__module__, search.__module__)
103 self.assertNotEqual(csearch('asd', 'asdf'), None)
104 csearch = uncan(cf, dict(a=5))
105 self.assertEqual(csearch.__module__, search.__module__)
106 self.assertNotEqual(csearch('asd', 'asdf'), None)
86
107
87 No newline at end of file
108
@@ -1,3 +1,15 b''
1 """test building messages with streamsession"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
1
13
2 import os
14 import os
3 import uuid
15 import uuid
@@ -316,3 +316,39 b' def unpack_apply_message(bufs, g=None, copy=True):'
316
316
317 return f,args,kwargs
317 return f,args,kwargs
318
318
319 #--------------------------------------------------------------------------
320 # helpers for implementing old MEC API via view.apply
321 #--------------------------------------------------------------------------
322
323 def interactive(f):
324 """decorator for making functions appear as interactively defined.
325 This results in the function being linked to the user_ns as globals()
326 instead of the module globals().
327 """
328 f.__module__ = '__main__'
329 return f
330
331 @interactive
332 def _push(ns):
333 """helper method for implementing `client.push` via `client.apply`"""
334 globals().update(ns)
335
336 @interactive
337 def _pull(keys):
338 """helper method for implementing `client.pull` via `client.apply`"""
339 user_ns = globals()
340 if isinstance(keys, (list,tuple, set)):
341 for key in keys:
342 if not user_ns.has_key(key):
343 raise NameError("name '%s' is not defined"%key)
344 return map(user_ns.get, keys)
345 else:
346 if not user_ns.has_key(keys):
347 raise NameError("name '%s' is not defined"%keys)
348 return user_ns.get(keys)
349
350 @interactive
351 def _execute(code):
352 """helper method for implementing `client.execute` via `client.apply`"""
353 exec code in globals()
354
This diff has been collapsed as it changes many lines, (610 lines changed) Show them Hide them
@@ -10,13 +10,20 b''
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import warnings
14 from contextlib import contextmanager
15
16 import zmq
17
13 from IPython.testing import decorators as testdec
18 from IPython.testing import decorators as testdec
14 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance
19 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance
15
20
16 from IPython.external.decorator import decorator
21 from IPython.external.decorator import decorator
17
22
18 from .asyncresult import AsyncResult
23 from . import map as Map
19 from .dependency import Dependency
24 from . import util
25 from .asyncresult import AsyncResult, AsyncMapResult
26 from .dependency import Dependency, dependent
20 from .remotefunction import ParallelFunction, parallel, remote
27 from .remotefunction import ParallelFunction, parallel, remote
21
28
22 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
@@ -24,25 +31,16 b' from .remotefunction import ParallelFunction, parallel, remote'
24 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
25
32
26 @decorator
33 @decorator
27 def myblock(f, self, *args, **kwargs):
28 """override client.block with self.block during a call"""
29 block = self.client.block
30 self.client.block = self.block
31 try:
32 ret = f(self, *args, **kwargs)
33 finally:
34 self.client.block = block
35 return ret
36
37 @decorator
38 def save_ids(f, self, *args, **kwargs):
34 def save_ids(f, self, *args, **kwargs):
39 """Keep our history and outstanding attributes up to date after a method call."""
35 """Keep our history and outstanding attributes up to date after a method call."""
40 n_previous = len(self.client.history)
36 n_previous = len(self.client.history)
41 ret = f(self, *args, **kwargs)
37 try:
42 nmsgs = len(self.client.history) - n_previous
38 ret = f(self, *args, **kwargs)
43 msg_ids = self.client.history[-nmsgs:]
39 finally:
44 self.history.extend(msg_ids)
40 nmsgs = len(self.client.history) - n_previous
45 map(self.outstanding.add, msg_ids)
41 msg_ids = self.client.history[-nmsgs:]
42 self.history.extend(msg_ids)
43 map(self.outstanding.add, msg_ids)
46 return ret
44 return ret
47
45
48 @decorator
46 @decorator
@@ -71,27 +69,54 b' class View(HasTraits):'
71 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
72
70
73 Don't use this class, use subclasses.
71 Don't use this class, use subclasses.
72
73 Methods
74 -------
75
76 spin
77 flushes incoming results and registration state changes
78 control methods spin, and requesting `ids` also ensures up to date
79
80 wait
81 wait on one or more msg_ids
82
83 execution methods
84 apply
85 legacy: execute, run
86
87 data movement
88 push, pull, scatter, gather
89
90 query methods
91 get_result, queue_status, purge_results, result_status
92
93 control methods
94 abort, shutdown
95
74 """
96 """
75 block=Bool(False)
97 block=Bool(False)
76 bound=Bool(False)
98 track=Bool(True)
77 track=Bool(False)
78 history=List()
99 history=List()
79 outstanding = Set()
100 outstanding = Set()
80 results = Dict()
101 results = Dict()
81 client = Instance('IPython.zmq.parallel.client.Client')
102 client = Instance('IPython.zmq.parallel.client.Client')
82
103
104 _socket = Instance('zmq.Socket')
83 _ntargets = Int(1)
105 _ntargets = Int(1)
84 _balanced = Bool(False)
106 _flag_names = List(['block', 'track'])
85 _default_names = List(['block', 'bound', 'track'])
86 _targets = Any()
107 _targets = Any()
108 _idents = Any()
87
109
88 def __init__(self, client=None, targets=None):
110 def __init__(self, client=None, socket=None, targets=None):
89 super(View, self).__init__(client=client)
111 super(View, self).__init__(client=client, _socket=socket)
90 self._targets = targets
91 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
112 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
92 self.block = client.block
113 self.block = client.block
93
114
94 for name in self._default_names:
115 self._idents, self._targets = self.client._build_targets(targets)
116 if targets is None or isinstance(targets, int):
117 self._targets = targets
118 for name in self._flag_names:
119 # set flags, if they haven't been set yet
95 setattr(self, name, getattr(self, name, None))
120 setattr(self, name, getattr(self, name, None))
96
121
97 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
122 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
@@ -111,134 +136,127 b' class View(HasTraits):'
111 def targets(self, value):
136 def targets(self, value):
112 raise AttributeError("Cannot set View `targets` after construction!")
137 raise AttributeError("Cannot set View `targets` after construction!")
113
138
114 @property
115 def balanced(self):
116 return self._balanced
117
118 @balanced.setter
119 def balanced(self, value):
120 raise AttributeError("Cannot set View `balanced` after construction!")
121
122 def _defaults(self, *excludes):
123 """return dict of our default attributes, excluding names given."""
124 d = dict(balanced=self._balanced, targets=self._targets)
125 for name in self._default_names:
126 if name not in excludes:
127 d[name] = getattr(self, name)
128 return d
129
130 def set_flags(self, **kwargs):
139 def set_flags(self, **kwargs):
131 """set my attribute flags by keyword.
140 """set my attribute flags by keyword.
132
141
133 A View is a wrapper for the Client's apply method, but
142 Views determine behavior with a few attributes (`block`, `track`, etc.).
134 with attributes that specify keyword arguments, those attributes
143 These attributes can be set all at once by name with this method.
135 can be set by keyword argument with this method.
136
144
137 Parameters
145 Parameters
138 ----------
146 ----------
139
147
140 block : bool
148 block : bool
141 whether to wait for results
149 whether to wait for results
142 bound : bool
143 whether to pass the client's Namespace as the first argument
144 to functions called via `apply`.
145 track : bool
150 track : bool
146 whether to create a MessageTracker to allow the user to
151 whether to create a MessageTracker to allow the user to
147 safely edit after arrays and buffers during non-copying
152 safely edit after arrays and buffers during non-copying
148 sends.
153 sends.
149 """
154 """
150 for key in kwargs:
155 for name, value in kwargs.iteritems():
151 if key not in self._default_names:
156 if name not in self._flag_names:
152 raise KeyError("Invalid name: %r"%key)
157 raise KeyError("Invalid name: %r"%name)
153 for name in ('block', 'bound'):
158 else:
154 if name in kwargs:
159 setattr(self, name, value)
155 setattr(self, name, kwargs[name])
156
160
161 @contextmanager
162 def temp_flags(self, **kwargs):
163 """temporarily set flags, for use in `with` statements.
164
165 See set_flags for permanent setting of flags
166
167 Examples
168 --------
169
170 >>> view.track=False
171 ...
172 >>> with view.temp_flags(track=True):
173 ... ar = view.apply(dostuff, my_big_array)
174 ... ar.tracker.wait() # wait for send to finish
175 >>> view.track
176 False
177
178 """
179 # preflight: save flags, and set temporaries
180 saved_flags = {}
181 for f in self._flag_names:
182 saved_flags[f] = getattr(self, f)
183 self.set_flags(**kwargs)
184 # yield to the with-statement block
185 yield
186 # postflight: restore saved flags
187 self.set_flags(**saved_flags)
188
189
157 #----------------------------------------------------------------
190 #----------------------------------------------------------------
158 # wrappers for client methods:
191 # apply
159 #----------------------------------------------------------------
192 #----------------------------------------------------------------
160 @sync_results
161 def spin(self):
162 """spin the client, and sync"""
163 self.client.spin()
164
193
165 @sync_results
194 @sync_results
166 @save_ids
195 @save_ids
196 def _really_apply(self, f, args, kwargs, block=None, **options):
197 """wrapper for client.send_apply_message"""
198 raise NotImplementedError("Implement in subclasses")
199
167 def apply(self, f, *args, **kwargs):
200 def apply(self, f, *args, **kwargs):
168 """calls f(*args, **kwargs) on remote engines, returning the result.
201 """calls f(*args, **kwargs) on remote engines, returning the result.
169
202
170 This method sets all of `client.apply`'s keyword arguments via this
203 This method sets all apply flags via this View's attributes.
171 View's attributes.
172
204
173 if self.block is False:
205 if self.block is False:
174 returns AsyncResult
206 returns AsyncResult
175 else:
207 else:
176 returns actual result of f(*args, **kwargs)
208 returns actual result of f(*args, **kwargs)
177 """
209 """
178 return self.client.apply(f, args, kwargs, **self._defaults())
210 return self._really_apply(f, args, kwargs)
179
211
180 @save_ids
181 def apply_async(self, f, *args, **kwargs):
212 def apply_async(self, f, *args, **kwargs):
182 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
213 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
183
214
184 returns AsyncResult
215 returns AsyncResult
185 """
216 """
186 d = self._defaults('block', 'bound')
217 return self._really_apply(f, args, kwargs, block=False)
187 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
188
218
189 @spin_after
219 @spin_after
190 @save_ids
191 def apply_sync(self, f, *args, **kwargs):
220 def apply_sync(self, f, *args, **kwargs):
192 """calls f(*args, **kwargs) on remote engines in a blocking manner,
221 """calls f(*args, **kwargs) on remote engines in a blocking manner,
193 returning the result.
222 returning the result.
194
223
195 returns: actual result of f(*args, **kwargs)
224 returns: actual result of f(*args, **kwargs)
196 """
225 """
197 d = self._defaults('block', 'bound', 'track')
226 return self._really_apply(f, args, kwargs, block=True)
198 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
199
227
200 # @sync_results
228 #----------------------------------------------------------------
201 # @save_ids
229 # wrappers for client and control methods
202 # def apply_bound(self, f, *args, **kwargs):
230 #----------------------------------------------------------------
203 # """calls f(*args, **kwargs) bound to engine namespace(s).
204 #
205 # if self.block is False:
206 # returns msg_id
207 # else:
208 # returns actual result of f(*args, **kwargs)
209 #
210 # This method has access to the targets' namespace via globals()
211 #
212 # """
213 # d = self._defaults('bound')
214 # return self.client.apply(f, args, kwargs, bound=True, **d)
215 #
216 @sync_results
231 @sync_results
217 @save_ids
232 def spin(self):
218 def apply_async_bound(self, f, *args, **kwargs):
233 """spin the client, and sync"""
219 """calls f(*args, **kwargs) bound to engine namespace(s)
234 self.client.spin()
220 in a nonblocking manner.
235
221
236 @sync_results
222 The first argument to `f` will be the Engine's Namespace
237 def wait(self, jobs=None, timeout=-1):
223
238 """waits on one or more `jobs`, for up to `timeout` seconds.
224 returns: AsyncResult
225
239
226 """
240 Parameters
227 d = self._defaults('block', 'bound')
241 ----------
228 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
229
230 @spin_after
231 @save_ids
232 def apply_sync_bound(self, f, *args, **kwargs):
233 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
234
242
235 The first argument to `f` will be the Engine's Namespace
243 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
244 ints are indices to self.history
245 strs are msg_ids
246 default: wait on all outstanding messages
247 timeout : float
248 a time in seconds, after which to give up.
249 default is -1, which means no timeout
236
250
237 returns: actual result of f(*args, **kwargs)
251 Returns
252 -------
238
253
254 True : when all msg_ids are done
255 False : timeout reached, some msg_ids still outstanding
239 """
256 """
240 d = self._defaults('block', 'bound')
257 if jobs is None:
241 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
258 jobs = self.history
259 return self.client.wait(jobs, timeout)
242
260
243 def abort(self, jobs=None, block=None):
261 def abort(self, jobs=None, block=None):
244 """Abort jobs on my engines.
262 """Abort jobs on my engines.
@@ -318,6 +336,7 b' class View(HasTraits):'
318 """Parallel version of `itertools.imap`.
336 """Parallel version of `itertools.imap`.
319
337
320 See `self.map` for details.
338 See `self.map` for details.
339
321 """
340 """
322
341
323 return iter(self.map_async(f,*sequences, **kwargs))
342 return iter(self.map_async(f,*sequences, **kwargs))
@@ -326,14 +345,15 b' class View(HasTraits):'
326 # Decorators
345 # Decorators
327 #-------------------------------------------------------------------
346 #-------------------------------------------------------------------
328
347
329 def remote(self, bound=False, block=True):
348 def remote(self, block=True, **flags):
330 """Decorator for making a RemoteFunction"""
349 """Decorator for making a RemoteFunction"""
331 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
350 block = self.block if block is None else block
351 return remote(self, block=block, **flags)
332
352
333 def parallel(self, dist='b', bound=False, block=None):
353 def parallel(self, dist='b', block=None, **flags):
334 """Decorator for making a ParallelFunction"""
354 """Decorator for making a ParallelFunction"""
335 block = self.block if block is None else block
355 block = self.block if block is None else block
336 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
356 return parallel(self, dist=dist, block=block, **flags)
337
357
338 @testdec.skip_doctest
358 @testdec.skip_doctest
339 class DirectView(View):
359 class DirectView(View):
@@ -355,14 +375,65 b' class DirectView(View):'
355
375
356 """
376 """
357
377
358 def __init__(self, client=None, targets=None):
378 def __init__(self, client=None, socket=None, targets=None):
359 super(DirectView, self).__init__(client=client, targets=targets)
379 super(DirectView, self).__init__(client=client, socket=socket, targets=targets)
360 self._balanced = False
380
361
381
362 @spin_after
382 @sync_results
363 @save_ids
383 @save_ids
384 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None):
385 """calls f(*args, **kwargs) on remote engines, returning the result.
386
387 This method sets all of `apply`'s flags via this View's attributes.
388
389 Parameters
390 ----------
391
392 f : callable
393
394 args : list [default: empty]
395
396 kwargs : dict [default: empty]
397
398 block : bool [default: self.block]
399 whether to block
400 track : bool [default: self.track]
401 whether to ask zmq to track the message, for safe non-copying sends
402
403 Returns
404 -------
405
406 if self.block is False:
407 returns AsyncResult
408 else:
409 returns actual result of f(*args, **kwargs) on the engine(s)
410 This will be a list of self.targets is also a list (even length 1), or
411 the single result if self.targets is an integer engine id
412 """
413 args = [] if args is None else args
414 kwargs = {} if kwargs is None else kwargs
415 block = self.block if block is None else block
416 track = self.track if track is None else track
417 msg_ids = []
418 trackers = []
419 for ident in self._idents:
420 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
421 ident=ident)
422 if track:
423 trackers.append(msg['tracker'])
424 msg_ids.append(msg['msg_id'])
425 tracker = None if track is False else zmq.MessageTracker(*trackers)
426 ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=self._targets, tracker=tracker)
427 if block:
428 try:
429 return ar.get()
430 except KeyboardInterrupt:
431 pass
432 return ar
433
434 @spin_after
364 def map(self, f, *sequences, **kwargs):
435 def map(self, f, *sequences, **kwargs):
365 """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
436 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
366
437
367 Parallel version of builtin `map`, using this View's `targets`.
438 Parallel version of builtin `map`, using this View's `targets`.
368
439
@@ -380,8 +451,6 b' class DirectView(View):'
380 the sequences to be distributed and passed to `f`
451 the sequences to be distributed and passed to `f`
381 block : bool
452 block : bool
382 whether to wait for the result or not [default self.block]
453 whether to wait for the result or not [default self.block]
383 bound : bool
384 whether to pass the client's Namespace as the first argument to `f`
385
454
386 Returns
455 Returns
387 -------
456 -------
@@ -396,70 +465,140 b' class DirectView(View):'
396 the result of map(f,*sequences)
465 the result of map(f,*sequences)
397 """
466 """
398
467
399 block = kwargs.get('block', self.block)
468 block = kwargs.pop('block', self.block)
400 bound = kwargs.get('bound', self.bound)
401 for k in kwargs.keys():
469 for k in kwargs.keys():
402 if k not in ['block', 'bound']:
470 if k not in ['block', 'track']:
403 raise TypeError("invalid keyword arg, %r"%k)
471 raise TypeError("invalid keyword arg, %r"%k)
404
472
405 assert len(sequences) > 0, "must have some sequences to map onto!"
473 assert len(sequences) > 0, "must have some sequences to map onto!"
406 pf = ParallelFunction(self.client, f, block=block, bound=bound,
474 pf = ParallelFunction(self, f, block=block, **kwargs)
407 targets=self._targets, balanced=False)
408 return pf.map(*sequences)
475 return pf.map(*sequences)
409
476
410 @sync_results
411 @save_ids
412 def execute(self, code, block=None):
477 def execute(self, code, block=None):
413 """execute some code on my targets."""
478 """Executes `code` on `targets` in blocking or nonblocking manner.
414
479
415 block = block if block is not None else self.block
480 ``execute`` is always `bound` (affects engine namespace)
416
481
417 return self.client.execute(code, block=block, targets=self._targets)
482 Parameters
483 ----------
484
485 code : str
486 the code string to be executed
487 block : bool
488 whether or not to wait until done to return
489 default: self.block
490 """
491 return self._really_apply(util._execute, args=(code,), block=block)
418
492
419 @sync_results
493 def run(self, filename, block=None):
420 @save_ids
494 """Execute contents of `filename` on my engine(s).
421 def run(self, fname, block=None):
422 """execute the code in a file on my targets."""
423
495
424 block = block if block is not None else self.block
496 This simply reads the contents of the file and calls `execute`.
425
497
426 return self.client.run(fname, block=block, targets=self._targets)
498 Parameters
499 ----------
500
501 filename : str
502 The path to the file
503 targets : int/str/list of ints/strs
504 the engines on which to execute
505 default : all
506 block : bool
507 whether or not to wait until done
508 default: self.block
509
510 """
511 with open(filename, 'r') as f:
512 # add newline in case of trailing indented whitespace
513 # which will cause SyntaxError
514 code = f.read()+'\n'
515 return self.execute(code, block=block)
427
516
428 def update(self, ns):
517 def update(self, ns):
429 """update remote namespace with dict `ns`"""
518 """update remote namespace with dict `ns`
430 return self.client.push(ns, targets=self._targets, block=self.block)
519
520 See `push` for details.
521 """
522 return self.push(ns, block=self.block, track=self.track)
431
523
432 def push(self, ns, block=None):
524 def push(self, ns, block=None, track=None):
433 """update remote namespace with dict `ns`"""
525 """update remote namespace with dict `ns`
434
526
435 block = block if block is not None else self.block
527 Parameters
528 ----------
436
529
437 return self.client.push(ns, targets=self._targets, block=block)
530 ns : dict
531 dict of keys with which to update engine namespace(s)
532 block : bool [default : self.block]
533 whether to wait to be notified of engine receipt
534
535 """
536
537 block = block if block is not None else self.block
538 track = track if track is not None else self.track
539 # applier = self.apply_sync if block else self.apply_async
540 if not isinstance(ns, dict):
541 raise TypeError("Must be a dict, not %s"%type(ns))
542 return self._really_apply(util._push, (ns,),block=block, track=track)
438
543
439 def get(self, key_s):
544 def get(self, key_s):
440 """get object(s) by `key_s` from remote namespace
545 """get object(s) by `key_s` from remote namespace
441 will return one object if it is a key.
546
442 It also takes a list of keys, and will return a list of objects."""
547 see `pull` for details.
548 """
443 # block = block if block is not None else self.block
549 # block = block if block is not None else self.block
444 return self.client.pull(key_s, block=True, targets=self._targets)
550 return self.pull(key_s, block=True)
445
551
446 @sync_results
552 def pull(self, names, block=True):
447 @save_ids
553 """get object(s) by `name` from remote namespace
448 def pull(self, key_s, block=True):
554
449 """get object(s) by `key_s` from remote namespace
450 will return one object if it is a key.
555 will return one object if it is a key.
451 It also takes a list of keys, and will return a list of objects."""
556 can also take a list of keys, in which case it will return a list of objects.
557 """
452 block = block if block is not None else self.block
558 block = block if block is not None else self.block
453 return self.client.pull(key_s, block=block, targets=self._targets)
559 applier = self.apply_sync if block else self.apply_async
560 if isinstance(names, basestring):
561 pass
562 elif isinstance(names, (list,tuple,set)):
563 for key in names:
564 if not isinstance(key, basestring):
565 raise TypeError("keys must be str, not type %r"%type(key))
566 else:
567 raise TypeError("names must be strs, not %r"%names)
568 return applier(util._pull, names)
454
569
455 def scatter(self, key, seq, dist='b', flatten=False, block=None):
570 def scatter(self, key, seq, dist='b', flatten=False, block=None, track=None):
456 """
571 """
457 Partition a Python sequence and send the partitions to a set of engines.
572 Partition a Python sequence and send the partitions to a set of engines.
458 """
573 """
459 block = block if block is not None else self.block
574 block = block if block is not None else self.block
575 track = track if track is not None else self.track
576 targets = self._targets
577 mapObject = Map.dists[dist]()
578 nparts = len(targets)
579 msg_ids = []
580 trackers = []
581 for index, engineid in enumerate(targets):
582 push = self.client[engineid].push
583 partition = mapObject.getPartition(seq, index, nparts)
584 if flatten and len(partition) == 1:
585 r = push({key: partition[0]}, block=False, track=track)
586 else:
587 r = push({key: partition},block=False, track=track)
588 msg_ids.extend(r.msg_ids)
589 if track:
590 trackers.append(r._tracker)
460
591
461 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
592 if track:
462 targets=self._targets, block=block)
593 tracker = zmq.MessageTracker(*trackers)
594 else:
595 tracker = None
596
597 r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker)
598 if block:
599 r.wait()
600 else:
601 return r
463
602
464 @sync_results
603 @sync_results
465 @save_ids
604 @save_ids
@@ -468,8 +607,20 b' class DirectView(View):'
468 Gather a partitioned sequence on a set of engines as a single local seq.
607 Gather a partitioned sequence on a set of engines as a single local seq.
469 """
608 """
470 block = block if block is not None else self.block
609 block = block if block is not None else self.block
610 mapObject = Map.dists[dist]()
611 msg_ids = []
612 for index, engineid in enumerate(self._targets):
613
614 msg_ids.extend(self.client[engineid].pull(key, block=False).msg_ids)
471
615
472 return self.client.gather(key, dist=dist, targets=self._targets, block=block)
616 r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather')
617
618 if block:
619 try:
620 return r.get()
621 except KeyboardInterrupt:
622 pass
623 return r
473
624
474 def __getitem__(self, key):
625 def __getitem__(self, key):
475 return self.get(key)
626 return self.get(key)
@@ -523,22 +674,25 b' class LoadBalancedView(View):'
523
674
524 Load-balanced views can be created with the client's `view` method:
675 Load-balanced views can be created with the client's `view` method:
525
676
526 >>> v = client.view(balanced=True)
677 >>> v = client.load_balanced_view()
527
678
528 or targets can be specified, to restrict the potential destinations:
679 or targets can be specified, to restrict the potential destinations:
529
680
530 >>> v = client.view([1,3],balanced=True)
681 >>> v = client.client.load_balanced_view(([1,3])
531
682
532 which would restrict loadbalancing to between engines 1 and 3.
683 which would restrict loadbalancing to between engines 1 and 3.
533
684
534 """
685 """
535
686
536 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
687 _flag_names = ['block', 'track', 'follow', 'after', 'timeout']
537
688
538 def __init__(self, client=None, targets=None):
689 def __init__(self, client=None, socket=None, targets=None):
539 super(LoadBalancedView, self).__init__(client=client, targets=targets)
690 super(LoadBalancedView, self).__init__(client=client, socket=socket, targets=targets)
540 self._ntargets = 1
691 self._ntargets = 1
541 self._balanced = True
692 self._task_scheme=client._task_scheme
693 if targets is None:
694 self._targets = None
695 self._idents=[]
542
696
543 def _validate_dependency(self, dep):
697 def _validate_dependency(self, dep):
544 """validate a dependency.
698 """validate a dependency.
@@ -549,7 +703,7 b' class LoadBalancedView(View):'
549 return True
703 return True
550 elif isinstance(dep, (list,set, tuple)):
704 elif isinstance(dep, (list,set, tuple)):
551 for d in dep:
705 for d in dep:
552 if not isinstance(d, str, AsyncResult):
706 if not isinstance(d, (str, AsyncResult)):
553 return False
707 return False
554 elif isinstance(dep, dict):
708 elif isinstance(dep, dict):
555 if set(dep.keys()) != set(Dependency().as_dict().keys()):
709 if set(dep.keys()) != set(Dependency().as_dict().keys()):
@@ -561,7 +715,21 b' class LoadBalancedView(View):'
561 return False
715 return False
562 else:
716 else:
563 return False
717 return False
718
719 return True
564
720
721 def _render_dependency(self, dep):
722 """helper for building jsonable dependencies from various input forms."""
723 if isinstance(dep, Dependency):
724 return dep.as_dict()
725 elif isinstance(dep, AsyncResult):
726 return dep.msg_ids
727 elif dep is None:
728 return []
729 else:
730 # pass to Dependency constructor
731 return list(Dependency(dep))
732
565 def set_flags(self, **kwargs):
733 def set_flags(self, **kwargs):
566 """set my attribute flags by keyword.
734 """set my attribute flags by keyword.
567
735
@@ -574,19 +742,28 b' class LoadBalancedView(View):'
574
742
575 block : bool
743 block : bool
576 whether to wait for results
744 whether to wait for results
577 bound : bool
578 whether to pass the client's Namespace as the first argument
579 to functions called via `apply`.
580 track : bool
745 track : bool
581 whether to create a MessageTracker to allow the user to
746 whether to create a MessageTracker to allow the user to
582 safely edit after arrays and buffers during non-copying
747 safely edit after arrays and buffers during non-copying
583 sends.
748 sends.
584 follow : Dependency, list, msg_id, AsyncResult
749 #
585 the location dependencies of tasks
750 after : Dependency or collection of msg_ids
586 after : Dependency, list, msg_id, AsyncResult
751 Only for load-balanced execution (targets=None)
587 the time dependencies of tasks
752 Specify a list of msg_ids as a time-based dependency.
588 timeout : int,None
753 This job will only be run *after* the dependencies
589 the timeout to be used for tasks
754 have been met.
755
756 follow : Dependency or collection of msg_ids
757 Only for load-balanced execution (targets=None)
758 Specify a list of msg_ids as a location-based dependency.
759 This job will only be run on an engine where this dependency
760 is met.
761
762 timeout : float/int or None
763 Only for load-balanced execution (targets=None)
764 Specify an amount of time (in seconds) for the scheduler to
765 wait for dependencies to be met before failing with a
766 DependencyTimeout.
590 """
767 """
591
768
592 super(LoadBalancedView, self).set_flags(**kwargs)
769 super(LoadBalancedView, self).set_flags(**kwargs)
@@ -599,23 +776,101 b' class LoadBalancedView(View):'
599 raise ValueError("Invalid dependency: %r"%value)
776 raise ValueError("Invalid dependency: %r"%value)
600 if 'timeout' in kwargs:
777 if 'timeout' in kwargs:
601 t = kwargs['timeout']
778 t = kwargs['timeout']
602 if not isinstance(t, (int, long, float, None)):
779 if not isinstance(t, (int, long, float, type(None))):
603 raise TypeError("Invalid type for timeout: %r"%type(t))
780 raise TypeError("Invalid type for timeout: %r"%type(t))
604 if t is not None:
781 if t is not None:
605 if t < 0:
782 if t < 0:
606 raise ValueError("Invalid timeout: %s"%t)
783 raise ValueError("Invalid timeout: %s"%t)
607 self.timeout = t
784 self.timeout = t
608
785
786 @sync_results
787 @save_ids
788 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
789 after=None, follow=None, timeout=None):
790 """calls f(*args, **kwargs) on a remote engine, returning the result.
791
792 This method temporarily sets all of `apply`'s flags for a single call.
793
794 Parameters
795 ----------
796
797 f : callable
798
799 args : list [default: empty]
800
801 kwargs : dict [default: empty]
802
803 block : bool [default: self.block]
804 whether to block
805 track : bool [default: self.track]
806 whether to ask zmq to track the message, for safe non-copying sends
807
808 !!!!!! TODO: THE REST HERE !!!!
809
810 Returns
811 -------
812
813 if self.block is False:
814 returns AsyncResult
815 else:
816 returns actual result of f(*args, **kwargs) on the engine(s)
817 This will be a list of self.targets is also a list (even length 1), or
818 the single result if self.targets is an integer engine id
819 """
820
821 # validate whether we can run
822 if self._socket.closed:
823 msg = "Task farming is disabled"
824 if self._task_scheme == 'pure':
825 msg += " because the pure ZMQ scheduler cannot handle"
826 msg += " disappearing engines."
827 raise RuntimeError(msg)
828
829 if self._task_scheme == 'pure':
830 # pure zmq scheme doesn't support dependencies
831 msg = "Pure ZMQ scheduler doesn't support dependencies"
832 if (follow or after):
833 # hard fail on DAG dependencies
834 raise RuntimeError(msg)
835 if isinstance(f, dependent):
836 # soft warn on functional dependencies
837 warnings.warn(msg, RuntimeWarning)
838
839 # build args
840 args = [] if args is None else args
841 kwargs = {} if kwargs is None else kwargs
842 block = self.block if block is None else block
843 track = self.track if track is None else track
844 after = self.after if after is None else after
845 follow = self.follow if follow is None else follow
846 timeout = self.timeout if timeout is None else timeout
847 after = self._render_dependency(after)
848 follow = self._render_dependency(follow)
849 subheader = dict(after=after, follow=follow, timeout=timeout, targets=self._idents)
850
851 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
852 subheader=subheader)
853 tracker = None if track is False else msg['tracker']
854
855 ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
856
857 if block:
858 try:
859 return ar.get()
860 except KeyboardInterrupt:
861 pass
862 return ar
863
609 @spin_after
864 @spin_after
610 @save_ids
865 @save_ids
611 def map(self, f, *sequences, **kwargs):
866 def map(self, f, *sequences, **kwargs):
612 """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
867 """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
613
868
614 Parallel version of builtin `map`, load-balanced by this View.
869 Parallel version of builtin `map`, load-balanced by this View.
615
870
616 `block`, `bound`, and `chunk_size` can be specified by keyword only.
871 `block`, and `chunksize` can be specified by keyword only.
617
872
618 Each `chunk_size` elements will be a separate task, and will be
873 Each `chunksize` elements will be a separate task, and will be
619 load-balanced. This lets individual elements be available for iteration
874 load-balanced. This lets individual elements be available for iteration
620 as soon as they arrive.
875 as soon as they arrive.
621
876
@@ -628,13 +883,11 b' class LoadBalancedView(View):'
628 the sequences to be distributed and passed to `f`
883 the sequences to be distributed and passed to `f`
629 block : bool
884 block : bool
630 whether to wait for the result or not [default self.block]
885 whether to wait for the result or not [default self.block]
631 bound : bool
632 whether to pass the client's Namespace as the first argument to `f`
633 track : bool
886 track : bool
634 whether to create a MessageTracker to allow the user to
887 whether to create a MessageTracker to allow the user to
635 safely edit after arrays and buffers during non-copying
888 safely edit after arrays and buffers during non-copying
636 sends.
889 sends.
637 chunk_size : int
890 chunksize : int
638 how many elements should be in each task [default 1]
891 how many elements should be in each task [default 1]
639
892
640 Returns
893 Returns
@@ -652,19 +905,16 b' class LoadBalancedView(View):'
652
905
653 # default
906 # default
654 block = kwargs.get('block', self.block)
907 block = kwargs.get('block', self.block)
655 bound = kwargs.get('bound', self.bound)
908 chunksize = kwargs.get('chunksize', 1)
656 chunk_size = kwargs.get('chunk_size', 1)
657
909
658 keyset = set(kwargs.keys())
910 keyset = set(kwargs.keys())
659 extra_keys = keyset.difference_update(set(['block', 'bound', 'chunk_size']))
911 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
660 if extra_keys:
912 if extra_keys:
661 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
913 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
662
914
663 assert len(sequences) > 0, "must have some sequences to map onto!"
915 assert len(sequences) > 0, "must have some sequences to map onto!"
664
916
665 pf = ParallelFunction(self.client, f, block=block, bound=bound,
917 pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
666 targets=self._targets, balanced=True,
667 chunk_size=chunk_size)
668 return pf.map(*sequences)
918 return pf.map(*sequences)
669
919
670 __all__ = ['LoadBalancedView', 'DirectView'] No newline at end of file
920 __all__ = ['LoadBalancedView', 'DirectView']
@@ -53,12 +53,12 b' def make_bintree(levels):'
53 add_children(G, root, levels, 2)
53 add_children(G, root, levels, 2)
54 return G
54 return G
55
55
56 def submit_jobs(client, G, jobs):
56 def submit_jobs(view, G, jobs):
57 """Submit jobs via client where G describes the time dependencies."""
57 """Submit jobs via client where G describes the time dependencies."""
58 results = {}
58 results = {}
59 for node in nx.topological_sort(G):
59 for node in nx.topological_sort(G):
60 deps = [ results[n] for n in G.predecessors(node) ]
60 with view.temp_flags(after=[ results[n] for n in G.predecessors(node) ]):
61 results[node] = client.apply(jobs[node], after=deps)
61 results[node] = view.apply(jobs[node])
62 return results
62 return results
63
63
64 def validate_tree(G, results):
64 def validate_tree(G, results):
@@ -76,7 +76,7 b' def main(nodes, edges):'
76 in-degree on the y (just for spread). All arrows must
76 in-degree on the y (just for spread). All arrows must
77 point at least slightly to the right if the graph is valid.
77 point at least slightly to the right if the graph is valid.
78 """
78 """
79 import pylab
79 from matplotlib import pyplot as plt
80 from matplotlib.dates import date2num
80 from matplotlib.dates import date2num
81 from matplotlib.cm import gist_rainbow
81 from matplotlib.cm import gist_rainbow
82 print "building DAG"
82 print "building DAG"
@@ -88,10 +88,11 b' def main(nodes, edges):'
88 jobs[node] = randomwait
88 jobs[node] = randomwait
89
89
90 client = cmod.Client()
90 client = cmod.Client()
91 view = client.load_balanced_view()
91 print "submitting %i tasks with %i dependencies"%(nodes,edges)
92 print "submitting %i tasks with %i dependencies"%(nodes,edges)
92 results = submit_jobs(client, G, jobs)
93 results = submit_jobs(view, G, jobs)
93 print "waiting for results"
94 print "waiting for results"
94 client.barrier()
95 view.wait()
95 print "done"
96 print "done"
96 for node in G:
97 for node in G:
97 md = results[node].metadata
98 md = results[node].metadata
@@ -107,13 +108,13 b' def main(nodes, edges):'
107 xmax,ymax = map(max, (x,y))
108 xmax,ymax = map(max, (x,y))
108 xscale = xmax-xmin
109 xscale = xmax-xmin
109 yscale = ymax-ymin
110 yscale = ymax-ymin
110 pylab.xlim(xmin-xscale*.1,xmax+xscale*.1)
111 plt.xlim(xmin-xscale*.1,xmax+xscale*.1)
111 pylab.ylim(ymin-yscale*.1,ymax+yscale*.1)
112 plt.ylim(ymin-yscale*.1,ymax+yscale*.1)
112 return G,results
113 return G,results
113
114
114 if __name__ == '__main__':
115 if __name__ == '__main__':
115 import pylab
116 from matplotlib import pyplot as plt
116 # main(5,10)
117 # main(5,10)
117 main(32,96)
118 main(32,96)
118 pylab.show()
119 plt.show()
119 No newline at end of file
120
@@ -31,7 +31,7 b' def getpid2():'
31 import os
31 import os
32 return os.getpid()
32 return os.getpid()
33
33
34 view = client[None]
34 view = client.load_balanced_view()
35 view.block=True
35 view.block=True
36
36
37 # will run on anything:
37 # will run on anything:
@@ -58,29 +58,41 b' successes = [ view.apply_async(wait, 1).msg_ids[0] for i in range(len(client.ids'
58 failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]
58 failures = [ view.apply_async(wait_and_fail, 1).msg_ids[0] for i in range(len(client.ids)) ]
59
59
60 mixed = [failures[0],successes[0]]
60 mixed = [failures[0],successes[0]]
61 d1a = Dependency(mixed, mode='any', success_only=False) # yes
61 d1a = Dependency(mixed, all=False, failure=True) # yes
62 d1b = Dependency(mixed, mode='any', success_only=True) # yes
62 d1b = Dependency(mixed, all=False) # yes
63 d2a = Dependency(mixed, mode='all', success_only=False) # yes after / no follow
63 d2a = Dependency(mixed, all=True, failure=True) # yes after / no follow
64 d2b = Dependency(mixed, mode='all', success_only=True) # no
64 d2b = Dependency(mixed, all=True) # no
65 d3 = Dependency(failures, mode='any', success_only=True) # no
65 d3 = Dependency(failures, all=False) # no
66 d4 = Dependency(failures, mode='any', success_only=False) # yes
66 d4 = Dependency(failures, all=False, failure=True) # yes
67 d5 = Dependency(failures, mode='all', success_only=False) # yes after / no follow
67 d5 = Dependency(failures, all=True, failure=True) # yes after / no follow
68 d6 = Dependency(successes, mode='all', success_only=False) # yes after / no follow
68 d6 = Dependency(successes, all=True, failure=True) # yes after / no follow
69
69
70 client.block = False
70 view.block = False
71
71 flags = view.temp_flags
72 r1a = client.apply(getpid, after=d1a)
72 with flags(after=d1a):
73 r1b = client.apply(getpid, follow=d1b)
73 r1a = view.apply(getpid)
74 r2a = client.apply(getpid, after=d2b, follow=d2a)
74 with flags(follow=d1b):
75 r2b = client.apply(getpid, after=d2a, follow=d2b)
75 r1b = view.apply(getpid)
76 r3 = client.apply(getpid, after=d3)
76 with flags(after=d2b, follow=d2a):
77 r4a = client.apply(getpid, after=d4)
77 r2a = view.apply(getpid)
78 r4b = client.apply(getpid, follow=d4)
78 with flags(after=d2a, follow=d2b):
79 r4c = client.apply(getpid, after=d3, follow=d4)
79 r2b = view.apply(getpid)
80 r5 = client.apply(getpid, after=d5)
80 with flags(after=d3):
81 r5b = client.apply(getpid, follow=d5, after=d3)
81 r3 = view.apply(getpid)
82 r6 = client.apply(getpid, follow=d6)
82 with flags(after=d4):
83 r6b = client.apply(getpid, after=d6, follow=d2b)
83 r4a = view.apply(getpid)
84 with flags(follow=d4):
85 r4b = view.apply(getpid)
86 with flags(after=d3, follow=d4):
87 r4c = view.apply(getpid)
88 with flags(after=d5):
89 r5 = view.apply(getpid)
90 with flags(follow=d5, after=d3):
91 r5b = view.apply(getpid)
92 with flags(follow=d6):
93 r6 = view.apply(getpid)
94 with flags(after=d6, follow=d2b):
95 r6b = view.apply(getpid)
84
96
85 def should_fail(f):
97 def should_fail(f):
86 try:
98 try:
@@ -1,8 +1,9 b''
1 from IPython.zmq.parallel.client import *
1 from IPython.zmq.parallel.client import *
2
2
3 client = Client()
3 client = Client()
4 view = client[:]
4
5
5 @remote(client, block=True)
6 @view.remote(block=True)
6 def square(a):
7 def square(a):
7 """return square of a number"""
8 """return square of a number"""
8 return a*a
9 return a*a
@@ -21,7 +22,7 b' squares2 = [ r.get() for r in arlist ]'
21
22
22 # now the more convenient @parallel decorator, which has a map method:
23 # now the more convenient @parallel decorator, which has a map method:
23
24
24 @parallel(client, block=False)
25 @view.parallel(block=False)
25 def psquare(a):
26 def psquare(a):
26 """return square of a number"""
27 """return square of a number"""
27 return a*a
28 return a*a
@@ -3,12 +3,12 b' from IPython.zmq.parallel.client import *'
3 client = Client()
3 client = Client()
4
4
5 for id in client.ids:
5 for id in client.ids:
6 client.push(dict(ids=id*id), targets=id)
6 client[id].push(dict(ids=id*id))
7
7
8 rns = client[0]
8 v = client[0]
9 rns['a'] = 5
9 v['a'] = 5
10
10
11 print rns['a']
11 print v['a']
12
12
13 remotes = client[:]
13 remotes = client[:]
14
14
@@ -49,7 +49,7 b' c = client.Client(profile=cluster_profile)'
49
49
50 # A LoadBalancedView is an interface to the engines that provides dynamic load
50 # A LoadBalancedView is an interface to the engines that provides dynamic load
51 # balancing at the expense of not knowing which engine will execute the code.
51 # balancing at the expense of not knowing which engine will execute the code.
52 view = c.view()
52 view = c.load_balanced_view()
53
53
54 # Initialize the common code on the engines. This Python module has the
54 # Initialize the common code on the engines. This Python module has the
55 # price_options function that prices the options.
55 # price_options function that prices the options.
@@ -75,7 +75,7 b' print "Submitted tasks: ", len(async_results)'
75 sys.stdout.flush()
75 sys.stdout.flush()
76
76
77 # Block until all tasks are completed.
77 # Block until all tasks are completed.
78 c.barrier(async_results)
78 c.wait(async_results)
79 t2 = time.time()
79 t2 = time.time()
80 t = t2-t1
80 t = t2-t1
81
81
@@ -27,14 +27,14 b" filestring = 'pi200m.ascii.%(i)02dof20'"
27 files = [filestring % {'i':i} for i in range(1,16)]
27 files = [filestring % {'i':i} for i in range(1,16)]
28
28
29 # Connect to the IPython cluster
29 # Connect to the IPython cluster
30 c = client.Client(profile='edison')
30 c = client.Client()
31 c.run('pidigits.py')
31 c[:].run('pidigits.py')
32
32
33 # the number of engines
33 # the number of engines
34 n = len(c)
34 n = len(c)
35 id0 = c.ids[0]
35 id0 = c.ids[0]
36 v = c[:]
36 v = c[:]
37 v.set_flags(bound=True,block=True)
37 v.block=True
38 # fetch the pi-files
38 # fetch the pi-files
39 print "downloading %i files of pi"%n
39 print "downloading %i files of pi"%n
40 v.map(fetch_pi_file, files[:n])
40 v.map(fetch_pi_file, files[:n])
@@ -30,17 +30,19 b' from numpy import exp, zeros, newaxis, sqrt'
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.zmq.parallel.client import Client, Reference
31 from IPython.zmq.parallel.client import Client, Reference
32
32
33 def setup_partitioner(ns, index, num_procs, gnum_cells, parts):
33 def setup_partitioner(index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
34 """create a partitioner in the engine namespace"""
35 global partitioner
35 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
36 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
36 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.prepare_communication()
38 p.prepare_communication()
38 # put the partitioner into the global namespace:
39 # put the partitioner into the global namespace:
39 ns.partitioner=p
40 partitioner=p
40
41
41 def setup_solver(ns, *args, **kwargs):
42 def setup_solver(*args, **kwargs):
42 """create a WaveSolver in the engine namespace"""
43 """create a WaveSolver in the engine namespace"""
43 ns.solver = WaveSolver(*args, **kwargs)
44 global solver
45 solver = WaveSolver(*args, **kwargs)
44
46
45 def wave_saver(u, x, y, t):
47 def wave_saver(u, x, y, t):
46 """save the wave log"""
48 """save the wave log"""
@@ -146,11 +148,11 b" if __name__ == '__main__':"
146 # setup remote partitioner
148 # setup remote partitioner
147 # note that Reference means that the argument passed to setup_partitioner will be the
149 # note that Reference means that the argument passed to setup_partitioner will be the
148 # object named 'my_id' in the engine's namespace
150 # object named 'my_id' in the engine's namespace
149 view.apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
151 view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
150 # wait for initial communication to complete
152 # wait for initial communication to complete
151 view.execute('mpi.barrier()')
153 view.execute('mpi.barrier()')
152 # setup remote solvers
154 # setup remote solvers
153 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
155 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
154
156
155 # lambda for calling solver.solve:
157 # lambda for calling solver.solve:
156 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
158 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
@@ -172,7 +174,7 b" if __name__ == '__main__':"
172
174
173 impl['inner'] = 'vectorized'
175 impl['inner'] = 'vectorized'
174 # setup new solvers
176 # setup new solvers
175 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
177 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
176 view.execute('mpi.barrier()')
178 view.execute('mpi.barrier()')
177
179
178 # run again with numpy vectorized inner-implementation
180 # run again with numpy vectorized inner-implementation
@@ -30,17 +30,19 b' from numpy import exp, zeros, newaxis, sqrt'
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.zmq.parallel.client import Client, Reference
31 from IPython.zmq.parallel.client import Client, Reference
32
32
33 def setup_partitioner(ns, comm, addrs, index, num_procs, gnum_cells, parts):
33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
34 """create a partitioner in the engine namespace"""
35 global partitioner
35 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
36 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
36 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.prepare_communication()
38 p.prepare_communication()
38 # put the partitioner into the global namespace:
39 # put the partitioner into the global namespace:
39 ns.partitioner=p
40 partitioner=p
40
41
41 def setup_solver(ns, *args, **kwargs):
42 def setup_solver(*args, **kwargs):
42 """create a WaveSolver in the engine namespace."""
43 """create a WaveSolver in the engine namespace."""
43 ns.solver = WaveSolver(*args, **kwargs)
44 global solver
45 solver = WaveSolver(*args, **kwargs)
44
46
45 def wave_saver(u, x, y, t):
47 def wave_saver(u, x, y, t):
46 """save the wave state for each timestep."""
48 """save the wave state for each timestep."""
@@ -156,7 +158,7 b" if __name__ == '__main__':"
156 # setup remote partitioner
158 # setup remote partitioner
157 # note that Reference means that the argument passed to setup_partitioner will be the
159 # note that Reference means that the argument passed to setup_partitioner will be the
158 # object named 'com' in the engine's namespace
160 # object named 'com' in the engine's namespace
159 view.apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
161 view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
160 time.sleep(1)
162 time.sleep(1)
161 # convenience lambda to call solver.solve:
163 # convenience lambda to call solver.solve:
162 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
164 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
@@ -164,7 +166,7 b" if __name__ == '__main__':"
164 if ns.scalar:
166 if ns.scalar:
165 impl['inner'] = 'scalar'
167 impl['inner'] = 'scalar'
166 # setup remote solvers
168 # setup remote solvers
167 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
169 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
168
170
169 # run first with element-wise Python operations for each cell
171 # run first with element-wise Python operations for each cell
170 t0 = time.time()
172 t0 = time.time()
@@ -182,7 +184,7 b" if __name__ == '__main__':"
182 # run again with faster numpy-vectorized inner implementation:
184 # run again with faster numpy-vectorized inner implementation:
183 impl['inner'] = 'vectorized'
185 impl['inner'] = 'vectorized'
184 # setup remote solvers
186 # setup remote solvers
185 view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
187 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
186
188
187 t0 = time.time()
189 t0 = time.time()
188
190
@@ -19,7 +19,7 b' Contents'
19 whatsnew/index.txt
19 whatsnew/index.txt
20 install/index.txt
20 install/index.txt
21 interactive/index.txt
21 interactive/index.txt
22 parallel/index.txt
22 .. parallel/index.txt
23 parallelz/index.txt
23 parallelz/index.txt
24 config/index.txt
24 config/index.txt
25 development/index.txt
25 development/index.txt
@@ -45,7 +45,7 b' A possible sequence of events for this workflow:'
45
45
46
46
47 Further, taking failures into account, assuming all dependencies are run with the default
47 Further, taking failures into account, assuming all dependencies are run with the default
48 `success_only=True`, the following cases would occur for each node's failure:
48 `success=True,failure=False`, the following cases would occur for each node's failure:
49
49
50 0. fails: all other tasks fail as Impossible
50 0. fails: all other tasks fail as Impossible
51 1. 2 can still succeed, but 3,4 are unreachable
51 1. 2 can still succeed, but 3,4 are unreachable
@@ -111,7 +111,8 b' on which it depends:'
111
111
112 .. sourcecode:: ipython
112 .. sourcecode:: ipython
113
113
114 In [5]: c = client.Client()
114 In [5]: rc = client.Client()
115 In [5]: view = rc.load_balanced_view()
115
116
116 In [6]: results = {}
117 In [6]: results = {}
117
118
@@ -120,13 +121,13 b' on which it depends:'
120 ...: # leading into this one as dependencies
121 ...: # leading into this one as dependencies
121 ...: deps = [ results[n] for n in G.predecessors(node) ]
122 ...: deps = [ results[n] for n in G.predecessors(node) ]
122 ...: # submit and store AsyncResult object
123 ...: # submit and store AsyncResult object
123 ...: results[node] = client.apply(jobs[node], after=deps, block=False)
124 ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
124
125
125 Now that we have submitted all the jobs, we can wait for the results:
126 Now that we have submitted all the jobs, we can wait for the results:
126
127
127 .. sourcecode:: ipython
128 .. sourcecode:: ipython
128
129
129 In [8]: [ r.get() for r in results.values() ]
130 In [8]: view.wait(results.values())
130
131
131 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
132 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
132 raised an error if a task failed). But we don't know that the ordering was properly
133 raised an error if a task failed). But we don't know that the ordering was properly
@@ -17,5 +17,6 b' Using IPython for parallel computing (ZMQ)'
17 parallel_demos.txt
17 parallel_demos.txt
18 dag_dependencies.txt
18 dag_dependencies.txt
19 parallel_details.txt
19 parallel_details.txt
20 parallel_transition.txt
20
21
21
22
@@ -135,7 +135,7 b' calculation can also be run by simply typing the commands from'
135 # We simply pass Client the name of the cluster profile we
135 # We simply pass Client the name of the cluster profile we
136 # are using.
136 # are using.
137 In [2]: c = client.Client(profile='mycluster')
137 In [2]: c = client.Client(profile='mycluster')
138 In [3]: view = c.view(balanced=True)
138 In [3]: view = c.load_balanced_view()
139
139
140 In [3]: c.ids
140 In [3]: c.ids
141 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
141 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
@@ -166,11 +166,11 b' calculation can also be run by simply typing the commands from'
166 'pi200m.ascii.15of20']
166 'pi200m.ascii.15of20']
167
167
168 # download the data files if they don't already exist:
168 # download the data files if they don't already exist:
169 In [8]: c.map(fetch_pi_file, files)
169 In [8]: v.map(fetch_pi_file, files)
170
170
171 # This is the parallel calculation using the Client.map method
171 # This is the parallel calculation using the Client.map method
172 # which applies compute_two_digit_freqs to each file in files in parallel.
172 # which applies compute_two_digit_freqs to each file in files in parallel.
173 In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
173 In [9]: freqs_all = v.map(compute_two_digit_freqs, files)
174
174
175 # Add up the frequencies from each engine.
175 # Add up the frequencies from each engine.
176 In [10]: freqs = reduce_freqs(freqs_all)
176 In [10]: freqs = reduce_freqs(freqs_all)
@@ -18,13 +18,14 b' Non-copying sends and numpy arrays'
18 ----------------------------------
18 ----------------------------------
19
19
20 When numpy arrays are passed as arguments to apply or via data-movement methods, they are not
20 When numpy arrays are passed as arguments to apply or via data-movement methods, they are not
21 copied. This means that you must be careful if you are sending an array that you intend to work on.
21 copied. This means that you must be careful if you are sending an array that you intend to work
22 PyZMQ does allow you to track when a message has been sent so you can know when it is safe to edit the buffer, but
22 on. PyZMQ does allow you to track when a message has been sent so you can know when it is safe
23 IPython only allows for this.
23 to edit the buffer, but IPython only allows for this.
24
24
25 It is also important to note that the non-copying receive of a message is *read-only*. That
25 It is also important to note that the non-copying receive of a message is *read-only*. That
26 means that if you intend to work in-place on an array that you have sent or received, you must copy
26 means that if you intend to work in-place on an array that you have sent or received, you must
27 it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as results.
27 copy it. This is true for both numpy arrays sent to engines and numpy arrays retrieved as
28 results.
28
29
29 The following will fail:
30 The following will fail:
30
31
@@ -69,6 +70,24 b' The :attr:`ndarray.flags.writeable` flag will tell you if you can write to an ar'
69 In [6]: _.flags.writeable
70 In [6]: _.flags.writeable
70 Out[6]: False
71 Out[6]: False
71
72
73 If you want to safely edit an array in-place after *sending* it, you must use the `track=True` flag. IPython always performs non-copying sends of arrays, which return immediately. You
74 must instruct IPython track those messages *at send time* in order to know for sure that the send has completed. AsyncResults have a :attr:`sent` property, and :meth:`wait_on_send` method
75 for checking and waiting for 0MQ to finish with a buffer.
76
77 .. sourcecode:: ipython
78
79 In [5]: A = numpy.random.random((1024,1024))
80
81 In [6]: view.track=True
82
83 In [7]: ar = view.apply_async(lambda x: 2*x, A)
84
85 In [8]: ar.sent
86 Out[8]: False
87
88 In [9]: ar.wait_on_send() # blocks until sent is True
89
90
72 What is sendable?
91 What is sendable?
73 -----------------
92 -----------------
74
93
@@ -83,6 +102,61 b' buffer without copying - and reconstruct the object on the other side in your ow'
83 possible that the object reconstruction will become extensible, so you can add your own
102 possible that the object reconstruction will become extensible, so you can add your own
84 non-copying types, but this does not yet exist.
103 non-copying types, but this does not yet exist.
85
104
105 Closures
106 ********
107
108 Just about anything in Python is pickleable. The one notable exception is objects (generally
109 functions) with *closures*. Closures can be a complicated topic, but the basic principal is that
110 functions that refer to variables in their parent scope have closures.
111
112 An example of a function that uses a closure:
113
114 .. sourcecode:: python
115
116 def f(a):
117 def inner():
118 # inner will have a closure
119 return a
120 return echo
121
122 f1 = f(1)
123 f2 = f(2)
124 f1() # returns 1
125 f2() # returns 2
126
127 f1 and f2 will have closures referring to the scope in which `inner` was defined, because they
128 use the variable 'a'. As a result, you would not be able to send ``f1`` or ``f2`` with IPython.
129 Note that you *would* be able to send `f`. This is only true for interactively defined
130 functions (as are often used in decorators), and only when there are variables used inside the
131 inner function, that are defined in the outer function. If the names are *not* in the outer
132 function, then there will not be a closure, and the generated function will look in
133 ``globals()`` for the name:
134
135 .. sourcecode:: python
136
137 def g(b):
138 # note that `b` is not referenced in inner's scope
139 def inner():
140 # this inner will *not* have a closure
141 return a
142 return echo
143 g1 = g(1)
144 g2 = g(2)
145 g1() # raises NameError on 'a'
146 a=5
147 g2() # returns 5
148
149 `g1` and `g2` *will* be sendable with IPython, and will treat the engine's namespace as
150 globals(). The :meth:`pull` method is implemented based on this principal. If we did not
151 provide pull, you could implement it yourself with `apply`, by simply returning objects out
152 of the global namespace:
153
154 .. sourcecode:: ipython
155
156 In [10]: view.apply(lambda : a)
157
158 # is equivalent to
159 In [11]: view.pull('a')
86
160
87 Running Code
161 Running Code
88 ============
162 ============
@@ -94,7 +168,9 b' Client method, called `apply`.'
94 Apply
168 Apply
95 -----
169 -----
96
170
97 The principal method of remote execution is :meth:`apply`, of Client and View objects. The Client provides the full execution and communication API for engines via its apply method.
171 The principal method of remote execution is :meth:`apply`, of View objects. The Client provides
172 the full execution and communication API for engines via its low-level
173 :meth:`send_apply_message` method.
98
174
99 f : function
175 f : function
100 The fuction to be called remotely
176 The fuction to be called remotely
@@ -102,8 +178,6 b' args : tuple/list'
102 The positional arguments passed to `f`
178 The positional arguments passed to `f`
103 kwargs : dict
179 kwargs : dict
104 The keyword arguments passed to `f`
180 The keyword arguments passed to `f`
105 bound : bool (default: False)
106 Whether to pass the Engine(s) Namespace as the first argument to `f`.
107 block : bool (default: self.block)
181 block : bool (default: self.block)
108 Whether to wait for the result, or return immediately.
182 Whether to wait for the result, or return immediately.
109 False:
183 False:
@@ -135,8 +209,6 b' balanced : bool, default None'
135 If `balanced` and `targets` are both specified, the task will
209 If `balanced` and `targets` are both specified, the task will
136 be assigne to *one* of the targets by the scheduler.
210 be assigne to *one* of the targets by the scheduler.
137
211
138 The following arguments are only used when balanced is True:
139
140 after : Dependency or collection of msg_ids
212 after : Dependency or collection of msg_ids
141 Only for load-balanced execution (targets=None)
213 Only for load-balanced execution (targets=None)
142 Specify a list of msg_ids as a time-based dependency.
214 Specify a list of msg_ids as a time-based dependency.
@@ -158,11 +230,11 b' timeout : float/int or None'
158 execute and run
230 execute and run
159 ---------------
231 ---------------
160
232
161 For executing strings of Python code, Clients also provide an :meth:`execute` and a :meth:`run`
233 For executing strings of Python code, :class:`DirectView`s also provide an :meth:`execute` and a
162 method, which rather than take functions and arguments, take simple strings. `execute` simply
234 :meth:`run` method, which rather than take functions and arguments, take simple strings.
163 takes a string of Python code to execute, and sends it to the Engine(s). `run` is the same as
235 `execute` simply takes a string of Python code to execute, and sends it to the Engine(s). `run`
164 `execute`, but for a *file*, rather than a string. It is simply a wrapper that does something
236 is the same as `execute`, but for a *file*, rather than a string. It is simply a wrapper that
165 very similar to ``execute(open(f).read())``.
237 does something very similar to ``execute(open(f).read())``.
166
238
167 .. note::
239 .. note::
168
240
@@ -172,44 +244,25 b' Views'
172 =====
244 =====
173
245
174 The principal extension of the :class:`~parallel.client.Client` is the
246 The principal extension of the :class:`~parallel.client.Client` is the
175 :class:`~parallel.view.View` class. The client is a fairly stateless object with respect to
247 :class:`~parallel.view.View` class. The client
176 execution patterns, where you must specify everything about the execution as keywords to each
177 call to :meth:`apply`. For users who want to more conveniently specify various options for
178 several similar calls, we have the :class:`~parallel.view.View` objects. The basic principle of
179 the views is to encapsulate the keyword arguments to :meth:`client.apply` as attributes,
180 allowing users to specify them once and apply to any subsequent calls until the attribute is
181 changed.
182
248
183 Two of apply's keyword arguments are set at the construction of the View, and are immutable for
249 Two of apply's keyword arguments are set at the construction of the View, and are immutable for
184 a given View: `balanced` and `targets`. `balanced` determines whether the View will be a
250 a given View: `balanced` and `targets`. `balanced` determines whether the View will be a
185 :class:`.LoadBalancedView` or a :class:`.DirectView`, and `targets` will be the View's `targets`
251 :class:`.LoadBalancedView` or a :class:`.DirectView`, and `targets` will be the View's `targets`
186 attribute. Attempts to change this will raise errors.
252 attribute. Attempts to change this will raise errors.
187
253
188 Views are cached by targets+balanced combinations, so requesting a view multiple times will always return the *same object*, not create a new one:
254 Views are cached by targets/class, so requesting a view multiple times will always return the
255 *same object*, not create a new one:
189
256
190 .. sourcecode:: ipython
257 .. sourcecode:: ipython
191
258
192 In [3]: v1 = rc.view([1,2,3], balanced=True)
259 In [3]: v1 = rc.load_balanced_view([1,2,3])
193 In [4]: v2 = rc.view([1,2,3], balanced=True)
260 In [4]: v2 = rc.load_balanced_view([1,2,3])
194
261
195 In [5]: v2 is v1
262 In [5]: v2 is v1
196 Out[5]: True
263 Out[5]: True
197
264
198
265
199 A :class:`View` always uses its `targets` attribute, and it will use its `bound`
200 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
201 methods allow overriding `bound` and `block` for a single call.
202
203 ================== ========== ==========
204 method block bound
205 ================== ========== ==========
206 apply self.block self.bound
207 apply_sync True False
208 apply_async False False
209 apply_sync_bound True True
210 apply_async_bound False True
211 ================== ========== ==========
212
213 DirectView
266 DirectView
214 ----------
267 ----------
215
268
@@ -379,24 +432,26 b' interactive session - you must poll the 0MQ sockets for incoming messages. Note'
379 this polling *does not* actually make any network requests. It simply performs a `select`
432 this polling *does not* actually make any network requests. It simply performs a `select`
380 operation, to check if messages are already in local memory, waiting to be handled.
433 operation, to check if messages are already in local memory, waiting to be handled.
381
434
382 The method that handles incoming messages is :meth:`spin`. This method flushes any waiting messages on the various incoming sockets, and updates the state of the Client.
435 The method that handles incoming messages is :meth:`spin`. This method flushes any waiting
436 messages on the various incoming sockets, and updates the state of the Client.
383
437
384 If you need to wait for particular results to finish, you can use the :meth:`barrier` method,
438 If you need to wait for particular results to finish, you can use the :meth:`wait` method,
385 which will call :meth:`spin` until the messages are no longer outstanding. Anything that
439 which will call :meth:`spin` until the messages are no longer outstanding. Anything that
386 represents a collection of messages, such as a list of msg_ids or one or more AsyncResult
440 represents a collection of messages, such as a list of msg_ids or one or more AsyncResult
387 objects, can be passed as argument to barrier. A timeout can be specified, which will prevent
441 objects, can be passed as argument to wait. A timeout can be specified, which will prevent
388 the barrier from blocking for more than a specified time, but the default behavior is to wait
442 the call from blocking for more than a specified time, but the default behavior is to wait
389 forever.
443 forever.
390
444
391
445
392
446
393 The client also has an `outstanding` attribute - a ``set`` of msg_ids that are awaiting replies.
447 The client also has an `outstanding` attribute - a ``set`` of msg_ids that are awaiting replies.
394 This is the default if barrier is called with no arguments - i.e. barrier on *all* outstanding messages.
448 This is the default if wait is called with no arguments - i.e. wait on *all* outstanding
449 messages.
395
450
396
451
397 .. note::
452 .. note::
398
453
399 TODO barrier example
454 TODO wait example
400
455
401 Map
456 Map
402 ===
457 ===
@@ -89,7 +89,7 b' same machine as the Hub, but can be run anywhere from local threads or on remote'
89 The controller also provides a single point of contact for users who wish to
89 The controller also provides a single point of contact for users who wish to
90 utilize the engines connected to the controller. There are different ways of
90 utilize the engines connected to the controller. There are different ways of
91 working with a controller. In IPython, all of these models are implemented via
91 working with a controller. In IPython, all of these models are implemented via
92 the client's :meth:`.Client.apply` method, with various arguments, or
92 the client's :meth:`.View.apply` method, with various arguments, or
93 constructing :class:`.View` objects to represent subsets of engines. The two
93 constructing :class:`.View` objects to represent subsets of engines. The two
94 primary models for interacting with engines are:
94 primary models for interacting with engines are:
95
95
@@ -124,12 +124,13 b' themselves block when user code is run, the schedulers hide that from the user t'
124 a fully asynchronous interface to a set of engines.
124 a fully asynchronous interface to a set of engines.
125
125
126
126
127 IPython client
127 IPython client and views
128 --------------
128 ------------------------
129
129
130 There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a
130 There is one primary object, the :class:`~.parallel.client.Client`, for connecting to a cluster.
131 controller. For each model, there is a corresponding view. These views allow users to
131 For each execution model, there is a corresponding :class:`~.parallel.view.View`. These views
132 interact with a set of engines through the interface. Here are the two default views:
132 allow users to interact with a set of engines through the interface. Here are the two default
133 views:
133
134
134 * The :class:`DirectView` class for explicit addressing.
135 * The :class:`DirectView` class for explicit addressing.
135 * The :class:`LoadBalancedView` class for destination-agnostic scheduling.
136 * The :class:`LoadBalancedView` class for destination-agnostic scheduling.
@@ -212,7 +213,7 b' everything is working correctly, try the following commands:'
212 In [4]: c.ids
213 In [4]: c.ids
213 Out[4]: set([0, 1, 2, 3])
214 Out[4]: set([0, 1, 2, 3])
214
215
215 In [5]: c.apply(lambda : "Hello, World", targets='all', block=True)
216 In [5]: c[:].apply_sync(lambda : "Hello, World")
216 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
217 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
217
218
218
219
@@ -234,10 +235,10 b' then you would connect to it with:'
234 In [2]: c = client.Client(sshserver='myhub.example.com')
235 In [2]: c = client.Client(sshserver='myhub.example.com')
235
236
236 Where 'myhub.example.com' is the url or IP address of the machine on
237 Where 'myhub.example.com' is the url or IP address of the machine on
237 which the Hub process is running.
238 which the Hub process is running (or another machine that has direct access to the Hub's ports).
238
239
239 You are now ready to learn more about the :ref:`Direct
240 You are now ready to learn more about the :ref:`Direct
240 <parallelmultiengine>` and :ref:`LoadBalanced <paralleltask>` interfaces to the
241 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
241 controller.
242 controller.
242
243
243 .. [ZeroMQ] ZeroMQ. http://www.zeromq.org
244 .. [ZeroMQ] ZeroMQ. http://www.zeromq.org
@@ -1,4 +1,4 b''
1 .. _parallelmultiengine:
1 .. _parallel_multiengine:
2
2
3 ==========================
3 ==========================
4 IPython's Direct interface
4 IPython's Direct interface
@@ -9,7 +9,7 b' IPython engines. The basic idea behind the multiengine interface is that the'
9 capabilities of each engine are directly and explicitly exposed to the user.
9 capabilities of each engine are directly and explicitly exposed to the user.
10 Thus, in the multiengine interface, each engine is given an id that is used to
10 Thus, in the multiengine interface, each engine is given an id that is used to
11 identify the engine and give it work to do. This interface is very intuitive
11 identify the engine and give it work to do. This interface is very intuitive
12 and is designed with interactive usage in mind, and is thus the best place for
12 and is designed with interactive usage in mind, and is the best place for
13 new users of IPython to begin.
13 new users of IPython to begin.
14
14
15 Starting the IPython controller and engines
15 Starting the IPython controller and engines
@@ -91,9 +91,7 b" DirectView's :meth:`map` method:"
91
91
92 In [62]: serial_result = map(lambda x:x**10, range(32))
92 In [62]: serial_result = map(lambda x:x**10, range(32))
93
93
94 In [63]: dview.block = True
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95
96 In [66]: parallel_result = dview.map(lambda x: x**10, range(32))
97
95
98 In [67]: serial_result==parallel_result
96 In [67]: serial_result==parallel_result
99 Out[67]: True
97 Out[67]: True
@@ -103,8 +101,7 b" DirectView's :meth:`map` method:"
103
101
104 The :class:`DirectView`'s version of :meth:`map` does
102 The :class:`DirectView`'s version of :meth:`map` does
105 not do dynamic load balancing. For a load balanced version, use a
103 not do dynamic load balancing. For a load balanced version, use a
106 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
104 :class:`LoadBalancedView`.
107 `balanced=True`.
108
105
109 .. seealso::
106 .. seealso::
110
107
@@ -119,7 +116,7 b' two decorators:'
119
116
120 .. sourcecode:: ipython
117 .. sourcecode:: ipython
121
118
122 In [10]: @rc.remote(block=True, targets='all')
119 In [10]: @dview.remote(block=True)
123 ...: def getpid():
120 ...: def getpid():
124 ...: import os
121 ...: import os
125 ...: return os.getpid()
122 ...: return os.getpid()
@@ -128,7 +125,7 b' two decorators:'
128 In [11]: getpid()
125 In [11]: getpid()
129 Out[11]: [12345, 12346, 12347, 12348]
126 Out[11]: [12345, 12346, 12347, 12348]
130
127
131 A ``@parallel`` decorator creates parallel functions, that break up an element-wise
128 The ``@parallel`` decorator creates parallel functions, that break up an element-wise
132 operations and distribute them, reconstructing the result.
129 operations and distribute them, reconstructing the result.
133
130
134 .. sourcecode:: ipython
131 .. sourcecode:: ipython
@@ -137,13 +134,13 b' operations and distribute them, reconstructing the result.'
137
134
138 In [13]: A = np.random.random((64,48))
135 In [13]: A = np.random.random((64,48))
139
136
140 In [14]: @rc.parallel(block=True, targets='all')
137 In [14]: @dview.parallel(block=True)
141 ...: def pmul(A,B):
138 ...: def pmul(A,B):
142 ...: return A*B
139 ...: return A*B
143
140
144 In [15]: C_local = A*A
141 In [15]: C_local = A*A
145
142
146 In [16]: C_remote_partial = pmul(A,A)
143 In [16]: C_remote = pmul(A,A)
147
144
148 In [17]: (C_local == C_remote).all()
145 In [17]: (C_local == C_remote).all()
149 Out[17]: True
146 Out[17]: True
@@ -159,38 +156,36 b' Calling Python functions'
159 The most basic type of operation that can be performed on the engines is to
156 The most basic type of operation that can be performed on the engines is to
160 execute Python code or call Python functions. Executing Python code can be
157 execute Python code or call Python functions. Executing Python code can be
161 done in blocking or non-blocking mode (non-blocking is default) using the
158 done in blocking or non-blocking mode (non-blocking is default) using the
162 :meth:`execute` method, and calling functions can be done via the
159 :meth:`.View.execute` method, and calling functions can be done via the
163 :meth:`.View.apply` method.
160 :meth:`.View.apply` method.
164
161
165 apply
162 apply
166 -----
163 -----
167
164
168 The main method for doing remote execution (in fact, all methods that
165 The main method for doing remote execution (in fact, all methods that
169 communicate with the engines are built on top of it), is :meth:`Client.apply`.
166 communicate with the engines are built on top of it), is :meth:`View.apply`.
170 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
171 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
172 require some more options, they cannot easily provide this interface.
173 Instead, they provide the signature:
174
175 .. sourcecode:: python
176
167
177 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
168 We strive to provide the cleanest interface we can, so `apply` has the following
178 after=None, follow=None, timeout=None)
169 signature:
179
170
180 Where various behavior is controlled via keyword arguments. This means that in the client,
171 .. sourcecode:: python
181 you must pass `args` as a tuple, and `kwargs` as a dict.
182
172
183 In order to provide the nicer interface, we have :class:`View` classes, which wrap
173 view.apply(f, *args, **kwargs)
184 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
185 the extra keyword arguments. This means that the views can have the desired pattern:
186
174
187 .. sourcecode:: python
175 There are various ways to call functions with IPython, and these flags are set as
176 attributes of the View. The ``DirectView`` has just two of these flags:
188
177
189 v.apply(f, *args, **kwargs)
178 dv.block : bool
179 whether to wait for the result, or return an :class:`AsyncResult` object
180 immediately
181 dv.track : bool
182 whether to instruct pyzmq to track when
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 edit in-place. You need to know when it becomes safe to edit the buffer
185 without corrupting the message.
190
186
191
187
192 For instance, performing index-access on a client creates a
188 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
193 :class:`.DirectView`.
194
189
195 .. sourcecode:: ipython
190 .. sourcecode:: ipython
196
191
@@ -198,23 +193,9 b' For instance, performing index-access on a client creates a'
198 Out[4]: <DirectView [1, 2]>
193 Out[4]: <DirectView [1, 2]>
199
194
200 In [5]: view.apply<tab>
195 In [5]: view.apply<tab>
201 view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound
196 view.apply view.apply_async view.apply_sync view.apply_with_flags
202
203 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
204 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
205 methods allow specifying `bound` and `block` via the different methods.
206
197
207 ================== ========== ==========
198 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
208 method block bound
209 ================== ========== ==========
210 apply self.block self.bound
211 apply_sync True False
212 apply_async False False
213 apply_sync_bound True True
214 apply_async_bound False True
215 ================== ========== ==========
216
217 For explanation of these values, read on.
218
199
219 Blocking execution
200 Blocking execution
220 ------------------
201 ------------------
@@ -232,63 +213,29 b' blocks until the engines are done executing the command:'
232
213
233 In [5]: dview['b'] = 10
214 In [5]: dview['b'] = 10
234
215
235 In [6]: dview.apply_sync(lambda x: a+b+x, 27)
216 In [6]: dview.apply(lambda x: a+b+x, 27)
236 Out[6]: [42, 42, 42, 42]
217 Out[6]: [42, 42, 42, 42]
237
218
238 Python commands can be executed on specific engines by calling execute using the ``targets``
219 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
239 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
220 method:
240 index-access to the client:
241
242 .. sourcecode:: ipython
243
244 In [6]: rc.execute('c=a+b', targets=[0,2])
245
246 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
247
248 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
249 Out[8]: [15, -5, 15, -5]
250
251 .. note::
252
221
253 Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
222 In [7]: dview.block=False
254 ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
255 where this differs in in :meth:`apply`. The :class:`Client` takes many
256 arguments to apply, so it requires `args` and `kwargs` to be passed as
257 individual arguments. Extended options such as `bound`,`targets`, and
258 `block` are controlled by the attributes of the :class:`View` objects, so
259 they can provide the much more convenient
260 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
261 ``f(*args,**kwargs)`` remotely.
262
223
263 Bound and unbound execution
224 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
264 ---------------------------
225 Out[8]: [42, 42, 42, 42]
265
226
266 The previous example also shows one of the most important things about the IPython
227 Python commands can be executed as strings on specific engines by using a View's ``execute``
267 engines: they have a persistent user namespaces. The :meth:`apply` method can
228 method:
268 be run in either a bound or unbound manner.
269
229
270 When applying a function in a `bound` manner, the first argument to that function
230 .. sourcecode:: ipython
271 will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary
272 also providing attribute-access to keys.
273
231
274 In all (unbound and bound) execution
232 In [6]: rc[::2].execute('c=a+b')
275
233
276 .. sourcecode:: ipython
234 In [7]: rc[1::2].execute('c=a-b')
277
235
278 In [9]: dview['b'] = 5 # assign b to 5 everywhere
236 In [8]: rc[:]['c'] # shorthand for rc[:].pull('c', block=True)
279
237 Out[8]: [15, -5, 15, -5]
280 In [10]: v0 = rc[0]
281
282 # multiply b*2 inplace
283 In [12]: v0.apply_sync_bound(lambda ns: ns.b*=2)
284
285 # b is still available in globals during unbound execution
286 In [13]: v0.apply_sync(lambda a: a*b, 3)
287 Out[13]: 30
288
238
289 `bound=True` specifies that the engine's namespace is to be passed as the first argument when
290 the function is called, and the default `bound=False` specifies that the normal behavior, but
291 the engine's namespace will be available as the globals() when the function is called.
292
239
293 Non-blocking execution
240 Non-blocking execution
294 ----------------------
241 ----------------------
@@ -351,22 +298,24 b' local Python/IPython session:'
351 .. Note::
298 .. Note::
352
299
353 Note the import inside the function. This is a common model, to ensure
300 Note the import inside the function. This is a common model, to ensure
354 that the appropriate modules are imported where the task is run.
301 that the appropriate modules are imported where the task is run. You can
302 also manually import modules into the engine(s) namespace(s) via
303 :meth:`view.execute('import numpy')`.
355
304
356 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
305 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
357 are done. For this, there is a the method :meth:`barrier`. This method takes a
306 are done. For this, there is a the method :meth:`wait`. This method takes a
358 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
307 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
359 and blocks until all of the associated results are ready:
308 and blocks until all of the associated results are ready:
360
309
361 .. sourcecode:: ipython
310 .. sourcecode:: ipython
362
311
363 In [72]: rc.block=False
312 In [72]: dview.block=False
364
313
365 # A trivial list of AsyncResults objects
314 # A trivial list of AsyncResults objects
366 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
315 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
367
316
368 # Wait until all of them are done
317 # Wait until all of them are done
369 In [74]: rc.barrier(pr_list)
318 In [74]: dview.wait(pr_list)
370
319
371 # Then, their results are ready using get() or the `.r` attribute
320 # Then, their results are ready using get() or the `.r` attribute
372 In [75]: pr_list[0].get()
321 In [75]: pr_list[0].get()
@@ -374,12 +323,12 b' and blocks until all of the associated results are ready:'
374
323
375
324
376
325
377 The ``block`` keyword argument and attributes
326 The ``block`` attribute
378 ---------------------------------------------
327 -----------------------
379
328
380 Most client methods(like :meth:`apply`) accept
329 Many View methods(excluding :meth:`apply`) accept
381 ``block`` as a keyword argument. As we have seen above, these
330 ``block`` as a keyword argument. As we have seen above, these
382 keyword arguments control the blocking mode. The :class:`Client` class also has
331 keyword arguments control the blocking mode. The :class:`View` class also has
383 a :attr:`block` attribute that controls the default behavior when the keyword
332 a :attr:`block` attribute that controls the default behavior when the keyword
384 argument is not provided. Thus the following logic is used for :attr:`block`:
333 argument is not provided. Thus the following logic is used for :attr:`block`:
385
334
@@ -387,37 +336,33 b' argument is not provided. Thus the following logic is used for :attr:`block`:'
387 * Keyword argument, if provided override the instance attributes for
336 * Keyword argument, if provided override the instance attributes for
388 the duration of a single call.
337 the duration of a single call.
389
338
390 DirectView objects also have a ``bound`` attribute, which is used in the same way.
391
392 The following examples demonstrate how to use the instance attributes:
339 The following examples demonstrate how to use the instance attributes:
393
340
394 .. sourcecode:: ipython
341 .. sourcecode:: ipython
395
342
396 In [17]: rc.block = False
343 In [17]: dview.block = False
397
344
398 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
345 In [18]: ar = dview.apply(lambda : 10)
399
346
400 In [19]: ar.get()
347 In [19]: ar.get()
401 Out[19]: [10,10]
348 Out[19]: [10, 10, 10, 10]
402
349
403 In [21]: rc.block = True
350 In [21]: dview.block = True
404
351
405 # Note targets='all' means all engines
352 # Note targets='all' means all engines
406 In [22]: rc.apply(lambda : 42, targets='all')
353 In [22]: dview.apply(lambda : 42)
407 Out[22]: [42, 42, 42, 42]
354 Out[22]: [42, 42, 42, 42]
408
355
409 The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the
356 The :attr:`block` and :attr:`targets` instance attributes of the
410 :class:`.DirectView` also determine the behavior of the parallel magic commands.
357 :class:`.DirectView` also determine the behavior of the parallel magic commands.
411
358
412
413 Parallel magic commands
359 Parallel magic commands
414 -----------------------
360 -----------------------
415
361
416 .. warning::
362 .. warning::
417
363
418 The magics have not been changed to work with the zeromq system. ``%px``
364 The magics have not been changed to work with the zeromq system. The
419 and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do
365 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
420 not* print stdin/out.
421
366
422 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
367 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
423 that make it more pleasant to execute Python commands on the engines
368 that make it more pleasant to execute Python commands on the engines
@@ -428,6 +373,9 b' Python command on the engines specified by the :attr:`targets` attribute of the'
428
373
429 .. sourcecode:: ipython
374 .. sourcecode:: ipython
430
375
376 # load the parallel magic extension:
377 In [21]: %load_ext parallelmagic
378
431 # Create a DirectView for all targets
379 # Create a DirectView for all targets
432 In [22]: dv = rc[:]
380 In [22]: dv = rc[:]
433
381
@@ -512,7 +460,7 b' Moving Python objects around'
512 In addition to calling functions and executing code on engines, you can
460 In addition to calling functions and executing code on engines, you can
513 transfer Python objects to and from your IPython session and the engines. In
461 transfer Python objects to and from your IPython session and the engines. In
514 IPython, these operations are called :meth:`push` (sending an object to the
462 IPython, these operations are called :meth:`push` (sending an object to the
515 engines) and :meth:`pull` (getting an object from the engines).
463 engines) and :meth:`pull` (getting an object from the engines).
516
464
517 Basic push and pull
465 Basic push and pull
518 -------------------
466 -------------------
@@ -521,23 +469,19 b' Here are some examples of how you use :meth:`push` and :meth:`pull`:'
521
469
522 .. sourcecode:: ipython
470 .. sourcecode:: ipython
523
471
524 In [38]: rc.push(dict(a=1.03234,b=3453))
472 In [38]: dview.push(dict(a=1.03234,b=3453))
525 Out[38]: [None,None,None,None]
473 Out[38]: [None,None,None,None]
526
474
527 In [39]: rc.pull('a')
475 In [39]: dview.pull('a')
528 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
476 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
529
477
530 In [40]: rc.pull('b',targets=0)
478 In [40]: rc[0].pull('b')
531 Out[40]: 3453
479 Out[40]: 3453
532
480
533 In [41]: rc.pull(('a','b'))
481 In [41]: dview.pull(('a','b'))
534 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
482 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
535
483
536 # zmq client does not have zip_pull
484 In [43]: dview.push(dict(c='speed'))
537 In [42]: rc.zip_pull(('a','b'))
538 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
539
540 In [43]: rc.push(dict(c='speed'))
541 Out[43]: [None,None,None,None]
485 Out[43]: [None,None,None,None]
542
486
543 In non-blocking mode :meth:`push` and :meth:`pull` also return
487 In non-blocking mode :meth:`push` and :meth:`pull` also return
@@ -545,9 +489,7 b' In non-blocking mode :meth:`push` and :meth:`pull` also return'
545
489
546 .. sourcecode:: ipython
490 .. sourcecode:: ipython
547
491
548 In [47]: rc.block=False
492 In [48]: ar = dview.pull('a', block=False)
549
550 In [48]: ar = rc.pull('a')
551
493
552 In [49]: ar.get()
494 In [49]: ar.get()
553 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
495 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
@@ -563,8 +505,6 b' appear as a local dictionary. Underneath, these methods call :meth:`apply`:'
563
505
564 .. sourcecode:: ipython
506 .. sourcecode:: ipython
565
507
566 In [50]: dview.block=True
567
568 In [51]: dview['a']=['foo','bar']
508 In [51]: dview['a']=['foo','bar']
569
509
570 In [52]: dview['a']
510 In [52]: dview['a']
@@ -606,7 +546,7 b' basic effect using :meth:`scatter` and :meth:`gather`:'
606
546
607 In [66]: dview.scatter('x',range(64))
547 In [66]: dview.scatter('x',range(64))
608
548
609 In [67]: px y = [i**10 for i in x]
549 In [67]: %px y = [i**10 for i in x]
610 Parallel execution on engines: [0, 1, 2, 3]
550 Parallel execution on engines: [0, 1, 2, 3]
611 Out[67]:
551 Out[67]:
612
552
@@ -633,31 +573,47 b' more other types of exceptions. Here is how it works:'
633 In [77]: dview.execute('1/0')
573 In [77]: dview.execute('1/0')
634 ---------------------------------------------------------------------------
574 ---------------------------------------------------------------------------
635 CompositeError Traceback (most recent call last)
575 CompositeError Traceback (most recent call last)
636 /Users/minrk/<ipython-input-10-5d56b303a66c> in <module>()
576 /home/you/<ipython-input-10-15c2c22dec39> in <module>()
637 ----> 1 dview.execute('1/0')
577 ----> 1 dview.execute('1/0', block=True)
638
578
639 ...
579 /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block)
640
580 460 default: self.block
641 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
581 461 """
642 1012 raise ValueError(msg)
582 --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block)
643 1013 else:
583 463
644 -> 1014 return self._apply_direct(f, args, kwargs, **options)
584 464 def run(self, filename, block=None):
645 1015
585
646 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
586 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
647
587
648 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets)
588 /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs)
649 1100 if block:
589 46 def sync_results(f, self, *args, **kwargs):
650 1101 try:
590 47 """sync relevant results from self.client to our results attribute."""
651 -> 1102 return ar.get()
591 ---> 48 ret = f(self, *args, **kwargs)
652 1103 except KeyboardInterrupt:
592 49 delta = self.outstanding.difference(self.client.outstanding)
653 1104 return ar
593 50 completed = self.outstanding.intersection(delta)
594
595 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
596
597 /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs)
598 35 n_previous = len(self.client.history)
599 36 try:
600 ---> 37 ret = f(self, *args, **kwargs)
601 38 finally:
602 39 nmsgs = len(self.client.history) - n_previous
603
604 /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track)
605 398 if block:
606 399 try:
607 --> 400 return ar.get()
608 401 except KeyboardInterrupt:
609 402 pass
654
610
655 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
611 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
656 78 return self._result
612 87 return self._result
657 79 else:
613 88 else:
658 ---> 80 raise self._exception
614 ---> 89 raise self._exception
659 81 else:
615 90 else:
660 82 raise error.TimeoutError("Result not ready.")
616 91 raise error.TimeoutError("Result not ready.")
661
617
662 CompositeError: one or more exceptions from call to method: _execute
618 CompositeError: one or more exceptions from call to method: _execute
663 [0:apply]: ZeroDivisionError: integer division or modulo by zero
619 [0:apply]: ZeroDivisionError: integer division or modulo by zero
@@ -665,6 +621,7 b' more other types of exceptions. Here is how it works:'
665 [2:apply]: ZeroDivisionError: integer division or modulo by zero
621 [2:apply]: ZeroDivisionError: integer division or modulo by zero
666 [3:apply]: ZeroDivisionError: integer division or modulo by zero
622 [3:apply]: ZeroDivisionError: integer division or modulo by zero
667
623
624
668 Notice how the error message printed when :exc:`CompositeError` is raised has
625 Notice how the error message printed when :exc:`CompositeError` is raised has
669 information about the individual exceptions that were raised on each engine.
626 information about the individual exceptions that were raised on each engine.
670 If you want, you can even raise one of these original exceptions:
627 If you want, you can even raise one of these original exceptions:
@@ -672,7 +629,7 b' If you want, you can even raise one of these original exceptions:'
672 .. sourcecode:: ipython
629 .. sourcecode:: ipython
673
630
674 In [80]: try:
631 In [80]: try:
675 ....: rc.execute('1/0')
632 ....: dview.execute('1/0')
676 ....: except client.CompositeError, e:
633 ....: except client.CompositeError, e:
677 ....: e.raise_exception()
634 ....: e.raise_exception()
678 ....:
635 ....:
@@ -697,57 +654,50 b' instance:'
697
654
698 .. sourcecode:: ipython
655 .. sourcecode:: ipython
699
656
700 In [81]: rc.execute('1/0')
657 In [81]: dview.execute('1/0')
701 ---------------------------------------------------------------------------
658 ---------------------------------------------------------------------------
702 CompositeError Traceback (most recent call last)
659 CompositeError Traceback (most recent call last)
703 /Users/minrk/<ipython-input-5-b0c7a2b62c52> in <module>()
660 /home/you/<ipython-input-10-15c2c22dec39> in <module>()
704 ----> 1 rc.execute('1/0')
661 ----> 1 dview.execute('1/0', block=True)
705
662
706 /Users/minrk/<string> in execute(self, code, targets, block)
663 /path/to/site-packages/IPython/zmq/parallel/view.py in execute(self, code, block)
707
664 460 default: self.block
708 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs)
665 461 """
709 88 self.block = block
666 --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block)
710 89 try:
667 463
711 ---> 90 ret = f(self, *args, **kwargs)
668 464 def run(self, filename, block=None):
712 91 finally:
669
713 92 self.block = saveblock
670 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
714
671
715 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in execute(self, code, targets, block)
672 /path/to/site-packages/IPython/zmq/parallel/view.py in sync_results(f, self, *args, **kwargs)
716 855 default: self.block
673 46 def sync_results(f, self, *args, **kwargs):
717 856 """
674 47 """sync relevant results from self.client to our results attribute."""
718 --> 857 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
675 ---> 48 ret = f(self, *args, **kwargs)
719 858 if not block:
676 49 delta = self.outstanding.difference(self.client.outstanding)
720 859 return result
677 50 completed = self.outstanding.intersection(delta)
721
678
722 /Users/minrk/<string> in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
679 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
723
680
724 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in defaultblock(f, self, *args, **kwargs)
681 /path/to/site-packages/IPython/zmq/parallel/view.py in save_ids(f, self, *args, **kwargs)
725 88 self.block = block
682 35 n_previous = len(self.client.history)
726 89 try:
683 36 try:
727 ---> 90 ret = f(self, *args, **kwargs)
684 ---> 37 ret = f(self, *args, **kwargs)
728 91 finally:
685 38 finally:
729 92 self.block = saveblock
686 39 nmsgs = len(self.client.history) - n_previous
730
687
731 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in apply(self, f, args, kwargs, bound, block, targets, balanced, after, follow, timeout)
688 /path/to/site-packages/IPython/zmq/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track)
732 1012 raise ValueError(msg)
689 398 if block:
733 1013 else:
690 399 try:
734 -> 1014 return self._apply_direct(f, args, kwargs, **options)
691 --> 400 return ar.get()
735 1015
692 401 except KeyboardInterrupt:
736 1016 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
693 402 pass
737
738 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/client.pyc in _apply_direct(self, f, args, kwargs, bound, block, targets)
739 1100 if block:
740 1101 try:
741 -> 1102 return ar.get()
742 1103 except KeyboardInterrupt:
743 1104 return ar
744
694
745 /Users/minrk/dev/ip/mine/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
695 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
746 78 return self._result
696 87 return self._result
747 79 else:
697 88 else:
748 ---> 80 raise self._exception
698 ---> 89 raise self._exception
749 81 else:
699 90 else:
750 82 raise error.TimeoutError("Result not ready.")
700 91 raise error.TimeoutError("Result not ready.")
751
701
752 CompositeError: one or more exceptions from call to method: _execute
702 CompositeError: one or more exceptions from call to method: _execute
753 [0:apply]: ZeroDivisionError: integer division or modulo by zero
703 [0:apply]: ZeroDivisionError: integer division or modulo by zero
@@ -815,14 +765,18 b' instance:'
815 ZeroDivisionError: integer division or modulo by zero
765 ZeroDivisionError: integer division or modulo by zero
816
766
817
767
768 .. note::
769
770 TODO: The above tracebacks are not up to date
771
818
772
819 All of this same error handling magic even works in non-blocking mode:
773 All of this same error handling magic even works in non-blocking mode:
820
774
821 .. sourcecode:: ipython
775 .. sourcecode:: ipython
822
776
823 In [83]: rc.block=False
777 In [83]: dview.block=False
824
778
825 In [84]: ar = rc.execute('1/0')
779 In [84]: ar = dview.execute('1/0')
826
780
827 In [85]: ar.get()
781 In [85]: ar.get()
828 ---------------------------------------------------------------------------
782 ---------------------------------------------------------------------------
@@ -1,4 +1,4 b''
1 .. _paralleltask:
1 .. _parallel_task:
2
2
3 ==========================
3 ==========================
4 The IPython task interface
4 The IPython task interface
@@ -54,11 +54,12 b' argument to the constructor:'
54 # or to connect with a specific profile you have set up:
54 # or to connect with a specific profile you have set up:
55 In [3]: rc = client.Client(profile='mpi')
55 In [3]: rc = client.Client(profile='mpi')
56
56
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can be constructed via the client's :meth:`view` method:
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 be constructed via the client's :meth:`load_balanced_view` method:
58
59
59 .. sourcecode:: ipython
60 .. sourcecode:: ipython
60
61
61 In [4]: lview = rc.view() # default load-balanced view
62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
62
63
63 .. seealso::
64 .. seealso::
64
65
@@ -110,6 +111,8 b' that turns any Python function into a parallel function:'
110 In [11]: f.map(range(32)) # this is done in parallel
111 In [11]: f.map(range(32)) # this is done in parallel
111 Out[11]: [0.0,10.0,160.0,...]
112 Out[11]: [0.0,10.0,160.0,...]
112
113
114 .. _parallel_dependencies:
115
113 Dependencies
116 Dependencies
114 ============
117 ============
115
118
@@ -230,12 +233,18 b' any|all'
230 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
233 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
231 boolean attribute, which defaults to ``True``.
234 boolean attribute, which defaults to ``True``.
232
235
233 success_only
236 success [default: True]
234 Whether to consider only tasks that did not raise an error as being fulfilled.
237 Whether to consider tasks that succeeded as fulfilling dependencies.
235 Sometimes you want to run a task after another, but only if that task succeeded. In
238
236 this case, ``success_only`` should be ``True``. However sometimes you may not care
239 failure [default : False]
237 whether the task succeeds, and always want the second task to run, in which case
240 Whether to consider tasks that failed as fulfilling dependencies.
238 you should use `success_only=False`. The default behavior is to only use successes.
241 using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
242 only when tasks have failed.
243
244 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
245 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
246 not care whether the task succeeds, and always want the second task to run, in which case you
247 should use `success=failure=True`. The default behavior is to only use successes.
239
248
240 There are other switches for interpretation that are made at the *task* level. These are
249 There are other switches for interpretation that are made at the *task* level. These are
241 specified via keyword arguments to the client's :meth:`apply` method.
250 specified via keyword arguments to the client's :meth:`apply` method.
@@ -258,7 +267,7 b' timeout'
258 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
267 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
259 task to run after a job submitted via the MUX interface.
268 task to run after a job submitted via the MUX interface.
260
269
261 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
270 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
262 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
271 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
263 `follow` and `after` keywords to :meth:`client.apply`:
272 `follow` and `after` keywords to :meth:`client.apply`:
264
273
@@ -266,13 +275,13 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
266
275
267 In [14]: client.block=False
276 In [14]: client.block=False
268
277
269 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
278 In [15]: ar = lview.apply(f, args, kwargs)
270
279
271 In [16]: ar2 = client.apply(f2, balanced=True)
280 In [16]: ar2 = lview.apply(f2)
272
281
273 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
282 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
274
283
275 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
284 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
276
285
277
286
278 .. seealso::
287 .. seealso::
@@ -297,8 +306,8 b' The basic cases that are checked:'
297
306
298 * depending on nonexistent messages
307 * depending on nonexistent messages
299 * `follow` dependencies were run on more than one machine and `all=True`
308 * `follow` dependencies were run on more than one machine and `all=True`
300 * any dependencies failed and `all=True,success_only=True`
309 * any dependencies failed and `all=True,success=True,failures=False`
301 * all dependencies failed and `all=False,success_only=True`
310 * all dependencies failed and `all=False,success=True,failure=False`
302
311
303 .. warning::
312 .. warning::
304
313
@@ -386,27 +395,25 b' Disabled features when using the ZMQ Scheduler:'
386 More details
395 More details
387 ============
396 ============
388
397
389 The :class:`Client` has many more powerful features that allow quite a bit
398 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
390 of flexibility in how tasks are defined and run. The next places to look are
399 of flexibility in how tasks are defined and run. The next places to look are
391 in the following classes:
400 in the following classes:
392
401
393 * :class:`IPython.zmq.parallel.client.Client`
402 * :class:`IPython.zmq.parallel.view.LoadBalancedView`
394 * :class:`IPython.zmq.parallel.client.AsyncResult`
403 * :class:`IPython.zmq.parallel.client.AsyncResult`
395 * :meth:`IPython.zmq.parallel.client.Client.apply`
404 * :meth:`IPython.zmq.parallel.view.LoadBalancedView.apply`
396 * :mod:`IPython.zmq.parallel.dependency`
405 * :mod:`IPython.zmq.parallel.dependency`
397
406
398 The following is an overview of how to use these classes together:
407 The following is an overview of how to use these classes together:
399
408
400 1. Create a :class:`Client`.
409 1. Create a :class:`Client` and :class:`LoadBalancedView`
401 2. Define some functions to be run as tasks
410 2. Define some functions to be run as tasks
402 3. Submit your tasks to using the :meth:`apply` method of your
411 3. Submit your tasks to using the :meth:`apply` method of your
403 :class:`Client` instance, specifying `balanced=True`. This signals
412 :class:`LoadBalancedView` instance.
404 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
413 4. Use :meth:`Client.get_result` to get the results of the
405 4. Use :meth:`Client.get_results` to get the results of the
406 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
414 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
407 for and then receive the results.
415 for and then receive the results.
408
416
409
410 .. seealso::
417 .. seealso::
411
418
412 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
419 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now