Show More
@@ -10,8 +10,6 b'' | |||
|
10 | 10 | # Imports |
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | from __future__ import print_function | |
|
14 | ||
|
15 | 13 | import os |
|
16 | 14 | import time |
|
17 | 15 | from getpass import getpass |
@@ -57,7 +55,7 b' def _clear():' | |||
|
57 | 55 | """helper method for implementing `client.clear` via `client.apply`""" |
|
58 | 56 | globals().clear() |
|
59 | 57 | |
|
60 | def execute(code): | |
|
58 | def _execute(code): | |
|
61 | 59 | """helper method for implementing `client.execute` via `client.apply`""" |
|
62 | 60 | exec code in globals() |
|
63 | 61 | |
@@ -79,8 +77,10 b' def defaultblock(f, self, *args, **kwargs):' | |||
|
79 | 77 | block = self.block if block is None else block |
|
80 | 78 | saveblock = self.block |
|
81 | 79 | self.block = block |
|
82 | ret = f(self, *args, **kwargs) | |
|
83 | self.block = saveblock | |
|
80 | try: | |
|
81 | ret = f(self, *args, **kwargs) | |
|
82 | finally: | |
|
83 | self.block = saveblock | |
|
84 | 84 | return ret |
|
85 | 85 | |
|
86 | 86 | |
@@ -198,6 +198,7 b' class Client(object):' | |||
|
198 | 198 | results = None |
|
199 | 199 | history = None |
|
200 | 200 | debug = False |
|
201 | targets = None | |
|
201 | 202 | |
|
202 | 203 | def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False, |
|
203 | 204 | sshserver=None, sshkey=None, password=None, paramiko=None, |
@@ -205,6 +206,7 b' class Client(object):' | |||
|
205 | 206 | if context is None: |
|
206 | 207 | context = zmq.Context() |
|
207 | 208 | self.context = context |
|
209 | self.targets = 'all' | |
|
208 | 210 | self._addr = addr |
|
209 | 211 | self._ssh = bool(sshserver or sshkey or password) |
|
210 | 212 | if self._ssh and sshserver is None: |
@@ -478,7 +480,7 b' class Client(object):' | |||
|
478 | 480 | |
|
479 | 481 | Parameters |
|
480 | 482 | ---------- |
|
481 | msg_ids : int, str, or list of ints and/or strs | |
|
483 | msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects | |
|
482 | 484 | ints are indices to self.history |
|
483 | 485 | strs are msg_ids |
|
484 | 486 | default: wait on all outstanding messages |
@@ -495,13 +497,18 b' class Client(object):' | |||
|
495 | 497 | if msg_ids is None: |
|
496 | 498 | theids = self.outstanding |
|
497 | 499 | else: |
|
498 | if isinstance(msg_ids, (int, str)): | |
|
500 | if isinstance(msg_ids, (int, str, AsyncResult)): | |
|
499 | 501 | msg_ids = [msg_ids] |
|
500 | 502 | theids = set() |
|
501 | 503 | for msg_id in msg_ids: |
|
502 | 504 | if isinstance(msg_id, int): |
|
503 | 505 | msg_id = self.history[msg_id] |
|
506 | elif isinstance(msg_id, AsyncResult): | |
|
507 | map(theids.add, msg_id._msg_ids) | |
|
508 | continue | |
|
504 | 509 | theids.add(msg_id) |
|
510 | if not theids.intersection(self.outstanding): | |
|
511 | return True | |
|
505 | 512 | self.spin() |
|
506 | 513 | while theids.intersection(self.outstanding): |
|
507 | 514 | if timeout >= 0 and ( time.time()-tic ) > timeout: |
@@ -607,7 +614,7 b' class Client(object):' | |||
|
607 | 614 | whether or not to wait until done to return |
|
608 | 615 | default: self.block |
|
609 | 616 | """ |
|
610 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) | |
|
617 | result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True) | |
|
611 | 618 | return result |
|
612 | 619 | |
|
613 | 620 | def run(self, code, block=None): |
@@ -796,11 +803,11 b' class Client(object):' | |||
|
796 | 803 | return pf.map(*sequences) |
|
797 | 804 | |
|
798 | 805 | def parallel(self, bound=True, targets='all', block=True): |
|
799 | """Decorator for making a ParallelFunction""" | |
|
806 | """Decorator for making a ParallelFunction.""" | |
|
800 | 807 | return parallel(self, bound=bound, targets=targets, block=block) |
|
801 | 808 | |
|
802 | 809 | def remote(self, bound=True, targets='all', block=True): |
|
803 | """Decorator for making a RemoteFunction""" | |
|
810 | """Decorator for making a RemoteFunction.""" | |
|
804 | 811 | return remote(self, bound=bound, targets=targets, block=block) |
|
805 | 812 | |
|
806 | 813 | #-------------------------------------------------------------------------- |
@@ -816,7 +823,7 b' class Client(object):' | |||
|
816 | 823 | return result |
|
817 | 824 | |
|
818 | 825 | @defaultblock |
|
819 |
def pull(self, keys, targets='all', block= |
|
|
826 | def pull(self, keys, targets='all', block=None): | |
|
820 | 827 | """Pull objects from `target`'s namespace by `keys`""" |
|
821 | 828 | if isinstance(keys, str): |
|
822 | 829 | pass |
@@ -827,11 +834,11 b' class Client(object):' | |||
|
827 | 834 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) |
|
828 | 835 | return result |
|
829 | 836 | |
|
830 | @defaultblock | |
|
831 | 837 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): |
|
832 | 838 | """ |
|
833 | 839 | Partition a Python sequence and send the partitions to a set of engines. |
|
834 | 840 | """ |
|
841 | block = block if block is not None else self.block | |
|
835 | 842 | targets = self._build_targets(targets)[-1] |
|
836 | 843 | mapObject = Map.dists[dist]() |
|
837 | 844 | nparts = len(targets) |
@@ -839,33 +846,31 b' class Client(object):' | |||
|
839 | 846 | for index, engineid in enumerate(targets): |
|
840 | 847 | partition = mapObject.getPartition(seq, index, nparts) |
|
841 | 848 | if flatten and len(partition) == 1: |
|
842 |
|
|
|
849 | r = self.push({key: partition[0]}, targets=engineid, block=False) | |
|
843 | 850 | else: |
|
844 |
|
|
|
845 |
msg_ids. |
|
|
851 | r = self.push({key: partition}, targets=engineid, block=False) | |
|
852 | msg_ids.extend(r._msg_ids) | |
|
846 | 853 | r = AsyncResult(self, msg_ids) |
|
847 | 854 | if block: |
|
848 |
r. |
|
|
849 | return | |
|
855 | return r.get() | |
|
850 | 856 | else: |
|
851 | 857 | return r |
|
852 | 858 | |
|
853 | @defaultblock | |
|
854 | def gather(self, key, dist='b', targets='all', block=True): | |
|
859 | def gather(self, key, dist='b', targets='all', block=None): | |
|
855 | 860 | """ |
|
856 | 861 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
857 | 862 | """ |
|
863 | block = block if block is not None else self.block | |
|
858 | 864 | |
|
859 | 865 | targets = self._build_targets(targets)[-1] |
|
860 | 866 | mapObject = Map.dists[dist]() |
|
861 | 867 | msg_ids = [] |
|
862 | 868 | for index, engineid in enumerate(targets): |
|
863 |
msg_ids. |
|
|
869 | msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids) | |
|
864 | 870 | |
|
865 | 871 | r = AsyncMapResult(self, msg_ids, mapObject) |
|
866 | 872 | if block: |
|
867 |
r. |
|
|
868 | return r.result | |
|
873 | return r.get() | |
|
869 | 874 | else: |
|
870 | 875 | return r |
|
871 | 876 | |
@@ -980,6 +985,35 b' class Client(object):' | |||
|
980 | 985 | if content['status'] != 'ok': |
|
981 | 986 | raise ss.unwrap_exception(content) |
|
982 | 987 | |
|
988 | #---------------------------------------- | |
|
989 | # activate for %px,%autopx magics | |
|
990 | #---------------------------------------- | |
|
991 | def activate(self): | |
|
992 | """Make this `View` active for parallel magic commands. | |
|
993 | ||
|
994 | IPython has a magic command syntax to work with `MultiEngineClient` objects. | |
|
995 | In a given IPython session there is a single active one. While | |
|
996 | there can be many `Views` created and used by the user, | |
|
997 | there is only one active one. The active `View` is used whenever | |
|
998 | the magic commands %px and %autopx are used. | |
|
999 | ||
|
1000 | The activate() method is called on a given `View` to make it | |
|
1001 | active. Once this has been done, the magic commands can be used. | |
|
1002 | """ | |
|
1003 | ||
|
1004 | try: | |
|
1005 | # This is injected into __builtins__. | |
|
1006 | ip = get_ipython() | |
|
1007 | except NameError: | |
|
1008 | print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | |
|
1009 | else: | |
|
1010 | pmagic = ip.plugin_manager.get_plugin('parallelmagic') | |
|
1011 | if pmagic is not None: | |
|
1012 | pmagic.active_multiengine_client = self | |
|
1013 | else: | |
|
1014 | print "You must first load the parallelmagic extension " \ | |
|
1015 | "by doing '%load_ext parallelmagic'" | |
|
1016 | ||
|
983 | 1017 | class AsynClient(Client): |
|
984 | 1018 | """An Asynchronous client, using the Tornado Event Loop. |
|
985 | 1019 | !!!unfinished!!!""" |
@@ -11,7 +11,7 b'' | |||
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | 13 | from IPython.external.decorator import decorator |
|
14 | from IPython.zmq.parallel.remotefunction import ParallelFunction | |
|
14 | from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel | |
|
15 | 15 | |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | # Decorators |
@@ -22,8 +22,10 b' def myblock(f, self, *args, **kwargs):' | |||
|
22 | 22 | """override client.block with self.block during a call""" |
|
23 | 23 | block = self.client.block |
|
24 | 24 | self.client.block = self.block |
|
25 | ret = f(self, *args, **kwargs) | |
|
26 | self.client.block = block | |
|
25 | try: | |
|
26 | ret = f(self, *args, **kwargs) | |
|
27 | finally: | |
|
28 | self.client.block = block | |
|
27 | 29 | return ret |
|
28 | 30 | |
|
29 | 31 | @decorator |
@@ -65,7 +67,6 b' class View(object):' | |||
|
65 | 67 | Don't use this class, use subclasses. |
|
66 | 68 | """ |
|
67 | 69 | _targets = None |
|
68 | _ntargets = None | |
|
69 | 70 | block=None |
|
70 | 71 | bound=None |
|
71 | 72 | history=None |
@@ -75,7 +76,7 b' class View(object):' | |||
|
75 | 76 | self._targets = targets |
|
76 | 77 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) |
|
77 | 78 | self.block = client.block |
|
78 |
self.bound= |
|
|
79 | self.bound=False | |
|
79 | 80 | self.history = [] |
|
80 | 81 | self.outstanding = set() |
|
81 | 82 | self.results = {} |
@@ -92,7 +93,8 b' class View(object):' | |||
|
92 | 93 | |
|
93 | 94 | @targets.setter |
|
94 | 95 | def targets(self, value): |
|
95 | raise AttributeError("Cannot set my targets argument after construction!") | |
|
96 | self._targets = value | |
|
97 | # raise AttributeError("Cannot set my targets argument after construction!") | |
|
96 | 98 | |
|
97 | 99 | @sync_results |
|
98 | 100 | def spin(self): |
@@ -185,6 +187,10 b' class View(object):' | |||
|
185 | 187 | bound=True, targets=targets) |
|
186 | 188 | return pf.map(*sequences) |
|
187 | 189 | |
|
190 | def parallel(self, bound=True, block=True): | |
|
191 | """Decorator for making a ParallelFunction""" | |
|
192 | return parallel(self.client, bound=bound, targets=self.targets, block=block) | |
|
193 | ||
|
188 | 194 | def abort(self, msg_ids=None, block=None): |
|
189 | 195 | """Abort jobs on my engines. |
|
190 | 196 | |
@@ -202,11 +208,12 b' class View(object):' | |||
|
202 | 208 | """Fetch the Queue status of my engines""" |
|
203 | 209 | return self.client.queue_status(targets=self.targets, verbose=verbose) |
|
204 | 210 | |
|
205 | def purge_results(self, msg_ids=[],targets=[]): | |
|
211 | def purge_results(self, msg_ids=[], targets=[]): | |
|
206 | 212 | """Instruct the controller to forget specific results.""" |
|
207 | 213 | if targets is None or targets == 'all': |
|
208 | 214 | targets = self.targets |
|
209 | 215 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) |
|
216 | ||
|
210 | 217 | |
|
211 | 218 | |
|
212 | 219 | class DirectView(View): |
@@ -219,8 +226,16 b' class DirectView(View):' | |||
|
219 | 226 | >>> dv_even = client[::2] |
|
220 | 227 | >>> dv_some = client[1:3] |
|
221 | 228 | |
|
229 | This object provides dictionary access | |
|
230 | ||
|
222 | 231 | """ |
|
223 | 232 | |
|
233 | @sync_results | |
|
234 | @save_ids | |
|
235 | def execute(self, code, block=True): | |
|
236 | """execute some code on my targets.""" | |
|
237 | return self.client.execute(code, block=self.block, targets=self.targets) | |
|
238 | ||
|
224 | 239 | def update(self, ns): |
|
225 | 240 | """update remote namespace with dict `ns`""" |
|
226 | 241 | return self.client.push(ns, targets=self.targets, block=self.block) |
@@ -234,6 +249,8 b' class DirectView(View):' | |||
|
234 | 249 | # block = block if block is not None else self.block |
|
235 | 250 | return self.client.pull(key_s, block=True, targets=self.targets) |
|
236 | 251 | |
|
252 | @sync_results | |
|
253 | @save_ids | |
|
237 | 254 | def pull(self, key_s, block=True): |
|
238 | 255 | """get object(s) by `key_s` from remote namespace |
|
239 | 256 | will return one object if it is a key. |
@@ -252,6 +269,8 b' class DirectView(View):' | |||
|
252 | 269 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, |
|
253 | 270 | targets=targets, block=block) |
|
254 | 271 | |
|
272 | @sync_results | |
|
273 | @save_ids | |
|
255 | 274 | def gather(self, key, dist='b', targets=None, block=True): |
|
256 | 275 | """ |
|
257 | 276 | Gather a partitioned sequence on a set of engines as a single local seq. |
@@ -278,6 +297,36 b' class DirectView(View):' | |||
|
278 | 297 | block = block if block is not None else self.block |
|
279 | 298 | return self.client.kill(targets=self.targets, block=block) |
|
280 | 299 | |
|
300 | #---------------------------------------- | |
|
301 | # activate for %px,%autopx magics | |
|
302 | #---------------------------------------- | |
|
303 | def activate(self): | |
|
304 | """Make this `View` active for parallel magic commands. | |
|
305 | ||
|
306 | IPython has a magic command syntax to work with `MultiEngineClient` objects. | |
|
307 | In a given IPython session there is a single active one. While | |
|
308 | there can be many `Views` created and used by the user, | |
|
309 | there is only one active one. The active `View` is used whenever | |
|
310 | the magic commands %px and %autopx are used. | |
|
311 | ||
|
312 | The activate() method is called on a given `View` to make it | |
|
313 | active. Once this has been done, the magic commands can be used. | |
|
314 | """ | |
|
315 | ||
|
316 | try: | |
|
317 | # This is injected into __builtins__. | |
|
318 | ip = get_ipython() | |
|
319 | except NameError: | |
|
320 | print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | |
|
321 | else: | |
|
322 | pmagic = ip.plugin_manager.get_plugin('parallelmagic') | |
|
323 | if pmagic is not None: | |
|
324 | pmagic.active_multiengine_client = self | |
|
325 | else: | |
|
326 | print "You must first load the parallelmagic extension " \ | |
|
327 | "by doing '%load_ext parallelmagic'" | |
|
328 | ||
|
329 | ||
|
281 | 330 | class LoadBalancedView(View): |
|
282 | 331 | """An engine-agnostic View that only executes via the Task queue. |
|
283 | 332 |
General Comments 0
You need to be logged in to leave comments.
Login now