Show More
@@ -102,7 +102,7 b' class SerializeIt(object):' | |||||
102 | self.typeDescriptor = 'ndarray' |
|
102 | self.typeDescriptor = 'ndarray' | |
103 | self.metadata = {'shape':self.obj.shape, |
|
103 | self.metadata = {'shape':self.obj.shape, | |
104 | 'dtype':self.obj.dtype.str} |
|
104 | 'dtype':self.obj.dtype.str} | |
105 |
elif isinstance(self.obj, |
|
105 | elif isinstance(self.obj, str): | |
106 | self.typeDescriptor = 'bytes' |
|
106 | self.typeDescriptor = 'bytes' | |
107 | self.metadata = {} |
|
107 | self.metadata = {} | |
108 | elif isinstance(self.obj, buffer): |
|
108 | elif isinstance(self.obj, buffer): | |
@@ -148,7 +148,7 b' class UnSerializeIt(UnSerialized):' | |||||
148 | typeDescriptor = self.serialized.getTypeDescriptor() |
|
148 | typeDescriptor = self.serialized.getTypeDescriptor() | |
149 | if globals().has_key('numpy') and typeDescriptor == 'ndarray': |
|
149 | if globals().has_key('numpy') and typeDescriptor == 'ndarray': | |
150 | buf = self.serialized.getData() |
|
150 | buf = self.serialized.getData() | |
151 |
if isinstance(buf, (buffer |
|
151 | if isinstance(buf, (str, buffer)): | |
152 | result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype']) |
|
152 | result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype']) | |
153 | else: |
|
153 | else: | |
154 | # memoryview |
|
154 | # memoryview |
@@ -66,6 +66,7 b' class CannedFunction(CannedObject):' | |||||
66 | def __init__(self, f): |
|
66 | def __init__(self, f): | |
67 | self._checkType(f) |
|
67 | self._checkType(f) | |
68 | self.code = f.func_code |
|
68 | self.code = f.func_code | |
|
69 | self.defaults = f.func_defaults | |||
69 | self.__name__ = f.__name__ |
|
70 | self.__name__ = f.__name__ | |
70 |
|
71 | |||
71 | def _checkType(self, obj): |
|
72 | def _checkType(self, obj): | |
@@ -74,7 +75,7 b' class CannedFunction(CannedObject):' | |||||
74 | def getObject(self, g=None): |
|
75 | def getObject(self, g=None): | |
75 | if g is None: |
|
76 | if g is None: | |
76 | g = globals() |
|
77 | g = globals() | |
77 | newFunc = FunctionType(self.code, g) |
|
78 | newFunc = FunctionType(self.code, g, self.__name__, self.defaults) | |
78 | return newFunc |
|
79 | return newFunc | |
79 |
|
80 | |||
80 | #------------------------------------------------------------------------------- |
|
81 | #------------------------------------------------------------------------------- |
@@ -45,30 +45,29 b' from .view import DirectView, LoadBalancedView' | |||||
45 | # helpers for implementing old MEC API via client.apply |
|
45 | # helpers for implementing old MEC API via client.apply | |
46 | #-------------------------------------------------------------------------- |
|
46 | #-------------------------------------------------------------------------- | |
47 |
|
47 | |||
48 | def _push(ns): |
|
48 | def _push(user_ns, **ns): | |
49 | """helper method for implementing `client.push` via `client.apply`""" |
|
49 | """helper method for implementing `client.push` via `client.apply`""" | |
50 |
|
|
50 | user_ns.update(ns) | |
51 |
|
51 | |||
52 | def _pull(keys): |
|
52 | def _pull(user_ns, keys): | |
53 | """helper method for implementing `client.pull` via `client.apply`""" |
|
53 | """helper method for implementing `client.pull` via `client.apply`""" | |
54 | g = globals() |
|
|||
55 | if isinstance(keys, (list,tuple, set)): |
|
54 | if isinstance(keys, (list,tuple, set)): | |
56 | for key in keys: |
|
55 | for key in keys: | |
57 |
if not |
|
56 | if not user_ns.has_key(key): | |
58 | raise NameError("name '%s' is not defined"%key) |
|
57 | raise NameError("name '%s' is not defined"%key) | |
59 |
return map( |
|
58 | return map(user_ns.get, keys) | |
60 | else: |
|
59 | else: | |
61 |
if not |
|
60 | if not user_ns.has_key(keys): | |
62 | raise NameError("name '%s' is not defined"%keys) |
|
61 | raise NameError("name '%s' is not defined"%keys) | |
63 |
return |
|
62 | return user_ns.get(keys) | |
64 |
|
63 | |||
65 | def _clear(): |
|
64 | def _clear(user_ns): | |
66 | """helper method for implementing `client.clear` via `client.apply`""" |
|
65 | """helper method for implementing `client.clear` via `client.apply`""" | |
67 |
|
|
66 | user_ns.clear() | |
68 |
|
67 | |||
69 | def _execute(code): |
|
68 | def _execute(user_ns, code): | |
70 | """helper method for implementing `client.execute` via `client.apply`""" |
|
69 | """helper method for implementing `client.execute` via `client.apply`""" | |
71 |
exec code in |
|
70 | exec code in user_ns | |
72 |
|
71 | |||
73 |
|
72 | |||
74 | #-------------------------------------------------------------------------- |
|
73 | #-------------------------------------------------------------------------- | |
@@ -946,7 +945,7 b' class Client(HasTraits):' | |||||
946 | return list(Dependency(dep)) |
|
945 | return list(Dependency(dep)) | |
947 |
|
946 | |||
948 | @defaultblock |
|
947 | @defaultblock | |
949 |
def apply(self, f, args=None, kwargs=None, bound= |
|
948 | def apply(self, f, args=None, kwargs=None, bound=False, block=None, | |
950 | targets=None, balanced=None, |
|
949 | targets=None, balanced=None, | |
951 | after=None, follow=None, timeout=None, |
|
950 | after=None, follow=None, timeout=None, | |
952 | track=False): |
|
951 | track=False): | |
@@ -963,9 +962,8 b' class Client(HasTraits):' | |||||
963 | The positional arguments passed to `f` |
|
962 | The positional arguments passed to `f` | |
964 | kwargs : dict |
|
963 | kwargs : dict | |
965 | The keyword arguments passed to `f` |
|
964 | The keyword arguments passed to `f` | |
966 |
bound : bool (default: |
|
965 | bound : bool (default: False) | |
967 |
Whether to |
|
966 | Whether to pass the Engine(s) Namespace as the first argument to `f`. | |
968 | namespace not affecting the engine. |
|
|||
969 | block : bool (default: self.block) |
|
967 | block : bool (default: self.block) | |
970 | Whether to wait for the result, or return immediately. |
|
968 | Whether to wait for the result, or return immediately. | |
971 | False: |
|
969 | False: | |
@@ -1171,12 +1169,12 b' class Client(HasTraits):' | |||||
1171 | #-------------------------------------------------------------------------- |
|
1169 | #-------------------------------------------------------------------------- | |
1172 |
|
1170 | |||
1173 | @defaultblock |
|
1171 | @defaultblock | |
1174 |
def remote(self, bound= |
|
1172 | def remote(self, bound=False, block=None, targets=None, balanced=None): | |
1175 | """Decorator for making a RemoteFunction""" |
|
1173 | """Decorator for making a RemoteFunction""" | |
1176 | return remote(self, bound=bound, targets=targets, block=block, balanced=balanced) |
|
1174 | return remote(self, bound=bound, targets=targets, block=block, balanced=balanced) | |
1177 |
|
1175 | |||
1178 | @defaultblock |
|
1176 | @defaultblock | |
1179 |
def parallel(self, dist='b', bound= |
|
1177 | def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None): | |
1180 | """Decorator for making a ParallelFunction""" |
|
1178 | """Decorator for making a ParallelFunction""" | |
1181 | return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced) |
|
1179 | return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced) | |
1182 |
|
1180 | |||
@@ -1249,19 +1247,21 b' class Client(HasTraits):' | |||||
1249 | """Push the contents of `ns` into the namespace on `target`""" |
|
1247 | """Push the contents of `ns` into the namespace on `target`""" | |
1250 | if not isinstance(ns, dict): |
|
1248 | if not isinstance(ns, dict): | |
1251 | raise TypeError("Must be a dict, not %s"%type(ns)) |
|
1249 | raise TypeError("Must be a dict, not %s"%type(ns)) | |
1252 |
result = self.apply(_push, |
|
1250 | result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track) | |
1253 | if not block: |
|
1251 | if not block: | |
1254 | return result |
|
1252 | return result | |
1255 |
|
1253 | |||
1256 | @defaultblock |
|
1254 | @defaultblock | |
1257 | def pull(self, keys, targets='all', block=None): |
|
1255 | def pull(self, keys, targets='all', block=None): | |
1258 | """Pull objects from `target`'s namespace by `keys`""" |
|
1256 | """Pull objects from `target`'s namespace by `keys`""" | |
1259 | if isinstance(keys, str): |
|
1257 | if isinstance(keys, basestring): | |
1260 | pass |
|
1258 | pass | |
1261 | elif isinstance(keys, (list,tuple,set)): |
|
1259 | elif isinstance(keys, (list,tuple,set)): | |
1262 | for key in keys: |
|
1260 | for key in keys: | |
1263 | if not isinstance(key, str): |
|
1261 | if not isinstance(key, basestring): | |
1264 | raise TypeError |
|
1262 | raise TypeError("keys must be str, not type %r"%type(key)) | |
|
1263 | else: | |||
|
1264 | raise TypeError("keys must be strs, not %r"%keys) | |||
1265 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False) |
|
1265 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False) | |
1266 | return result |
|
1266 | return result | |
1267 |
|
1267 |
@@ -22,7 +22,7 b' from .asyncresult import AsyncMapResult' | |||||
22 | #----------------------------------------------------------------------------- |
|
22 | #----------------------------------------------------------------------------- | |
23 |
|
23 | |||
24 | @testdec.skip_doctest |
|
24 | @testdec.skip_doctest | |
25 |
def remote(client, bound= |
|
25 | def remote(client, bound=False, block=None, targets=None, balanced=None): | |
26 | """Turn a function into a remote function. |
|
26 | """Turn a function into a remote function. | |
27 |
|
27 | |||
28 | This method can be used for map: |
|
28 | This method can be used for map: | |
@@ -37,7 +37,7 b' def remote(client, bound=True, block=None, targets=None, balanced=None):' | |||||
37 | return remote_function |
|
37 | return remote_function | |
38 |
|
38 | |||
39 | @testdec.skip_doctest |
|
39 | @testdec.skip_doctest | |
40 |
def parallel(client, dist='b', bound= |
|
40 | def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None): | |
41 | """Turn a function into a parallel remote function. |
|
41 | """Turn a function into a parallel remote function. | |
42 |
|
42 | |||
43 | This method can be used for map: |
|
43 | This method can be used for map: |
@@ -34,7 +34,7 b' from .client import Client' | |||||
34 | from .error import wrap_exception |
|
34 | from .error import wrap_exception | |
35 | from .factory import SessionFactory |
|
35 | from .factory import SessionFactory | |
36 | from .streamsession import StreamSession |
|
36 | from .streamsession import StreamSession | |
37 | from .util import serialize_object, unpack_apply_message, ISO8601 |
|
37 | from .util import serialize_object, unpack_apply_message, ISO8601, Namespace | |
38 |
|
38 | |||
39 | def printer(*args): |
|
39 | def printer(*args): | |
40 | pprint(args, stream=sys.__stdout__) |
|
40 | pprint(args, stream=sys.__stdout__) | |
@@ -305,35 +305,38 b' class Kernel(SessionFactory):' | |||||
305 | sys.stdout.set_parent(parent) |
|
305 | sys.stdout.set_parent(parent) | |
306 | sys.stderr.set_parent(parent) |
|
306 | sys.stderr.set_parent(parent) | |
307 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns |
|
307 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | |
308 | if bound: |
|
308 | working = self.user_ns | |
309 | working = self.user_ns |
|
309 | # suffix = | |
310 |
|
|
310 | prefix = "_"+str(msg_id).replace("-","")+"_" | |
311 | prefix = "_" |
|
311 | # if bound: | |
312 |
|
312 | # | ||
313 | else: |
|
313 | # else: | |
314 | working = dict() |
|
314 | # working = dict() | |
315 | suffix = prefix = "_" # prevent keyword collisions with lambda |
|
315 | # suffix = prefix = "_" # prevent keyword collisions with lambda | |
316 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
316 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |
|
317 | if bound: | |||
|
318 | bound_ns = Namespace(working) | |||
|
319 | args = [bound_ns]+list(args) | |||
317 | # if f.fun |
|
320 | # if f.fun | |
318 | fname = getattr(f, '__name__', 'f') |
|
321 | fname = getattr(f, '__name__', 'f') | |
319 |
|
322 | |||
320 |
fname = prefix+ |
|
323 | fname = prefix+"f" | |
321 |
argname = prefix+"args" |
|
324 | argname = prefix+"args" | |
322 |
kwargname = prefix+"kwargs" |
|
325 | kwargname = prefix+"kwargs" | |
323 |
resultname = prefix+"result" |
|
326 | resultname = prefix+"result" | |
324 |
|
327 | |||
325 | ns = { fname : f, argname : args, kwargname : kwargs } |
|
328 | ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } | |
326 | # print ns |
|
329 | # print ns | |
327 | working.update(ns) |
|
330 | working.update(ns) | |
328 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) |
|
331 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | |
329 | exec code in working, working |
|
332 | try: | |
330 | result = working.get(resultname) |
|
333 | exec code in working,working | |
331 | # clear the namespace |
|
334 | result = working.get(resultname) | |
332 |
|
|
335 | finally: | |
333 | for key in ns.iterkeys(): |
|
336 | for key in ns.iterkeys(): | |
334 |
|
|
337 | working.pop(key) | |
335 |
|
|
338 | if bound: | |
336 |
|
|
339 | working.update(bound_ns) | |
337 |
|
340 | |||
338 | packed_result,buf = serialize_object(result) |
|
341 | packed_result,buf = serialize_object(result) | |
339 | result_buf = [packed_result]+buf |
|
342 | result_buf = [packed_result]+buf |
@@ -4,8 +4,7 b' import tempfile' | |||||
4 | import time |
|
4 | import time | |
5 | from subprocess import Popen, PIPE, STDOUT |
|
5 | from subprocess import Popen, PIPE, STDOUT | |
6 |
|
6 | |||
7 |
from IPython.zmq.parallel |
|
7 | from IPython.zmq.parallel import client | |
8 | from IPython.zmq.parallel.entry_point import select_random_ports |
|
|||
9 |
|
8 | |||
10 | processes = [] |
|
9 | processes = [] | |
11 | blackhole = tempfile.TemporaryFile() |
|
10 | blackhole = tempfile.TemporaryFile() | |
@@ -17,7 +16,10 b' def setup():' | |||||
17 | processes.append(cp) |
|
16 | processes.append(cp) | |
18 | time.sleep(.5) |
|
17 | time.sleep(.5) | |
19 | add_engine() |
|
18 | add_engine() | |
20 | time.sleep(2) |
|
19 | c = client.Client(profile='iptest') | |
|
20 | while not c.ids: | |||
|
21 | time.sleep(.1) | |||
|
22 | c.spin() | |||
21 |
|
23 | |||
22 | def add_engine(profile='iptest'): |
|
24 | def add_engine(profile='iptest'): | |
23 | ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT) |
|
25 | ep = Popen(['ipenginez']+ ['--profile', profile, '--log-level', '40'], stdout=blackhole, stderr=STDOUT) | |
@@ -42,5 +44,5 b' def teardown():' | |||||
42 | print 'killing' |
|
44 | print 'killing' | |
43 | p.kill() |
|
45 | p.kill() | |
44 | except: |
|
46 | except: | |
45 | print "couldn't shutdown process: ",p |
|
47 | print "couldn't shutdown process: ", p | |
46 |
|
48 |
@@ -91,8 +91,8 b' class ClusterTestCase(BaseZMQTestCase):' | |||||
91 | def tearDown(self): |
|
91 | def tearDown(self): | |
92 | self.client.close() |
|
92 | self.client.close() | |
93 | BaseZMQTestCase.tearDown(self) |
|
93 | BaseZMQTestCase.tearDown(self) | |
94 |
|
|
94 | # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ] | |
95 |
|
|
95 | # [ e.wait() for e in self.engines ] | |
96 | # while len(self.client.ids) > self.base_engine_count: |
|
96 | # while len(self.client.ids) > self.base_engine_count: | |
97 | # time.sleep(.1) |
|
97 | # time.sleep(.1) | |
98 | # del self.engines |
|
98 | # del self.engines |
@@ -165,6 +165,17 b' class TestClient(ClusterTestCase):' | |||||
165 | v.execute('b=f()') |
|
165 | v.execute('b=f()') | |
166 | self.assertEquals(v['b'], 5) |
|
166 | self.assertEquals(v['b'], 5) | |
167 |
|
167 | |||
|
168 | def test_push_function_defaults(self): | |||
|
169 | """test that pushed functions preserve default args""" | |||
|
170 | def echo(a=10): | |||
|
171 | return a | |||
|
172 | self.add_engines(1) | |||
|
173 | v = self.client[-1] | |||
|
174 | v.block=True | |||
|
175 | v['f'] = echo | |||
|
176 | v.execute('b=f()') | |||
|
177 | self.assertEquals(v['b'], 10) | |||
|
178 | ||||
168 | def test_get_result(self): |
|
179 | def test_get_result(self): | |
169 | """test getting results from the Hub.""" |
|
180 | """test getting results from the Hub.""" | |
170 | c = clientmod.Client(profile='iptest') |
|
181 | c = clientmod.Client(profile='iptest') | |
@@ -195,7 +206,7 b' class TestClient(ClusterTestCase):' | |||||
195 | """) |
|
206 | """) | |
196 | v = self.client[-1] |
|
207 | v = self.client[-1] | |
197 | v.run(tmpfile, block=True) |
|
208 | v.run(tmpfile, block=True) | |
198 |
self.assertEquals(v.apply_sync |
|
209 | self.assertEquals(v.apply_sync(lambda : g()), 5) | |
199 |
|
210 | |||
200 | def test_apply_tracked(self): |
|
211 | def test_apply_tracked(self): | |
201 | """test tracking for apply""" |
|
212 | """test tracking for apply""" | |
@@ -245,8 +256,7 b' class TestClient(ClusterTestCase):' | |||||
245 | v = self.client[-1] |
|
256 | v = self.client[-1] | |
246 | v['a'] = 123 |
|
257 | v['a'] = 123 | |
247 | ra = clientmod.Reference('a') |
|
258 | ra = clientmod.Reference('a') | |
248 |
b = v.apply_sync |
|
259 | b = v.apply_sync(lambda x: x, ra) | |
249 | self.assertEquals(b, 123) |
|
260 | self.assertEquals(b, 123) | |
250 | self.assertRaisesRemote(NameError, v.apply_sync, lambda x: x, ra) |
|
|||
251 |
|
261 | |||
252 |
|
262 |
@@ -15,6 +15,23 b' from IPython.utils.newserialized import serialize, unserialize' | |||||
15 |
|
15 | |||
16 | ISO8601="%Y-%m-%dT%H:%M:%S.%f" |
|
16 | ISO8601="%Y-%m-%dT%H:%M:%S.%f" | |
17 |
|
17 | |||
|
18 | class Namespace(dict): | |||
|
19 | """Subclass of dict for attribute access to keys.""" | |||
|
20 | ||||
|
21 | def __getattr__(self, key): | |||
|
22 | """getattr aliased to getitem""" | |||
|
23 | if key in self.iterkeys(): | |||
|
24 | return self[key] | |||
|
25 | else: | |||
|
26 | raise NameError(key) | |||
|
27 | ||||
|
28 | def __setattr__(self, key, value): | |||
|
29 | """setattr aliased to setitem, with strict""" | |||
|
30 | if hasattr(dict, key): | |||
|
31 | raise KeyError("Cannot override dict keys %r"%key) | |||
|
32 | self[key] = value | |||
|
33 | ||||
|
34 | ||||
18 | class ReverseDict(dict): |
|
35 | class ReverseDict(dict): | |
19 | """simple double-keyed subset of dict methods.""" |
|
36 | """simple double-keyed subset of dict methods.""" | |
20 |
|
37 | |||
@@ -264,7 +281,18 b' def unpack_apply_message(bufs, g=None, copy=True):' | |||||
264 | for k in sorted(skwargs.iterkeys()): |
|
281 | for k in sorted(skwargs.iterkeys()): | |
265 | sa = skwargs[k] |
|
282 | sa = skwargs[k] | |
266 | if sa.data is None: |
|
283 | if sa.data is None: | |
267 |
|
|
284 | m = bufs.pop(0) | |
|
285 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |||
|
286 | if copy: | |||
|
287 | sa.data = buffer(m) | |||
|
288 | else: | |||
|
289 | sa.data = m.buffer | |||
|
290 | else: | |||
|
291 | if copy: | |||
|
292 | sa.data = m | |||
|
293 | else: | |||
|
294 | sa.data = m.bytes | |||
|
295 | ||||
268 | kwargs[k] = uncan(unserialize(sa), g) |
|
296 | kwargs[k] = uncan(unserialize(sa), g) | |
269 |
|
297 | |||
270 | return f,args,kwargs |
|
298 | return f,args,kwargs |
@@ -74,6 +74,7 b' class View(HasTraits):' | |||||
74 | """ |
|
74 | """ | |
75 | block=Bool(False) |
|
75 | block=Bool(False) | |
76 | bound=Bool(False) |
|
76 | bound=Bool(False) | |
|
77 | track=Bool(False) | |||
77 | history=List() |
|
78 | history=List() | |
78 | outstanding = Set() |
|
79 | outstanding = Set() | |
79 | results = Dict() |
|
80 | results = Dict() | |
@@ -81,7 +82,7 b' class View(HasTraits):' | |||||
81 |
|
82 | |||
82 | _ntargets = Int(1) |
|
83 | _ntargets = Int(1) | |
83 | _balanced = Bool(False) |
|
84 | _balanced = Bool(False) | |
84 | _default_names = List(['block', 'bound']) |
|
85 | _default_names = List(['block', 'bound', 'track']) | |
85 | _targets = Any() |
|
86 | _targets = Any() | |
86 |
|
87 | |||
87 | def __init__(self, client=None, targets=None): |
|
88 | def __init__(self, client=None, targets=None): | |
@@ -139,7 +140,12 b' class View(HasTraits):' | |||||
139 | block : bool |
|
140 | block : bool | |
140 | whether to wait for results |
|
141 | whether to wait for results | |
141 | bound : bool |
|
142 | bound : bool | |
142 |
whether to |
|
143 | whether to pass the client's Namespace as the first argument | |
|
144 | to functions called via `apply`. | |||
|
145 | track : bool | |||
|
146 | whether to create a MessageTracker to allow the user to | |||
|
147 | safely edit after arrays and buffers during non-copying | |||
|
148 | sends. | |||
143 | """ |
|
149 | """ | |
144 | for key in kwargs: |
|
150 | for key in kwargs: | |
145 | if key not in self._default_names: |
|
151 | if key not in self._default_names: | |
@@ -161,10 +167,11 b' class View(HasTraits):' | |||||
161 | def apply(self, f, *args, **kwargs): |
|
167 | def apply(self, f, *args, **kwargs): | |
162 | """calls f(*args, **kwargs) on remote engines, returning the result. |
|
168 | """calls f(*args, **kwargs) on remote engines, returning the result. | |
163 |
|
169 | |||
164 | This method does not involve the engine's namespace. |
|
170 | This method sets all of `client.apply`'s keyword arguments via this | |
|
171 | View's attributes. | |||
165 |
|
172 | |||
166 | if self.block is False: |
|
173 | if self.block is False: | |
167 |
returns |
|
174 | returns AsyncResult | |
168 | else: |
|
175 | else: | |
169 | returns actual result of f(*args, **kwargs) |
|
176 | returns actual result of f(*args, **kwargs) | |
170 | """ |
|
177 | """ | |
@@ -174,9 +181,7 b' class View(HasTraits):' | |||||
174 | def apply_async(self, f, *args, **kwargs): |
|
181 | def apply_async(self, f, *args, **kwargs): | |
175 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. |
|
182 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. | |
176 |
|
183 | |||
177 | This method does not involve the engine's namespace. |
|
184 | returns AsyncResult | |
178 |
|
||||
179 | returns msg_id |
|
|||
180 | """ |
|
185 | """ | |
181 | d = self._defaults('block', 'bound') |
|
186 | d = self._defaults('block', 'bound') | |
182 | return self.client.apply(f,args,kwargs, block=False, bound=False, **d) |
|
187 | return self.client.apply(f,args,kwargs, block=False, bound=False, **d) | |
@@ -187,11 +192,9 b' class View(HasTraits):' | |||||
187 | """calls f(*args, **kwargs) on remote engines in a blocking manner, |
|
192 | """calls f(*args, **kwargs) on remote engines in a blocking manner, | |
188 | returning the result. |
|
193 | returning the result. | |
189 |
|
194 | |||
190 | This method does not involve the engine's namespace. |
|
|||
191 |
|
||||
192 | returns: actual result of f(*args, **kwargs) |
|
195 | returns: actual result of f(*args, **kwargs) | |
193 | """ |
|
196 | """ | |
194 | d = self._defaults('block', 'bound') |
|
197 | d = self._defaults('block', 'bound', 'track') | |
195 | return self.client.apply(f,args,kwargs, block=True, bound=False, **d) |
|
198 | return self.client.apply(f,args,kwargs, block=True, bound=False, **d) | |
196 |
|
199 | |||
197 | # @sync_results |
|
200 | # @sync_results | |
@@ -216,9 +219,9 b' class View(HasTraits):' | |||||
216 | """calls f(*args, **kwargs) bound to engine namespace(s) |
|
219 | """calls f(*args, **kwargs) bound to engine namespace(s) | |
217 | in a nonblocking manner. |
|
220 | in a nonblocking manner. | |
218 |
|
221 | |||
219 | returns: msg_id |
|
222 | The first argument to `f` will be the Engine's Namespace | |
220 |
|
223 | |||
221 | This method has access to the targets' namespace via globals() |
|
224 | returns: AsyncResult | |
222 |
|
225 | |||
223 | """ |
|
226 | """ | |
224 | d = self._defaults('block', 'bound') |
|
227 | d = self._defaults('block', 'bound') | |
@@ -229,9 +232,9 b' class View(HasTraits):' | |||||
229 | def apply_sync_bound(self, f, *args, **kwargs): |
|
232 | def apply_sync_bound(self, f, *args, **kwargs): | |
230 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. |
|
233 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. | |
231 |
|
234 | |||
232 | returns: actual result of f(*args, **kwargs) |
|
235 | The first argument to `f` will be the Engine's Namespace | |
233 |
|
236 | |||
234 | This method has access to the targets' namespace via globals() |
|
237 | returns: actual result of f(*args, **kwargs) | |
235 |
|
238 | |||
236 | """ |
|
239 | """ | |
237 | d = self._defaults('block', 'bound') |
|
240 | d = self._defaults('block', 'bound') | |
@@ -323,11 +326,11 b' class View(HasTraits):' | |||||
323 | # Decorators |
|
326 | # Decorators | |
324 | #------------------------------------------------------------------- |
|
327 | #------------------------------------------------------------------- | |
325 |
|
328 | |||
326 |
def remote(self, bound= |
|
329 | def remote(self, bound=False, block=True): | |
327 | """Decorator for making a RemoteFunction""" |
|
330 | """Decorator for making a RemoteFunction""" | |
328 | return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) |
|
331 | return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) | |
329 |
|
332 | |||
330 |
def parallel(self, dist='b', bound= |
|
333 | def parallel(self, dist='b', bound=False, block=None): | |
331 | """Decorator for making a ParallelFunction""" |
|
334 | """Decorator for making a ParallelFunction""" | |
332 | block = self.block if block is None else block |
|
335 | block = self.block if block is None else block | |
333 | return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) |
|
336 | return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) | |
@@ -378,7 +381,7 b' class DirectView(View):' | |||||
378 | block : bool |
|
381 | block : bool | |
379 | whether to wait for the result or not [default self.block] |
|
382 | whether to wait for the result or not [default self.block] | |
380 | bound : bool |
|
383 | bound : bool | |
381 |
whether to |
|
384 | whether to pass the client's Namespace as the first argument to `f` | |
382 |
|
385 | |||
383 | Returns |
|
386 | Returns | |
384 | ------- |
|
387 | ------- | |
@@ -572,7 +575,12 b' class LoadBalancedView(View):' | |||||
572 | block : bool |
|
575 | block : bool | |
573 | whether to wait for results |
|
576 | whether to wait for results | |
574 | bound : bool |
|
577 | bound : bool | |
575 |
whether to |
|
578 | whether to pass the client's Namespace as the first argument | |
|
579 | to functions called via `apply`. | |||
|
580 | track : bool | |||
|
581 | whether to create a MessageTracker to allow the user to | |||
|
582 | safely edit after arrays and buffers during non-copying | |||
|
583 | sends. | |||
576 | follow : Dependency, list, msg_id, AsyncResult |
|
584 | follow : Dependency, list, msg_id, AsyncResult | |
577 | the location dependencies of tasks |
|
585 | the location dependencies of tasks | |
578 | after : Dependency, list, msg_id, AsyncResult |
|
586 | after : Dependency, list, msg_id, AsyncResult | |
@@ -621,7 +629,11 b' class LoadBalancedView(View):' | |||||
621 | block : bool |
|
629 | block : bool | |
622 | whether to wait for the result or not [default self.block] |
|
630 | whether to wait for the result or not [default self.block] | |
623 | bound : bool |
|
631 | bound : bool | |
624 |
whether to |
|
632 | whether to pass the client's Namespace as the first argument to `f` | |
|
633 | track : bool | |||
|
634 | whether to create a MessageTracker to allow the user to | |||
|
635 | safely edit after arrays and buffers during non-copying | |||
|
636 | sends. | |||
625 | chunk_size : int |
|
637 | chunk_size : int | |
626 | how many elements should be in each task [default 1] |
|
638 | how many elements should be in each task [default 1] | |
627 |
|
639 |
@@ -10,12 +10,12 b" view.run('communicator.py')" | |||||
10 | view.execute('com = EngineCommunicator()') |
|
10 | view.execute('com = EngineCommunicator()') | |
11 |
|
11 | |||
12 | # gather the connection information into a dict |
|
12 | # gather the connection information into a dict | |
13 |
ar = view.apply_async |
|
13 | ar = view.apply_async(lambda : com.info) | |
14 | peers = ar.get_dict() |
|
14 | peers = ar.get_dict() | |
15 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators |
|
15 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators | |
16 |
|
16 | |||
17 | # connect the engines to each other: |
|
17 | # connect the engines to each other: | |
18 |
view.apply_sync |
|
18 | view.apply_sync(lambda pdict: com.connect(pdict), peers) | |
19 |
|
19 | |||
20 | # now all the engines are connected, and we can communicate between them: |
|
20 | # now all the engines are connected, and we can communicate between them: | |
21 |
|
21 | |||
@@ -34,7 +34,7 b' def send(client, sender, targets, msg_name, dest_name=None, block=None):' | |||||
34 | msg = globals()[m_name] |
|
34 | msg = globals()[m_name] | |
35 | return com.send(targets, msg) |
|
35 | return com.send(targets, msg) | |
36 |
|
36 | |||
37 |
client[sender].apply_async |
|
37 | client[sender].apply_async(_send, targets, msg_name) | |
38 |
|
38 | |||
39 | return client[targets].execute('%s=com.recv()'%dest_name, block=None) |
|
39 | return client[targets].execute('%s=com.recv()'%dest_name, block=None) | |
40 |
|
40 |
@@ -42,7 +42,7 b' print "done"' | |||||
42 |
|
42 | |||
43 | # Run 10m digits on 1 engine |
|
43 | # Run 10m digits on 1 engine | |
44 | t1 = clock() |
|
44 | t1 = clock() | |
45 |
freqs10m = c[id0].apply_sync |
|
45 | freqs10m = c[id0].apply_sync(compute_two_digit_freqs, files[0]) | |
46 | t2 = clock() |
|
46 | t2 = clock() | |
47 | digits_per_second1 = 10.0e6/(t2-t1) |
|
47 | digits_per_second1 = 10.0e6/(t2-t1) | |
48 | print "Digits per second (1 core, 10m digits): ", digits_per_second1 |
|
48 | print "Digits per second (1 core, 10m digits): ", digits_per_second1 |
@@ -68,7 +68,7 b' constructed via list-access to the client:' | |||||
68 |
|
68 | |||
69 | .. seealso:: |
|
69 | .. seealso:: | |
70 |
|
70 | |||
71 |
For more information, see the in-depth explanation of :ref:`Views <parallel_ |
|
71 | For more information, see the in-depth explanation of :ref:`Views <parallel_details>`. | |
72 |
|
72 | |||
73 |
|
73 | |||
74 | Quick and easy parallelism |
|
74 | Quick and easy parallelism | |
@@ -232,8 +232,8 b' blocks until the engines are done executing the command:' | |||||
232 |
|
232 | |||
233 | In [5]: dview['b'] = 10 |
|
233 | In [5]: dview['b'] = 10 | |
234 |
|
234 | |||
235 |
In [6]: dview.apply_ |
|
235 | In [6]: dview.apply_sync(lambda x: a+b+x, 27) | |
236 |
Out[6]: [42, 42, 42, 42] |
|
236 | Out[6]: [42, 42, 42, 42] | |
237 |
|
237 | |||
238 | Python commands can be executed on specific engines by calling execute using the ``targets`` |
|
238 | Python commands can be executed on specific engines by calling execute using the ``targets`` | |
239 | keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by |
|
239 | keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by | |
@@ -265,7 +265,13 b' Bound and unbound execution' | |||||
265 |
|
265 | |||
266 | The previous example also shows one of the most important things about the IPython |
|
266 | The previous example also shows one of the most important things about the IPython | |
267 | engines: they have a persistent user namespaces. The :meth:`apply` method can |
|
267 | engines: they have a persistent user namespaces. The :meth:`apply` method can | |
268 |
be run in either a bound or unbound manner |
|
268 | be run in either a bound or unbound manner. | |
|
269 | ||||
|
270 | When applying a function in a `bound` manner, the first argument to that function | |||
|
271 | will be the Engine's namespace, which is a :class:`Namespace` object, a dictionary | |||
|
272 | also providing attribute-access to keys. | |||
|
273 | ||||
|
274 | In all (unbound and bound) execution | |||
269 |
|
275 | |||
270 | .. sourcecode:: ipython |
|
276 | .. sourcecode:: ipython | |
271 |
|
277 | |||
@@ -273,31 +279,16 b' be run in either a bound or unbound manner:' | |||||
273 |
|
279 | |||
274 | In [10]: v0 = rc[0] |
|
280 | In [10]: v0 = rc[0] | |
275 |
|
281 | |||
276 | In [12]: v0.apply_sync_bound(lambda : b) |
|
282 | # multiply b*2 inplace | |
277 | Out[12]: 5 |
|
283 | In [12]: v0.apply_sync_bound(lambda ns: ns.b*=2) | |
278 |
|
284 | |||
279 | In [13]: v0.apply_sync(lambda : b) |
|
285 | # b is still available in globals during unbound execution | |
280 | --------------------------------------------------------------------------- |
|
286 | In [13]: v0.apply_sync(lambda a: a*b, 3) | |
281 | RemoteError Traceback (most recent call last) |
|
287 | Out[13]: 30 | |
282 | /home/you/<ipython-input-34-21a468eb10f0> in <module>() |
|
|||
283 | ----> 1 v0.apply(lambda : b) |
|
|||
284 | ... |
|
|||
285 | RemoteError: NameError(global name 'b' is not defined) |
|
|||
286 | Traceback (most recent call last): |
|
|||
287 | File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request |
|
|||
288 | exec code in working, working |
|
|||
289 | File "<string>", line 1, in <module> |
|
|||
290 | File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda> |
|
|||
291 | NameError: global name 'b' is not defined |
|
|||
292 |
|
||||
293 |
|
||||
294 | Specifically, `bound=True` specifies that the engine's namespace is to be used |
|
|||
295 | as the `globals` when the function is called, and `bound=False` specifies that |
|
|||
296 | the engine's namespace is not to be used (hence, 'b' is undefined during unbound |
|
|||
297 | execution, since the function is called in an empty namespace). Unbound execution is |
|
|||
298 | often useful for large numbers of atomic tasks, which prevents bloating the engine's |
|
|||
299 | memory, while bound execution lets you build on your previous work. |
|
|||
300 |
|
288 | |||
|
289 | `bound=True` specifies that the engine's namespace is to be passed as the first argument when | |||
|
290 | the function is called, and the default `bound=False` specifies that the normal behavior, but | |||
|
291 | the engine's namespace will be available as the globals() when the function is called. | |||
301 |
|
292 | |||
302 | Non-blocking execution |
|
293 | Non-blocking execution | |
303 | ---------------------- |
|
294 | ---------------------- | |
@@ -469,7 +460,7 b' specifying the index of the result to be requested. It is simply a shortcut to t' | |||||
469 |
|
460 | |||
470 | .. sourcecode:: ipython |
|
461 | .. sourcecode:: ipython | |
471 |
|
462 | |||
472 |
In [29]: dv.apply_async |
|
463 | In [29]: dv.apply_async(lambda : ev) | |
473 |
|
464 | |||
474 | In [30]: %result |
|
465 | In [30]: %result | |
475 | Out[30]: [ [ 1.28167017 0.14197338], |
|
466 | Out[30]: [ [ 1.28167017 0.14197338], |
@@ -298,6 +298,8 b' The basic cases that are checked:' | |||||
298 | This analysis has not been proven to be rigorous, so it is likely possible for tasks |
|
298 | This analysis has not been proven to be rigorous, so it is likely possible for tasks | |
299 | to become impossible to run in obscure situations, so a timeout may be a good choice. |
|
299 | to become impossible to run in obscure situations, so a timeout may be a good choice. | |
300 |
|
300 | |||
|
301 | .. _parallel_schedulers: | |||
|
302 | ||||
301 | Schedulers |
|
303 | Schedulers | |
302 | ========== |
|
304 | ========== | |
303 |
|
305 | |||
@@ -309,6 +311,12 b' of a controller config object.' | |||||
309 |
|
311 | |||
310 | The built-in routing schemes: |
|
312 | The built-in routing schemes: | |
311 |
|
313 | |||
|
314 | To select one of these schemes, simply do:: | |||
|
315 | ||||
|
316 | $ ipcontrollerz --scheme <schemename> | |||
|
317 | for instance: | |||
|
318 | $ ipcontrollerz --scheme lru | |||
|
319 | ||||
312 | lru: Least Recently Used |
|
320 | lru: Least Recently Used | |
313 |
|
321 | |||
314 | Always assign work to the least-recently-used engine. A close relative of |
|
322 | Always assign work to the least-recently-used engine. A close relative of | |
@@ -316,11 +324,12 b' lru: Least Recently Used' | |||||
316 | with respect to runtime of each task. |
|
324 | with respect to runtime of each task. | |
317 |
|
325 | |||
318 | plainrandom: Plain Random |
|
326 | plainrandom: Plain Random | |
|
327 | ||||
319 | Randomly picks an engine on which to run. |
|
328 | Randomly picks an engine on which to run. | |
320 |
|
329 | |||
321 | twobin: Two-Bin Random |
|
330 | twobin: Two-Bin Random | |
322 |
|
331 | |||
323 |
** |
|
332 | **Requires numpy** | |
324 |
|
333 | |||
325 | Pick two engines at random, and use the LRU of the two. This is known to be better |
|
334 | Pick two engines at random, and use the LRU of the two. This is known to be better | |
326 | than plain random in many cases, but requires a small amount of computation. |
|
335 | than plain random in many cases, but requires a small amount of computation. | |
@@ -333,7 +342,7 b' leastload: Least Load' | |||||
333 |
|
342 | |||
334 | weighted: Weighted Two-Bin Random |
|
343 | weighted: Weighted Two-Bin Random | |
335 |
|
344 | |||
336 |
** |
|
345 | **Requires numpy** | |
337 |
|
346 | |||
338 | Pick two engines at random using the number of outstanding tasks as inverse weights, |
|
347 | Pick two engines at random using the number of outstanding tasks as inverse weights, | |
339 | and use the one with the lower load. |
|
348 | and use the one with the lower load. | |
@@ -360,7 +369,7 b' Disabled features when using the ZMQ Scheduler:' | |||||
360 | allows graceful handling of Engines coming and going. There is no way to know |
|
369 | allows graceful handling of Engines coming and going. There is no way to know | |
361 | where ZeroMQ messages have gone, so there is no way to know what tasks are on which |
|
370 | where ZeroMQ messages have gone, so there is no way to know what tasks are on which | |
362 | engine until they *finish*. This makes recovery from engine shutdown very difficult. |
|
371 | engine until they *finish*. This makes recovery from engine shutdown very difficult. | |
363 |
|
372 | |||
364 |
|
373 | |||
365 | .. note:: |
|
374 | .. note:: | |
366 |
|
375 |
General Comments 0
You need to be logged in to leave comments.
Login now