##// END OF EJS Templates
reflect revised apply_bound pattern
MinRK -
Show More
@@ -102,7 +102,7 b' class SerializeIt(object):'
102 102 self.typeDescriptor = 'ndarray'
103 103 self.metadata = {'shape':self.obj.shape,
104 104 'dtype':self.obj.dtype.str}
105 elif isinstance(self.obj, bytes):
105 elif isinstance(self.obj, str):
106 106 self.typeDescriptor = 'bytes'
107 107 self.metadata = {}
108 108 elif isinstance(self.obj, buffer):
@@ -148,7 +148,7 b' class UnSerializeIt(UnSerialized):'
148 148 typeDescriptor = self.serialized.getTypeDescriptor()
149 149 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
150 150 buf = self.serialized.getData()
151 if isinstance(buf, (buffer,bytes)):
151 if isinstance(buf, (str, buffer)):
152 152 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
153 153 else:
154 154 # memoryview
@@ -66,6 +66,7 b' class CannedFunction(CannedObject):'
66 66 def __init__(self, f):
67 67 self._checkType(f)
68 68 self.code = f.func_code
69 self.defaults = f.func_defaults
69 70 self.__name__ = f.__name__
70 71
71 72 def _checkType(self, obj):
@@ -74,7 +75,7 b' class CannedFunction(CannedObject):'
74 75 def getObject(self, g=None):
75 76 if g is None:
76 77 g = globals()
77 newFunc = FunctionType(self.code, g)
78 newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
78 79 return newFunc
79 80
80 81 #-------------------------------------------------------------------------------
@@ -45,30 +45,29 b' from .view import DirectView, LoadBalancedView'
45 45 # helpers for implementing old MEC API via client.apply
46 46 #--------------------------------------------------------------------------
47 47
48 def _push(ns):
48 def _push(user_ns, **ns):
49 49 """helper method for implementing `client.push` via `client.apply`"""
50 globals().update(ns)
50 user_ns.update(ns)
51 51
52 def _pull(keys):
52 def _pull(user_ns, keys):
53 53 """helper method for implementing `client.pull` via `client.apply`"""
54 g = globals()
55 54 if isinstance(keys, (list,tuple, set)):
56 55 for key in keys:
57 if not g.has_key(key):
56 if not user_ns.has_key(key):
58 57 raise NameError("name '%s' is not defined"%key)
59 return map(g.get, keys)
58 return map(user_ns.get, keys)
60 59 else:
61 if not g.has_key(keys):
60 if not user_ns.has_key(keys):
62 61 raise NameError("name '%s' is not defined"%keys)
63 return g.get(keys)
62 return user_ns.get(keys)
64 63
65 def _clear():
64 def _clear(user_ns):
66 65 """helper method for implementing `client.clear` via `client.apply`"""
67 globals().clear()
66 user_ns.clear()
68 67
69 def _execute(code):
68 def _execute(user_ns, code):
70 69 """helper method for implementing `client.execute` via `client.apply`"""
71 exec code in globals()
70 exec code in user_ns
72 71
73 72
74 73 #--------------------------------------------------------------------------
@@ -946,7 +945,7 b' class Client(HasTraits):'
946 945 return list(Dependency(dep))
947 946
948 947 @defaultblock
949 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
948 def apply(self, f, args=None, kwargs=None, bound=False, block=None,
950 949 targets=None, balanced=None,
951 950 after=None, follow=None, timeout=None,
952 951 track=False):
@@ -963,9 +962,8 b' class Client(HasTraits):'
963 962 The positional arguments passed to `f`
964 963 kwargs : dict
965 964 The keyword arguments passed to `f`
966 bound : bool (default: True)
967 Whether to execute in the Engine(s) namespace, or in a clean
968 namespace not affecting the engine.
965 bound : bool (default: False)
966 Whether to pass the Engine(s) Namespace as the first argument to `f`.
969 967 block : bool (default: self.block)
970 968 Whether to wait for the result, or return immediately.
971 969 False:
@@ -1171,12 +1169,12 b' class Client(HasTraits):'
1171 1169 #--------------------------------------------------------------------------
1172 1170
1173 1171 @defaultblock
1174 def remote(self, bound=True, block=None, targets=None, balanced=None):
1172 def remote(self, bound=False, block=None, targets=None, balanced=None):
1175 1173 """Decorator for making a RemoteFunction"""
1176 1174 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1177 1175
1178 1176 @defaultblock
1179 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1177 def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None):
1180 1178 """Decorator for making a ParallelFunction"""
1181 1179 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1182 1180
@@ -1249,19 +1247,21 b' class Client(HasTraits):'
1249 1247 """Push the contents of `ns` into the namespace on `target`"""
1250 1248 if not isinstance(ns, dict):
1251 1249 raise TypeError("Must be a dict, not %s"%type(ns))
1252 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False, track=track)
1250 result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1253 1251 if not block:
1254 1252 return result
1255 1253
1256 1254 @defaultblock
1257 1255 def pull(self, keys, targets='all', block=None):
1258 1256 """Pull objects from `target`'s namespace by `keys`"""
1259 if isinstance(keys, str):
1257 if isinstance(keys, basestring):
1260 1258 pass
1261 1259 elif isinstance(keys, (list,tuple,set)):
1262 1260 for key in keys:
1263 if not isinstance(key, str):
1264 raise TypeError
1261 if not isinstance(key, basestring):
1262 raise TypeError("keys must be str, not type %r"%type(key))
1263 else:
1264 raise TypeError("keys must be strs, not %r"%keys)
1265 1265 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1266 1266 return result
1267 1267
@@ -22,7 +22,7 b' from .asyncresult import AsyncMapResult'
22 22 #-----------------------------------------------------------------------------
23 23
24 24 @testdec.skip_doctest
25 def remote(client, bound=True, block=None, targets=None, balanced=None):
25 def remote(client, bound=False, block=None, targets=None, balanced=None):
26 26 """Turn a function into a remote function.
27 27
28 28 This method can be used for map:
@@ -37,7 +37,7 b' def remote(client, bound=True, block=None, targets=None, balanced=None):'
37 37 return remote_function
38 38
39 39 @testdec.skip_doctest
40 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
40 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
41 41 """Turn a function into a parallel remote function.
42 42
43 43 This method can be used for map:
@@ -34,7 +34,7 b' from .client import Client'
34 34 from .error import wrap_exception
35 35 from .factory import SessionFactory
36 36 from .streamsession import StreamSession
37 from .util import serialize_object, unpack_apply_message, ISO8601
37 from .util import serialize_object, unpack_apply_message, ISO8601, Namespace
38 38
39 39 def printer(*args):
40 40 pprint(args, stream=sys.__stdout__)
@@ -305,35 +305,38 b' class Kernel(SessionFactory):'
305 305 sys.stdout.set_parent(parent)
306 306 sys.stderr.set_parent(parent)
307 307 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
308 if bound:
309 working = self.user_ns
310 suffix = str(msg_id).replace("-","")
311 prefix = "_"
312
313 else:
314 working = dict()
315 suffix = prefix = "_" # prevent keyword collisions with lambda
308 working = self.user_ns
309 # suffix =
310 prefix = "_"+str(msg_id).replace("-","")+"_"
311 # if bound:
312 #
313 # else:
314 # working = dict()
315 # suffix = prefix = "_" # prevent keyword collisions with lambda
316 316 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
317 if bound:
318 bound_ns = Namespace(working)
319 args = [bound_ns]+list(args)
317 320 # if f.fun
318 321 fname = getattr(f, '__name__', 'f')
319 322
320 fname = prefix+fname.strip('<>')+suffix
321 argname = prefix+"args"+suffix
322 kwargname = prefix+"kwargs"+suffix
323 resultname = prefix+"result"+suffix
323 fname = prefix+"f"
324 argname = prefix+"args"
325 kwargname = prefix+"kwargs"
326 resultname = prefix+"result"
324 327
325 ns = { fname : f, argname : args, kwargname : kwargs }
328 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
326 329 # print ns
327 330 working.update(ns)
328 331 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
329 exec code in working, working
330 result = working.get(resultname)
331 # clear the namespace
332 if bound:
332 try:
333 exec code in working,working
334 result = working.get(resultname)
335 finally:
333 336 for key in ns.iterkeys():
334 self.user_ns.pop(key)
335 else:
336 del working
337 working.pop(key)
338 if bound:
339 working.update(bound_ns)
337 340
338 341 packed_result,buf = serialize_object(result)
339 342 result_buf = [packed_result]+buf
@@ -4,8 +4,7 b' import tempfile'
4 4 import time
5 5 from subprocess import Popen, PIPE, STDOUT
6 6
7 from IPython.zmq.parallel.ipcluster import launch_process
8 from IPython.zmq.parallel.entry_point import select_random_ports
7 from IPython.zmq.parallel import client
9 8
10 9 processes = []
11 10 blackhole = tempfile.TemporaryFile()
@@ -17,7 +16,10 b' def setup():'
17 16 processes.append(cp)
18 17 time.sleep(.5)
19 18 add_engine()
20 time.sleep(2)
19 c = client.Client(profile='iptest')
20 while not c.ids:
21 time.sleep(.1)
22 c.spin()
21 23
22 24 def add_engine(profile='iptest'):
23 25 ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT)
@@ -42,5 +44,5 b' def teardown():'
42 44 print 'killing'
43 45 p.kill()
44 46 except:
45 print "couldn't shutdown process: ",p
47 print "couldn't shutdown process: ", p
46 48
@@ -91,8 +91,8 b' class ClusterTestCase(BaseZMQTestCase):'
91 91 def tearDown(self):
92 92 self.client.close()
93 93 BaseZMQTestCase.tearDown(self)
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
95 # [ e.wait() for e in self.engines ]
94 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
95 # [ e.wait() for e in self.engines ]
96 96 # while len(self.client.ids) > self.base_engine_count:
97 97 # time.sleep(.1)
98 98 # del self.engines
@@ -165,6 +165,17 b' class TestClient(ClusterTestCase):'
165 165 v.execute('b=f()')
166 166 self.assertEquals(v['b'], 5)
167 167
168 def test_push_function_defaults(self):
169 """test that pushed functions preserve default args"""
170 def echo(a=10):
171 return a
172 self.add_engines(1)
173 v = self.client[-1]
174 v.block=True
175 v['f'] = echo
176 v.execute('b=f()')
177 self.assertEquals(v['b'], 10)
178
168 179 def test_get_result(self):
169 180 """test getting results from the Hub."""
170 181 c = clientmod.Client(profile='iptest')
@@ -195,7 +206,7 b' class TestClient(ClusterTestCase):'
195 206 """)
196 207 v = self.client[-1]
197 208 v.run(tmpfile, block=True)
198 self.assertEquals(v.apply_sync_bound(lambda : g()), 5)
209 self.assertEquals(v.apply_sync(lambda : g()), 5)
199 210
200 211 def test_apply_tracked(self):
201 212 """test tracking for apply"""
@@ -245,8 +256,7 b' class TestClient(ClusterTestCase):'
245 256 v = self.client[-1]
246 257 v['a'] = 123
247 258 ra = clientmod.Reference('a')
248 b = v.apply_sync_bound(lambda x: x, ra)
259 b = v.apply_sync(lambda x: x, ra)
249 260 self.assertEquals(b, 123)
250 self.assertRaisesRemote(NameError, v.apply_sync, lambda x: x, ra)
251 261
252 262
@@ -15,6 +15,23 b' from IPython.utils.newserialized import serialize, unserialize'
15 15
16 16 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
17 17
18 class Namespace(dict):
19 """Subclass of dict for attribute access to keys."""
20
21 def __getattr__(self, key):
22 """getattr aliased to getitem"""
23 if key in self.iterkeys():
24 return self[key]
25 else:
26 raise NameError(key)
27
28 def __setattr__(self, key, value):
29 """setattr aliased to setitem, with strict"""
30 if hasattr(dict, key):
31 raise KeyError("Cannot override dict keys %r"%key)
32 self[key] = value
33
34
18 35 class ReverseDict(dict):
19 36 """simple double-keyed subset of dict methods."""
20 37
@@ -264,7 +281,18 b' def unpack_apply_message(bufs, g=None, copy=True):'
264 281 for k in sorted(skwargs.iterkeys()):
265 282 sa = skwargs[k]
266 283 if sa.data is None:
267 sa.data = bufs.pop(0)
284 m = bufs.pop(0)
285 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
286 if copy:
287 sa.data = buffer(m)
288 else:
289 sa.data = m.buffer
290 else:
291 if copy:
292 sa.data = m
293 else:
294 sa.data = m.bytes
295
268 296 kwargs[k] = uncan(unserialize(sa), g)
269 297
270 298 return f,args,kwargs
@@ -74,6 +74,7 b' class View(HasTraits):'
74 74 """
75 75 block=Bool(False)
76 76 bound=Bool(False)
77 track=Bool(False)
77 78 history=List()
78 79 outstanding = Set()
79 80 results = Dict()
@@ -81,7 +82,7 b' class View(HasTraits):'
81 82
82 83 _ntargets = Int(1)
83 84 _balanced = Bool(False)
84 _default_names = List(['block', 'bound'])
85 _default_names = List(['block', 'bound', 'track'])
85 86 _targets = Any()
86 87
87 88 def __init__(self, client=None, targets=None):
@@ -139,7 +140,12 b' class View(HasTraits):'
139 140 block : bool
140 141 whether to wait for results
141 142 bound : bool
142 whether to use the client's namespace
143 whether to pass the client's Namespace as the first argument
144 to functions called via `apply`.
145 track : bool
146 whether to create a MessageTracker to allow the user to
147 safely edit after arrays and buffers during non-copying
148 sends.
143 149 """
144 150 for key in kwargs:
145 151 if key not in self._default_names:
@@ -161,10 +167,11 b' class View(HasTraits):'
161 167 def apply(self, f, *args, **kwargs):
162 168 """calls f(*args, **kwargs) on remote engines, returning the result.
163 169
164 This method does not involve the engine's namespace.
170 This method sets all of `client.apply`'s keyword arguments via this
171 View's attributes.
165 172
166 173 if self.block is False:
167 returns msg_id
174 returns AsyncResult
168 175 else:
169 176 returns actual result of f(*args, **kwargs)
170 177 """
@@ -174,9 +181,7 b' class View(HasTraits):'
174 181 def apply_async(self, f, *args, **kwargs):
175 182 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
176 183
177 This method does not involve the engine's namespace.
178
179 returns msg_id
184 returns AsyncResult
180 185 """
181 186 d = self._defaults('block', 'bound')
182 187 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
@@ -187,11 +192,9 b' class View(HasTraits):'
187 192 """calls f(*args, **kwargs) on remote engines in a blocking manner,
188 193 returning the result.
189 194
190 This method does not involve the engine's namespace.
191
192 195 returns: actual result of f(*args, **kwargs)
193 196 """
194 d = self._defaults('block', 'bound')
197 d = self._defaults('block', 'bound', 'track')
195 198 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
196 199
197 200 # @sync_results
@@ -216,9 +219,9 b' class View(HasTraits):'
216 219 """calls f(*args, **kwargs) bound to engine namespace(s)
217 220 in a nonblocking manner.
218 221
219 returns: msg_id
222 The first argument to `f` will be the Engine's Namespace
220 223
221 This method has access to the targets' namespace via globals()
224 returns: AsyncResult
222 225
223 226 """
224 227 d = self._defaults('block', 'bound')
@@ -229,9 +232,9 b' class View(HasTraits):'
229 232 def apply_sync_bound(self, f, *args, **kwargs):
230 233 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
231 234
232 returns: actual result of f(*args, **kwargs)
235 The first argument to `f` will be the Engine's Namespace
233 236
234 This method has access to the targets' namespace via globals()
237 returns: actual result of f(*args, **kwargs)
235 238
236 239 """
237 240 d = self._defaults('block', 'bound')
@@ -323,11 +326,11 b' class View(HasTraits):'
323 326 # Decorators
324 327 #-------------------------------------------------------------------
325 328
326 def remote(self, bound=True, block=True):
329 def remote(self, bound=False, block=True):
327 330 """Decorator for making a RemoteFunction"""
328 331 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
329 332
330 def parallel(self, dist='b', bound=True, block=None):
333 def parallel(self, dist='b', bound=False, block=None):
331 334 """Decorator for making a ParallelFunction"""
332 335 block = self.block if block is None else block
333 336 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
@@ -378,7 +381,7 b' class DirectView(View):'
378 381 block : bool
379 382 whether to wait for the result or not [default self.block]
380 383 bound : bool
381 whether to have access to the engines' namespaces [default self.bound]
384 whether to pass the client's Namespace as the first argument to `f`
382 385
383 386 Returns
384 387 -------
@@ -572,7 +575,12 b' class LoadBalancedView(View):'
572 575 block : bool
573 576 whether to wait for results
574 577 bound : bool
575 whether to use the engine's namespace
578 whether to pass the client's Namespace as the first argument
579 to functions called via `apply`.
580 track : bool
581 whether to create a MessageTracker to allow the user to
582 safely edit after arrays and buffers during non-copying
583 sends.
576 584 follow : Dependency, list, msg_id, AsyncResult
577 585 the location dependencies of tasks
578 586 after : Dependency, list, msg_id, AsyncResult
@@ -621,7 +629,11 b' class LoadBalancedView(View):'
621 629 block : bool
622 630 whether to wait for the result or not [default self.block]
623 631 bound : bool
624 whether to use the engine's namespace [default self.bound]
632 whether to pass the client's Namespace as the first argument to `f`
633 track : bool
634 whether to create a MessageTracker to allow the user to
635 safely edit after arrays and buffers during non-copying
636 sends.
625 637 chunk_size : int
626 638 how many elements should be in each task [default 1]
627 639
@@ -10,12 +10,12 b" view.run('communicator.py')"
10 10 view.execute('com = EngineCommunicator()')
11 11
12 12 # gather the connection information into a dict
13 ar = view.apply_async_bound(lambda : com.info)
13 ar = view.apply_async(lambda : com.info)
14 14 peers = ar.get_dict()
15 15 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
16 16
17 17 # connect the engines to each other:
18 view.apply_sync_bound(lambda pdict: com.connect(pdict), peers)
18 view.apply_sync(lambda pdict: com.connect(pdict), peers)
19 19
20 20 # now all the engines are connected, and we can communicate between them:
21 21
@@ -34,7 +34,7 b' def send(client, sender, targets, msg_name, dest_name=None, block=None):'
34 34 msg = globals()[m_name]
35 35 return com.send(targets, msg)
36 36
37 client[sender].apply_async_bound(_send, targets, msg_name)
37 client[sender].apply_async(_send, targets, msg_name)
38 38
39 39 return client[targets].execute('%s=com.recv()'%dest_name, block=None)
40 40
@@ -42,7 +42,7 b' print "done"'
42 42
43 43 # Run 10m digits on 1 engine
44 44 t1 = clock()
45 freqs10m = c[id0].apply_sync_bound(compute_two_digit_freqs, files[0])
45 freqs10m = c[id0].apply_sync(compute_two_digit_freqs, files[0])
46 46 t2 = clock()
47 47 digits_per_second1 = 10.0e6/(t2-t1)
48 48 print "Digits per second (1 core, 10m digits): ", digits_per_second1
@@ -68,7 +68,7 b' constructed via list-access to the client:'
68 68
69 69 .. seealso::
70 70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72 72
73 73
74 74 Quick and easy parallelism
@@ -232,8 +232,8 b' blocks until the engines are done executing the command:'
232 232
233 233 In [5]: dview['b'] = 10
234 234
235 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
236 Out[6]: [42, 42, 42, 42]%exit
235 In [6]: dview.apply_sync(lambda x: a+b+x, 27)
236 Out[6]: [42, 42, 42, 42]
237 237
238 238 Python commands can be executed on specific engines by calling execute using the ``targets``
239 239 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
@@ -265,7 +265,13 b' Bound and unbound execution'
265 265
266 266 The previous example also shows one of the most important things about the IPython
267 267 engines: they have a persistent user namespaces. The :meth:`apply` method can
268 be run in either a bound or unbound manner:
268 be run in either a bound or unbound manner.
269
270 When applying a function in a `bound` manner, the first argument to that function
271 will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary
272 also providing attribute-access to keys.
273
274 In all (unbound and bound) execution
269 275
270 276 .. sourcecode:: ipython
271 277
@@ -273,31 +279,16 b' be run in either a bound or unbound manner:'
273 279
274 280 In [10]: v0 = rc[0]
275 281
276 In [12]: v0.apply_sync_bound(lambda : b)
277 Out[12]: 5
282 # multiply b*2 inplace
283 In [12]: v0.apply_sync_bound(lambda ns: ns.b*=2)
278 284
279 In [13]: v0.apply_sync(lambda : b)
280 ---------------------------------------------------------------------------
281 RemoteError Traceback (most recent call last)
282 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
283 ----> 1 v0.apply(lambda : b)
284 ...
285 RemoteError: NameError(global name 'b' is not defined)
286 Traceback (most recent call last):
287 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
288 exec code in working, working
289 File "<string>", line 1, in <module>
290 File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
291 NameError: global name 'b' is not defined
292
293
294 Specifically, `bound=True` specifies that the engine's namespace is to be used
295 as the `globals` when the function is called, and `bound=False` specifies that
296 the engine's namespace is not to be used (hence, 'b' is undefined during unbound
297 execution, since the function is called in an empty namespace). Unbound execution is
298 often useful for large numbers of atomic tasks, which prevents bloating the engine's
299 memory, while bound execution lets you build on your previous work.
285 # b is still available in globals during unbound execution
286 In [13]: v0.apply_sync(lambda a: a*b, 3)
287 Out[13]: 30
300 288
289 `bound=True` specifies that the engine's namespace is to be passed as the first argument when
290 the function is called, and the default `bound=False` specifies that the normal behavior, but
291 the engine's namespace will be available as the globals() when the function is called.
301 292
302 293 Non-blocking execution
303 294 ----------------------
@@ -469,7 +460,7 b' specifying the index of the result to be requested. It is simply a shortcut to t'
469 460
470 461 .. sourcecode:: ipython
471 462
472 In [29]: dv.apply_async_bound(lambda : ev)
463 In [29]: dv.apply_async(lambda : ev)
473 464
474 465 In [30]: %result
475 466 Out[30]: [ [ 1.28167017 0.14197338],
@@ -298,6 +298,8 b' The basic cases that are checked:'
298 298 This analysis has not been proven to be rigorous, so it is likely possible for tasks
299 299 to become impossible to run in obscure situations, so a timeout may be a good choice.
300 300
301 .. _parallel_schedulers:
302
301 303 Schedulers
302 304 ==========
303 305
@@ -309,6 +311,12 b' of a controller config object.'
309 311
310 312 The built-in routing schemes:
311 313
314 To select one of these schemes, simply do::
315
316 $ ipcontrollerz --scheme <schemename>
317 for instance:
318 $ ipcontrollerz --scheme lru
319
312 320 lru: Least Recently Used
313 321
314 322 Always assign work to the least-recently-used engine. A close relative of
@@ -316,11 +324,12 b' lru: Least Recently Used'
316 324 with respect to runtime of each task.
317 325
318 326 plainrandom: Plain Random
327
319 328 Randomly picks an engine on which to run.
320 329
321 330 twobin: Two-Bin Random
322 331
323 **Depends on numpy**
332 **Requires numpy**
324 333
325 334 Pick two engines at random, and use the LRU of the two. This is known to be better
326 335 than plain random in many cases, but requires a small amount of computation.
@@ -333,7 +342,7 b' leastload: Least Load'
333 342
334 343 weighted: Weighted Two-Bin Random
335 344
336 **Depends on numpy**
345 **Requires numpy**
337 346
338 347 Pick two engines at random using the number of outstanding tasks as inverse weights,
339 348 and use the one with the lower load.
@@ -360,7 +369,7 b' Disabled features when using the ZMQ Scheduler:'
360 369 allows graceful handling of Engines coming and going. There is no way to know
361 370 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
362 371 engine until they *finish*. This makes recovery from engine shutdown very difficult.
363
372
364 373
365 374 .. note::
366 375
General Comments 0
You need to be logged in to leave comments. Login now