##// END OF EJS Templates
reflect revised apply_bound pattern
MinRK -
Show More
@@ -102,7 +102,7 b' class SerializeIt(object):'
102 self.typeDescriptor = 'ndarray'
102 self.typeDescriptor = 'ndarray'
103 self.metadata = {'shape':self.obj.shape,
103 self.metadata = {'shape':self.obj.shape,
104 'dtype':self.obj.dtype.str}
104 'dtype':self.obj.dtype.str}
105 elif isinstance(self.obj, bytes):
105 elif isinstance(self.obj, str):
106 self.typeDescriptor = 'bytes'
106 self.typeDescriptor = 'bytes'
107 self.metadata = {}
107 self.metadata = {}
108 elif isinstance(self.obj, buffer):
108 elif isinstance(self.obj, buffer):
@@ -148,7 +148,7 b' class UnSerializeIt(UnSerialized):'
148 typeDescriptor = self.serialized.getTypeDescriptor()
148 typeDescriptor = self.serialized.getTypeDescriptor()
149 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
149 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
150 buf = self.serialized.getData()
150 buf = self.serialized.getData()
151 if isinstance(buf, (buffer,bytes)):
151 if isinstance(buf, (str, buffer)):
152 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
152 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
153 else:
153 else:
154 # memoryview
154 # memoryview
@@ -66,6 +66,7 b' class CannedFunction(CannedObject):'
66 def __init__(self, f):
66 def __init__(self, f):
67 self._checkType(f)
67 self._checkType(f)
68 self.code = f.func_code
68 self.code = f.func_code
69 self.defaults = f.func_defaults
69 self.__name__ = f.__name__
70 self.__name__ = f.__name__
70
71
71 def _checkType(self, obj):
72 def _checkType(self, obj):
@@ -74,7 +75,7 b' class CannedFunction(CannedObject):'
74 def getObject(self, g=None):
75 def getObject(self, g=None):
75 if g is None:
76 if g is None:
76 g = globals()
77 g = globals()
77 newFunc = FunctionType(self.code, g)
78 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
78 return newFunc
79 return newFunc
79
80
80 #-------------------------------------------------------------------------------
81 #-------------------------------------------------------------------------------
@@ -45,30 +45,29 b' from .view import DirectView, LoadBalancedView'
45 # helpers for implementing old MEC API via client.apply
45 # helpers for implementing old MEC API via client.apply
46 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
47
47
48 def _push(ns):
48 def _push(user_ns, **ns):
49 """helper method for implementing `client.push` via `client.apply`"""
49 """helper method for implementing `client.push` via `client.apply`"""
50 globals().update(ns)
50 user_ns.update(ns)
51
51
52 def _pull(keys):
52 def _pull(user_ns, keys):
53 """helper method for implementing `client.pull` via `client.apply`"""
53 """helper method for implementing `client.pull` via `client.apply`"""
54 g = globals()
55 if isinstance(keys, (list,tuple, set)):
54 if isinstance(keys, (list,tuple, set)):
56 for key in keys:
55 for key in keys:
57 if not g.has_key(key):
56 if not user_ns.has_key(key):
58 raise NameError("name '%s' is not defined"%key)
57 raise NameError("name '%s' is not defined"%key)
59 return map(g.get, keys)
58 return map(user_ns.get, keys)
60 else:
59 else:
61 if not g.has_key(keys):
60 if not user_ns.has_key(keys):
62 raise NameError("name '%s' is not defined"%keys)
61 raise NameError("name '%s' is not defined"%keys)
63 return g.get(keys)
62 return user_ns.get(keys)
64
63
65 def _clear():
64 def _clear(user_ns):
66 """helper method for implementing `client.clear` via `client.apply`"""
65 """helper method for implementing `client.clear` via `client.apply`"""
67 globals().clear()
66 user_ns.clear()
68
67
69 def _execute(code):
68 def _execute(user_ns, code):
70 """helper method for implementing `client.execute` via `client.apply`"""
69 """helper method for implementing `client.execute` via `client.apply`"""
71 exec code in globals()
70 exec code in user_ns
72
71
73
72
74 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
@@ -946,7 +945,7 b' class Client(HasTraits):'
946 return list(Dependency(dep))
945 return list(Dependency(dep))
947
946
948 @defaultblock
947 @defaultblock
949 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
948 def apply(self, f, args=None, kwargs=None, bound=False, block=None,
950 targets=None, balanced=None,
949 targets=None, balanced=None,
951 after=None, follow=None, timeout=None,
950 after=None, follow=None, timeout=None,
952 track=False):
951 track=False):
@@ -963,9 +962,8 b' class Client(HasTraits):'
963 The positional arguments passed to `f`
962 The positional arguments passed to `f`
964 kwargs : dict
963 kwargs : dict
965 The keyword arguments passed to `f`
964 The keyword arguments passed to `f`
966 bound : bool (default: True)
965 bound : bool (default: False)
967 Whether to execute in the Engine(s) namespace, or in a clean
966 Whether to pass the Engine(s) Namespace as the first argument to `f`.
968 namespace not affecting the engine.
969 block : bool (default: self.block)
967 block : bool (default: self.block)
970 Whether to wait for the result, or return immediately.
968 Whether to wait for the result, or return immediately.
971 False:
969 False:
@@ -1171,12 +1169,12 b' class Client(HasTraits):'
1171 #--------------------------------------------------------------------------
1169 #--------------------------------------------------------------------------
1172
1170
1173 @defaultblock
1171 @defaultblock
1174 def remote(self, bound=True, block=None, targets=None, balanced=None):
1172 def remote(self, bound=False, block=None, targets=None, balanced=None):
1175 """Decorator for making a RemoteFunction"""
1173 """Decorator for making a RemoteFunction"""
1176 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1174 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1177
1175
1178 @defaultblock
1176 @defaultblock
1179 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1177 def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None):
1180 """Decorator for making a ParallelFunction"""
1178 """Decorator for making a ParallelFunction"""
1181 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1179 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1182
1180
@@ -1249,19 +1247,21 b' class Client(HasTraits):'
1249 """Push the contents of `ns` into the namespace on `target`"""
1247 """Push the contents of `ns` into the namespace on `target`"""
1250 if not isinstance(ns, dict):
1248 if not isinstance(ns, dict):
1251 raise TypeError("Must be a dict, not %s"%type(ns))
1249 raise TypeError("Must be a dict, not %s"%type(ns))
1252 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False, track=track)
1250 result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1253 if not block:
1251 if not block:
1254 return result
1252 return result
1255
1253
1256 @defaultblock
1254 @defaultblock
1257 def pull(self, keys, targets='all', block=None):
1255 def pull(self, keys, targets='all', block=None):
1258 """Pull objects from `target`'s namespace by `keys`"""
1256 """Pull objects from `target`'s namespace by `keys`"""
1259 if isinstance(keys, str):
1257 if isinstance(keys, basestring):
1260 pass
1258 pass
1261 elif isinstance(keys, (list,tuple,set)):
1259 elif isinstance(keys, (list,tuple,set)):
1262 for key in keys:
1260 for key in keys:
1263 if not isinstance(key, str):
1261 if not isinstance(key, basestring):
1264 raise TypeError
1262 raise TypeError("keys must be str, not type %r"%type(key))
1263 else:
1264 raise TypeError("keys must be strs, not %r"%keys)
1265 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1265 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1266 return result
1266 return result
1267
1267
@@ -22,7 +22,7 b' from .asyncresult import AsyncMapResult'
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 @testdec.skip_doctest
24 @testdec.skip_doctest
25 def remote(client, bound=True, block=None, targets=None, balanced=None):
25 def remote(client, bound=False, block=None, targets=None, balanced=None):
26 """Turn a function into a remote function.
26 """Turn a function into a remote function.
27
27
28 This method can be used for map:
28 This method can be used for map:
@@ -37,7 +37,7 b' def remote(client, bound=True, block=None, targets=None, balanced=None):'
37 return remote_function
37 return remote_function
38
38
39 @testdec.skip_doctest
39 @testdec.skip_doctest
40 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
40 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
41 """Turn a function into a parallel remote function.
41 """Turn a function into a parallel remote function.
42
42
43 This method can be used for map:
43 This method can be used for map:
@@ -34,7 +34,7 b' from .client import Client'
34 from .error import wrap_exception
34 from .error import wrap_exception
35 from .factory import SessionFactory
35 from .factory import SessionFactory
36 from .streamsession import StreamSession
36 from .streamsession import StreamSession
37 from .util import serialize_object, unpack_apply_message, ISO8601
37 from .util import serialize_object, unpack_apply_message, ISO8601, Namespace
38
38
39 def printer(*args):
39 def printer(*args):
40 pprint(args, stream=sys.__stdout__)
40 pprint(args, stream=sys.__stdout__)
@@ -305,35 +305,38 b' class Kernel(SessionFactory):'
305 sys.stdout.set_parent(parent)
305 sys.stdout.set_parent(parent)
306 sys.stderr.set_parent(parent)
306 sys.stderr.set_parent(parent)
307 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
307 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
308 if bound:
308 working = self.user_ns
309 working = self.user_ns
309 # suffix =
310 suffix = str(msg_id).replace("-","")
310 prefix = "_"+str(msg_id).replace("-","")+"_"
311 prefix = "_"
311 # if bound:
312
312 #
313 else:
313 # else:
314 working = dict()
314 # working = dict()
315 suffix = prefix = "_" # prevent keyword collisions with lambda
315 # suffix = prefix = "_" # prevent keyword collisions with lambda
316 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
316 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
317 if bound:
318 bound_ns = Namespace(working)
319 args = [bound_ns]+list(args)
317 # if f.fun
320 # if f.fun
318 fname = getattr(f, '__name__', 'f')
321 fname = getattr(f, '__name__', 'f')
319
322
320 fname = prefix+fname.strip('<>')+suffix
323 fname = prefix+"f"
321 argname = prefix+"args"+suffix
324 argname = prefix+"args"
322 kwargname = prefix+"kwargs"+suffix
325 kwargname = prefix+"kwargs"
323 resultname = prefix+"result"+suffix
326 resultname = prefix+"result"
324
327
325 ns = { fname : f, argname : args, kwargname : kwargs }
328 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
326 # print ns
329 # print ns
327 working.update(ns)
330 working.update(ns)
328 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
331 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
329 exec code in working, working
332 try:
330 result = working.get(resultname)
333 exec code in working,working
331 # clear the namespace
334 result = working.get(resultname)
332 if bound:
335 finally:
333 for key in ns.iterkeys():
336 for key in ns.iterkeys():
334 self.user_ns.pop(key)
337 working.pop(key)
335 else:
338 if bound:
336 del working
339 working.update(bound_ns)
337
340
338 packed_result,buf = serialize_object(result)
341 packed_result,buf = serialize_object(result)
339 result_buf = [packed_result]+buf
342 result_buf = [packed_result]+buf
@@ -4,8 +4,7 b' import tempfile'
4 import time
4 import time
5 from subprocess import Popen, PIPE, STDOUT
5 from subprocess import Popen, PIPE, STDOUT
6
6
7 from IPython.zmq.parallel.ipcluster import launch_process
7 from IPython.zmq.parallel import client
8 from IPython.zmq.parallel.entry_point import select_random_ports
9
8
10 processes = []
9 processes = []
11 blackhole = tempfile.TemporaryFile()
10 blackhole = tempfile.TemporaryFile()
@@ -17,7 +16,10 b' def setup():'
17 processes.append(cp)
16 processes.append(cp)
18 time.sleep(.5)
17 time.sleep(.5)
19 add_engine()
18 add_engine()
20 time.sleep(2)
19 c = client.Client(profile='iptest')
20 while not c.ids:
21 time.sleep(.1)
22 c.spin()
21
23
22 def add_engine(profile='iptest'):
24 def add_engine(profile='iptest'):
23 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT)
25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT)
@@ -42,5 +44,5 b' def teardown():'
42 print 'killing'
44 print 'killing'
43 p.kill()
45 p.kill()
44 except:
46 except:
45 print "couldn't shutdown process: ",p
47 print "couldn't shutdown process: ", p
46
48
@@ -91,8 +91,8 b' class ClusterTestCase(BaseZMQTestCase):'
91 def tearDown(self):
91 def tearDown(self):
92 self.client.close()
92 self.client.close()
93 BaseZMQTestCase.tearDown(self)
93 BaseZMQTestCase.tearDown(self)
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
95 # [ e.wait() for e in self.engines ]
95 # [ e.wait() for e in self.engines ]
96 # while len(self.client.ids) > self.base_engine_count:
96 # while len(self.client.ids) > self.base_engine_count:
97 # time.sleep(.1)
97 # time.sleep(.1)
98 # del self.engines
98 # del self.engines
@@ -165,6 +165,17 b' class TestClient(ClusterTestCase):'
165 v.execute('b=f()')
165 v.execute('b=f()')
166 self.assertEquals(v['b'], 5)
166 self.assertEquals(v['b'], 5)
167
167
168 def test_push_function_defaults(self):
169 """test that pushed functions preserve default args"""
170 def echo(a=10):
171 return a
172 self.add_engines(1)
173 v = self.client[-1]
174 v.block=True
175 v['f'] = echo
176 v.execute('b=f()')
177 self.assertEquals(v['b'], 10)
178
168 def test_get_result(self):
179 def test_get_result(self):
169 """test getting results from the Hub."""
180 """test getting results from the Hub."""
170 c = clientmod.Client(profile='iptest')
181 c = clientmod.Client(profile='iptest')
@@ -195,7 +206,7 b' class TestClient(ClusterTestCase):'
195 """)
206 """)
196 v = self.client[-1]
207 v = self.client[-1]
197 v.run(tmpfile, block=True)
208 v.run(tmpfile, block=True)
198 self.assertEquals(v.apply_sync_bound(lambda : g()), 5)
209 self.assertEquals(v.apply_sync(lambda : g()), 5)
199
210
200 def test_apply_tracked(self):
211 def test_apply_tracked(self):
201 """test tracking for apply"""
212 """test tracking for apply"""
@@ -245,8 +256,7 b' class TestClient(ClusterTestCase):'
245 v = self.client[-1]
256 v = self.client[-1]
246 v['a'] = 123
257 v['a'] = 123
247 ra = clientmod.Reference('a')
258 ra = clientmod.Reference('a')
248 b = v.apply_sync_bound(lambda x: x, ra)
259 b = v.apply_sync(lambda x: x, ra)
249 self.assertEquals(b, 123)
260 self.assertEquals(b, 123)
250 self.assertRaisesRemote(NameError, v.apply_sync, lambda x: x, ra)
251
261
252
262
@@ -15,6 +15,23 b' from IPython.utils.newserialized import serialize, unserialize'
15
15
16 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
16 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
17
17
18 class Namespace(dict):
19 """Subclass of dict for attribute access to keys."""
20
21 def __getattr__(self, key):
22 """getattr aliased to getitem"""
23 if key in self.iterkeys():
24 return self[key]
25 else:
26 raise NameError(key)
27
28 def __setattr__(self, key, value):
29 """setattr aliased to setitem, with strict"""
30 if hasattr(dict, key):
31 raise KeyError("Cannot override dict keys %r"%key)
32 self[key] = value
33
34
18 class ReverseDict(dict):
35 class ReverseDict(dict):
19 """simple double-keyed subset of dict methods."""
36 """simple double-keyed subset of dict methods."""
20
37
@@ -264,7 +281,18 b' def unpack_apply_message(bufs, g=None, copy=True):'
264 for k in sorted(skwargs.iterkeys()):
281 for k in sorted(skwargs.iterkeys()):
265 sa = skwargs[k]
282 sa = skwargs[k]
266 if sa.data is None:
283 if sa.data is None:
267 sa.data = bufs.pop(0)
284 m = bufs.pop(0)
285 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
286 if copy:
287 sa.data = buffer(m)
288 else:
289 sa.data = m.buffer
290 else:
291 if copy:
292 sa.data = m
293 else:
294 sa.data = m.bytes
295
268 kwargs[k] = uncan(unserialize(sa), g)
296 kwargs[k] = uncan(unserialize(sa), g)
269
297
270 return f,args,kwargs
298 return f,args,kwargs
@@ -74,6 +74,7 b' class View(HasTraits):'
74 """
74 """
75 block=Bool(False)
75 block=Bool(False)
76 bound=Bool(False)
76 bound=Bool(False)
77 track=Bool(False)
77 history=List()
78 history=List()
78 outstanding = Set()
79 outstanding = Set()
79 results = Dict()
80 results = Dict()
@@ -81,7 +82,7 b' class View(HasTraits):'
81
82
82 _ntargets = Int(1)
83 _ntargets = Int(1)
83 _balanced = Bool(False)
84 _balanced = Bool(False)
84 _default_names = List(['block', 'bound'])
85 _default_names = List(['block', 'bound', 'track'])
85 _targets = Any()
86 _targets = Any()
86
87
87 def __init__(self, client=None, targets=None):
88 def __init__(self, client=None, targets=None):
@@ -139,7 +140,12 b' class View(HasTraits):'
139 block : bool
140 block : bool
140 whether to wait for results
141 whether to wait for results
141 bound : bool
142 bound : bool
142 whether to use the client's namespace
143 whether to pass the client's Namespace as the first argument
144 to functions called via `apply`.
145 track : bool
146 whether to create a MessageTracker to allow the user to
147 safely edit after arrays and buffers during non-copying
148 sends.
143 """
149 """
144 for key in kwargs:
150 for key in kwargs:
145 if key not in self._default_names:
151 if key not in self._default_names:
@@ -161,10 +167,11 b' class View(HasTraits):'
161 def apply(self, f, *args, **kwargs):
167 def apply(self, f, *args, **kwargs):
162 """calls f(*args, **kwargs) on remote engines, returning the result.
168 """calls f(*args, **kwargs) on remote engines, returning the result.
163
169
164 This method does not involve the engine's namespace.
170 This method sets all of `client.apply`'s keyword arguments via this
171 View's attributes.
165
172
166 if self.block is False:
173 if self.block is False:
167 returns msg_id
174 returns AsyncResult
168 else:
175 else:
169 returns actual result of f(*args, **kwargs)
176 returns actual result of f(*args, **kwargs)
170 """
177 """
@@ -174,9 +181,7 b' class View(HasTraits):'
174 def apply_async(self, f, *args, **kwargs):
181 def apply_async(self, f, *args, **kwargs):
175 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
182 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
176
183
177 This method does not involve the engine's namespace.
184 returns AsyncResult
178
179 returns msg_id
180 """
185 """
181 d = self._defaults('block', 'bound')
186 d = self._defaults('block', 'bound')
182 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
187 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
@@ -187,11 +192,9 b' class View(HasTraits):'
187 """calls f(*args, **kwargs) on remote engines in a blocking manner,
192 """calls f(*args, **kwargs) on remote engines in a blocking manner,
188 returning the result.
193 returning the result.
189
194
190 This method does not involve the engine's namespace.
191
192 returns: actual result of f(*args, **kwargs)
195 returns: actual result of f(*args, **kwargs)
193 """
196 """
194 d = self._defaults('block', 'bound')
197 d = self._defaults('block', 'bound', 'track')
195 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
198 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
196
199
197 # @sync_results
200 # @sync_results
@@ -216,9 +219,9 b' class View(HasTraits):'
216 """calls f(*args, **kwargs) bound to engine namespace(s)
219 """calls f(*args, **kwargs) bound to engine namespace(s)
217 in a nonblocking manner.
220 in a nonblocking manner.
218
221
219 returns: msg_id
222 The first argument to `f` will be the Engine's Namespace
220
223
221 This method has access to the targets' namespace via globals()
224 returns: AsyncResult
222
225
223 """
226 """
224 d = self._defaults('block', 'bound')
227 d = self._defaults('block', 'bound')
@@ -229,9 +232,9 b' class View(HasTraits):'
229 def apply_sync_bound(self, f, *args, **kwargs):
232 def apply_sync_bound(self, f, *args, **kwargs):
230 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
233 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
231
234
232 returns: actual result of f(*args, **kwargs)
235 The first argument to `f` will be the Engine's Namespace
233
236
234 This method has access to the targets' namespace via globals()
237 returns: actual result of f(*args, **kwargs)
235
238
236 """
239 """
237 d = self._defaults('block', 'bound')
240 d = self._defaults('block', 'bound')
@@ -323,11 +326,11 b' class View(HasTraits):'
323 # Decorators
326 # Decorators
324 #-------------------------------------------------------------------
327 #-------------------------------------------------------------------
325
328
326 def remote(self, bound=True, block=True):
329 def remote(self, bound=False, block=True):
327 """Decorator for making a RemoteFunction"""
330 """Decorator for making a RemoteFunction"""
328 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
331 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
329
332
330 def parallel(self, dist='b', bound=True, block=None):
333 def parallel(self, dist='b', bound=False, block=None):
331 """Decorator for making a ParallelFunction"""
334 """Decorator for making a ParallelFunction"""
332 block = self.block if block is None else block
335 block = self.block if block is None else block
333 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
336 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
@@ -378,7 +381,7 b' class DirectView(View):'
378 block : bool
381 block : bool
379 whether to wait for the result or not [default self.block]
382 whether to wait for the result or not [default self.block]
380 bound : bool
383 bound : bool
381 whether to have access to the engines' namespaces [default self.bound]
384 whether to pass the client's Namespace as the first argument to `f`
382
385
383 Returns
386 Returns
384 -------
387 -------
@@ -572,7 +575,12 b' class LoadBalancedView(View):'
572 block : bool
575 block : bool
573 whether to wait for results
576 whether to wait for results
574 bound : bool
577 bound : bool
575 whether to use the engine's namespace
578 whether to pass the client's Namespace as the first argument
579 to functions called via `apply`.
580 track : bool
581 whether to create a MessageTracker to allow the user to
582 safely edit after arrays and buffers during non-copying
583 sends.
576 follow : Dependency, list, msg_id, AsyncResult
584 follow : Dependency, list, msg_id, AsyncResult
577 the location dependencies of tasks
585 the location dependencies of tasks
578 after : Dependency, list, msg_id, AsyncResult
586 after : Dependency, list, msg_id, AsyncResult
@@ -621,7 +629,11 b' class LoadBalancedView(View):'
621 block : bool
629 block : bool
622 whether to wait for the result or not [default self.block]
630 whether to wait for the result or not [default self.block]
623 bound : bool
631 bound : bool
624 whether to use the engine's namespace [default self.bound]
632 whether to pass the client's Namespace as the first argument to `f`
633 track : bool
634 whether to create a MessageTracker to allow the user to
635 safely edit after arrays and buffers during non-copying
636 sends.
625 chunk_size : int
637 chunk_size : int
626 how many elements should be in each task [default 1]
638 how many elements should be in each task [default 1]
627
639
@@ -10,12 +10,12 b" view.run('communicator.py')"
10 view.execute('com = EngineCommunicator()')
10 view.execute('com = EngineCommunicator()')
11
11
12 # gather the connection information into a dict
12 # gather the connection information into a dict
13 ar = view.apply_async_bound(lambda : com.info)
13 ar = view.apply_async(lambda : com.info)
14 peers = ar.get_dict()
14 peers = ar.get_dict()
15 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
15 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
16
16
17 # connect the engines to each other:
17 # connect the engines to each other:
18 view.apply_sync_bound(lambda pdict: com.connect(pdict), peers)
18 view.apply_sync(lambda pdict: com.connect(pdict), peers)
19
19
20 # now all the engines are connected, and we can communicate between them:
20 # now all the engines are connected, and we can communicate between them:
21
21
@@ -34,7 +34,7 b' def send(client, sender, targets, msg_name, dest_name=None, block=None):'
34 msg = globals()[m_name]
34 msg = globals()[m_name]
35 return com.send(targets, msg)
35 return com.send(targets, msg)
36
36
37 client[sender].apply_async_bound(_send, targets, msg_name)
37 client[sender].apply_async(_send, targets, msg_name)
38
38
39 return client[targets].execute('%s=com.recv()'%dest_name, block=None)
39 return client[targets].execute('%s=com.recv()'%dest_name, block=None)
40
40
@@ -42,7 +42,7 b' print "done"'
42
42
43 # Run 10m digits on 1 engine
43 # Run 10m digits on 1 engine
44 t1 = clock()
44 t1 = clock()
45 freqs10m = c[id0].apply_sync_bound(compute_two_digit_freqs, files[0])
45 freqs10m = c[id0].apply_sync(compute_two_digit_freqs, files[0])
46 t2 = clock()
46 t2 = clock()
47 digits_per_second1 = 10.0e6/(t2-t1)
47 digits_per_second1 = 10.0e6/(t2-t1)
48 print "Digits per second (1 core, 10m digits): ", digits_per_second1
48 print "Digits per second (1 core, 10m digits): ", digits_per_second1
@@ -68,7 +68,7 b' constructed via list-access to the client:'
68
68
69 .. seealso::
69 .. seealso::
70
70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72
72
73
73
74 Quick and easy parallelism
74 Quick and easy parallelism
@@ -232,8 +232,8 b' blocks until the engines are done executing the command:'
232
232
233 In [5]: dview['b'] = 10
233 In [5]: dview['b'] = 10
234
234
235 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
235 In [6]: dview.apply_sync(lambda x: a+b+x, 27)
236 Out[6]: [42, 42, 42, 42]%exit
236 Out[6]: [42, 42, 42, 42]
237
237
238 Python commands can be executed on specific engines by calling execute using the ``targets``
238 Python commands can be executed on specific engines by calling execute using the ``targets``
239 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
239 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
@@ -265,7 +265,13 b' Bound and unbound execution'
265
265
266 The previous example also shows one of the most important things about the IPython
266 The previous example also shows one of the most important things about the IPython
267 engines: they have a persistent user namespaces. The :meth:`apply` method can
267 engines: they have a persistent user namespaces. The :meth:`apply` method can
268 be run in either a bound or unbound manner:
268 be run in either a bound or unbound manner.
269
270 When applying a function in a `bound` manner, the first argument to that function
271 will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary
272 also providing attribute-access to keys.
273
274 In all (unbound and bound) execution
269
275
270 .. sourcecode:: ipython
276 .. sourcecode:: ipython
271
277
@@ -273,31 +279,16 b' be run in either a bound or unbound manner:'
273
279
274 In [10]: v0 = rc[0]
280 In [10]: v0 = rc[0]
275
281
276 In [12]: v0.apply_sync_bound(lambda : b)
282 # multiply b*2 inplace
277 Out[12]: 5
283 In [12]: v0.apply_sync_bound(lambda ns: ns.b*=2)
278
284
279 In [13]: v0.apply_sync(lambda : b)
285 # b is still available in globals during unbound execution
280 ---------------------------------------------------------------------------
286 In [13]: v0.apply_sync(lambda a: a*b, 3)
281 RemoteError Traceback (most recent call last)
287 Out[13]: 30
282 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
283 ----> 1 v0.apply(lambda : b)
284 ...
285 RemoteError: NameError(global name 'b' is not defined)
286 Traceback (most recent call last):
287 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
288 exec code in working, working
289 File "<string>", line 1, in <module>
290 File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
291 NameError: global name 'b' is not defined
292
293
294 Specifically, `bound=True` specifies that the engine's namespace is to be used
295 as the `globals` when the function is called, and `bound=False` specifies that
296 the engine's namespace is not to be used (hence, 'b' is undefined during unbound
297 execution, since the function is called in an empty namespace). Unbound execution is
298 often useful for large numbers of atomic tasks, which prevents bloating the engine's
299 memory, while bound execution lets you build on your previous work.
300
288
289 `bound=True` specifies that the engine's namespace is to be passed as the first argument when
290 the function is called, and the default `bound=False` specifies that the normal behavior, but
291 the engine's namespace will be available as the globals() when the function is called.
301
292
302 Non-blocking execution
293 Non-blocking execution
303 ----------------------
294 ----------------------
@@ -469,7 +460,7 b' specifying the index of the result to be requested. It is simply a shortcut to t'
469
460
470 .. sourcecode:: ipython
461 .. sourcecode:: ipython
471
462
472 In [29]: dv.apply_async_bound(lambda : ev)
463 In [29]: dv.apply_async(lambda : ev)
473
464
474 In [30]: %result
465 In [30]: %result
475 Out[30]: [ [ 1.28167017 0.14197338],
466 Out[30]: [ [ 1.28167017 0.14197338],
@@ -298,6 +298,8 b' The basic cases that are checked:'
298 This analysis has not been proven to be rigorous, so it is likely possible for tasks
298 This analysis has not been proven to be rigorous, so it is likely possible for tasks
299 to become impossible to run in obscure situations, so a timeout may be a good choice.
299 to become impossible to run in obscure situations, so a timeout may be a good choice.
300
300
301 .. _parallel_schedulers:
302
301 Schedulers
303 Schedulers
302 ==========
304 ==========
303
305
@@ -309,6 +311,12 b' of a controller config object.'
309
311
310 The built-in routing schemes:
312 The built-in routing schemes:
311
313
314 To select one of these schemes, simply do::
315
316 $ ipcontrollerz --scheme <schemename>
317 for instance:
318 $ ipcontrollerz --scheme lru
319
312 lru: Least Recently Used
320 lru: Least Recently Used
313
321
314 Always assign work to the least-recently-used engine. A close relative of
322 Always assign work to the least-recently-used engine. A close relative of
@@ -316,11 +324,12 b' lru: Least Recently Used'
316 with respect to runtime of each task.
324 with respect to runtime of each task.
317
325
318 plainrandom: Plain Random
326 plainrandom: Plain Random
327
319 Randomly picks an engine on which to run.
328 Randomly picks an engine on which to run.
320
329
321 twobin: Two-Bin Random
330 twobin: Two-Bin Random
322
331
323 **Depends on numpy**
332 **Requires numpy**
324
333
325 Pick two engines at random, and use the LRU of the two. This is known to be better
334 Pick two engines at random, and use the LRU of the two. This is known to be better
326 than plain random in many cases, but requires a small amount of computation.
335 than plain random in many cases, but requires a small amount of computation.
@@ -333,7 +342,7 b' leastload: Least Load'
333
342
334 weighted: Weighted Two-Bin Random
343 weighted: Weighted Two-Bin Random
335
344
336 **Depends on numpy**
345 **Requires numpy**
337
346
338 Pick two engines at random using the number of outstanding tasks as inverse weights,
347 Pick two engines at random using the number of outstanding tasks as inverse weights,
339 and use the one with the lower load.
348 and use the one with the lower load.
@@ -360,7 +369,7 b' Disabled features when using the ZMQ Scheduler:'
360 allows graceful handling of Engines coming and going. There is no way to know
369 allows graceful handling of Engines coming and going. There is no way to know
361 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
370 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
362 engine until they *finish*. This makes recovery from engine shutdown very difficult.
371 engine until they *finish*. This makes recovery from engine shutdown very difficult.
363
372
364
373
365 .. note::
374 .. note::
366
375
General Comments 0
You need to be logged in to leave comments. Login now