Show More
@@ -10,8 +10,6 b'' | |||||
10 | # Imports |
|
10 | # Imports | |
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
13 | from __future__ import print_function |
|
|||
14 |
|
||||
15 | import os |
|
13 | import os | |
16 | import time |
|
14 | import time | |
17 | from getpass import getpass |
|
15 | from getpass import getpass | |
@@ -57,7 +55,7 b' def _clear():' | |||||
57 | """helper method for implementing `client.clear` via `client.apply`""" |
|
55 | """helper method for implementing `client.clear` via `client.apply`""" | |
58 | globals().clear() |
|
56 | globals().clear() | |
59 |
|
57 | |||
60 | def execute(code): |
|
58 | def _execute(code): | |
61 | """helper method for implementing `client.execute` via `client.apply`""" |
|
59 | """helper method for implementing `client.execute` via `client.apply`""" | |
62 | exec code in globals() |
|
60 | exec code in globals() | |
63 |
|
61 | |||
@@ -79,8 +77,10 b' def defaultblock(f, self, *args, **kwargs):' | |||||
79 | block = self.block if block is None else block |
|
77 | block = self.block if block is None else block | |
80 | saveblock = self.block |
|
78 | saveblock = self.block | |
81 | self.block = block |
|
79 | self.block = block | |
82 | ret = f(self, *args, **kwargs) |
|
80 | try: | |
83 | self.block = saveblock |
|
81 | ret = f(self, *args, **kwargs) | |
|
82 | finally: | |||
|
83 | self.block = saveblock | |||
84 | return ret |
|
84 | return ret | |
85 |
|
85 | |||
86 |
|
86 | |||
@@ -198,6 +198,7 b' class Client(object):' | |||||
198 | results = None |
|
198 | results = None | |
199 | history = None |
|
199 | history = None | |
200 | debug = False |
|
200 | debug = False | |
|
201 | targets = None | |||
201 |
|
202 | |||
202 | def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False, |
|
203 | def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False, | |
203 | sshserver=None, sshkey=None, password=None, paramiko=None, |
|
204 | sshserver=None, sshkey=None, password=None, paramiko=None, | |
@@ -205,6 +206,7 b' class Client(object):' | |||||
205 | if context is None: |
|
206 | if context is None: | |
206 | context = zmq.Context() |
|
207 | context = zmq.Context() | |
207 | self.context = context |
|
208 | self.context = context | |
|
209 | self.targets = 'all' | |||
208 | self._addr = addr |
|
210 | self._addr = addr | |
209 | self._ssh = bool(sshserver or sshkey or password) |
|
211 | self._ssh = bool(sshserver or sshkey or password) | |
210 | if self._ssh and sshserver is None: |
|
212 | if self._ssh and sshserver is None: | |
@@ -478,7 +480,7 b' class Client(object):' | |||||
478 |
|
480 | |||
479 | Parameters |
|
481 | Parameters | |
480 | ---------- |
|
482 | ---------- | |
481 | msg_ids : int, str, or list of ints and/or strs |
|
483 | msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects | |
482 | ints are indices to self.history |
|
484 | ints are indices to self.history | |
483 | strs are msg_ids |
|
485 | strs are msg_ids | |
484 | default: wait on all outstanding messages |
|
486 | default: wait on all outstanding messages | |
@@ -495,13 +497,18 b' class Client(object):' | |||||
495 | if msg_ids is None: |
|
497 | if msg_ids is None: | |
496 | theids = self.outstanding |
|
498 | theids = self.outstanding | |
497 | else: |
|
499 | else: | |
498 | if isinstance(msg_ids, (int, str)): |
|
500 | if isinstance(msg_ids, (int, str, AsyncResult)): | |
499 | msg_ids = [msg_ids] |
|
501 | msg_ids = [msg_ids] | |
500 | theids = set() |
|
502 | theids = set() | |
501 | for msg_id in msg_ids: |
|
503 | for msg_id in msg_ids: | |
502 | if isinstance(msg_id, int): |
|
504 | if isinstance(msg_id, int): | |
503 | msg_id = self.history[msg_id] |
|
505 | msg_id = self.history[msg_id] | |
|
506 | elif isinstance(msg_id, AsyncResult): | |||
|
507 | map(theids.add, msg_id._msg_ids) | |||
|
508 | continue | |||
504 | theids.add(msg_id) |
|
509 | theids.add(msg_id) | |
|
510 | if not theids.intersection(self.outstanding): | |||
|
511 | return True | |||
505 | self.spin() |
|
512 | self.spin() | |
506 | while theids.intersection(self.outstanding): |
|
513 | while theids.intersection(self.outstanding): | |
507 | if timeout >= 0 and ( time.time()-tic ) > timeout: |
|
514 | if timeout >= 0 and ( time.time()-tic ) > timeout: | |
@@ -607,7 +614,7 b' class Client(object):' | |||||
607 | whether or not to wait until done to return |
|
614 | whether or not to wait until done to return | |
608 | default: self.block |
|
615 | default: self.block | |
609 | """ |
|
616 | """ | |
610 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) |
|
617 | result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True) | |
611 | return result |
|
618 | return result | |
612 |
|
619 | |||
613 | def run(self, code, block=None): |
|
620 | def run(self, code, block=None): | |
@@ -796,11 +803,11 b' class Client(object):' | |||||
796 | return pf.map(*sequences) |
|
803 | return pf.map(*sequences) | |
797 |
|
804 | |||
798 | def parallel(self, bound=True, targets='all', block=True): |
|
805 | def parallel(self, bound=True, targets='all', block=True): | |
799 | """Decorator for making a ParallelFunction""" |
|
806 | """Decorator for making a ParallelFunction.""" | |
800 | return parallel(self, bound=bound, targets=targets, block=block) |
|
807 | return parallel(self, bound=bound, targets=targets, block=block) | |
801 |
|
808 | |||
802 | def remote(self, bound=True, targets='all', block=True): |
|
809 | def remote(self, bound=True, targets='all', block=True): | |
803 | """Decorator for making a RemoteFunction""" |
|
810 | """Decorator for making a RemoteFunction.""" | |
804 | return remote(self, bound=bound, targets=targets, block=block) |
|
811 | return remote(self, bound=bound, targets=targets, block=block) | |
805 |
|
812 | |||
806 | #-------------------------------------------------------------------------- |
|
813 | #-------------------------------------------------------------------------- | |
@@ -816,7 +823,7 b' class Client(object):' | |||||
816 | return result |
|
823 | return result | |
817 |
|
824 | |||
818 | @defaultblock |
|
825 | @defaultblock | |
819 |
def pull(self, keys, targets='all', block= |
|
826 | def pull(self, keys, targets='all', block=None): | |
820 | """Pull objects from `target`'s namespace by `keys`""" |
|
827 | """Pull objects from `target`'s namespace by `keys`""" | |
821 | if isinstance(keys, str): |
|
828 | if isinstance(keys, str): | |
822 | pass |
|
829 | pass | |
@@ -827,11 +834,11 b' class Client(object):' | |||||
827 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) |
|
834 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) | |
828 | return result |
|
835 | return result | |
829 |
|
836 | |||
830 | @defaultblock |
|
|||
831 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): |
|
837 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): | |
832 | """ |
|
838 | """ | |
833 | Partition a Python sequence and send the partitions to a set of engines. |
|
839 | Partition a Python sequence and send the partitions to a set of engines. | |
834 | """ |
|
840 | """ | |
|
841 | block = block if block is not None else self.block | |||
835 | targets = self._build_targets(targets)[-1] |
|
842 | targets = self._build_targets(targets)[-1] | |
836 | mapObject = Map.dists[dist]() |
|
843 | mapObject = Map.dists[dist]() | |
837 | nparts = len(targets) |
|
844 | nparts = len(targets) | |
@@ -839,33 +846,31 b' class Client(object):' | |||||
839 | for index, engineid in enumerate(targets): |
|
846 | for index, engineid in enumerate(targets): | |
840 | partition = mapObject.getPartition(seq, index, nparts) |
|
847 | partition = mapObject.getPartition(seq, index, nparts) | |
841 | if flatten and len(partition) == 1: |
|
848 | if flatten and len(partition) == 1: | |
842 |
|
|
849 | r = self.push({key: partition[0]}, targets=engineid, block=False) | |
843 | else: |
|
850 | else: | |
844 |
|
|
851 | r = self.push({key: partition}, targets=engineid, block=False) | |
845 |
msg_ids. |
|
852 | msg_ids.extend(r._msg_ids) | |
846 | r = AsyncResult(self, msg_ids) |
|
853 | r = AsyncResult(self, msg_ids) | |
847 | if block: |
|
854 | if block: | |
848 |
r. |
|
855 | return r.get() | |
849 | return |
|
|||
850 | else: |
|
856 | else: | |
851 | return r |
|
857 | return r | |
852 |
|
858 | |||
853 | @defaultblock |
|
859 | def gather(self, key, dist='b', targets='all', block=None): | |
854 | def gather(self, key, dist='b', targets='all', block=True): |
|
|||
855 | """ |
|
860 | """ | |
856 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
861 | Gather a partitioned sequence on a set of engines as a single local seq. | |
857 | """ |
|
862 | """ | |
|
863 | block = block if block is not None else self.block | |||
858 |
|
864 | |||
859 | targets = self._build_targets(targets)[-1] |
|
865 | targets = self._build_targets(targets)[-1] | |
860 | mapObject = Map.dists[dist]() |
|
866 | mapObject = Map.dists[dist]() | |
861 | msg_ids = [] |
|
867 | msg_ids = [] | |
862 | for index, engineid in enumerate(targets): |
|
868 | for index, engineid in enumerate(targets): | |
863 |
msg_ids. |
|
869 | msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids) | |
864 |
|
870 | |||
865 | r = AsyncMapResult(self, msg_ids, mapObject) |
|
871 | r = AsyncMapResult(self, msg_ids, mapObject) | |
866 | if block: |
|
872 | if block: | |
867 |
r. |
|
873 | return r.get() | |
868 | return r.result |
|
|||
869 | else: |
|
874 | else: | |
870 | return r |
|
875 | return r | |
871 |
|
876 | |||
@@ -980,6 +985,35 b' class Client(object):' | |||||
980 | if content['status'] != 'ok': |
|
985 | if content['status'] != 'ok': | |
981 | raise ss.unwrap_exception(content) |
|
986 | raise ss.unwrap_exception(content) | |
982 |
|
987 | |||
|
988 | #---------------------------------------- | |||
|
989 | # activate for %px,%autopx magics | |||
|
990 | #---------------------------------------- | |||
|
991 | def activate(self): | |||
|
992 | """Make this `View` active for parallel magic commands. | |||
|
993 | ||||
|
994 | IPython has a magic command syntax to work with `MultiEngineClient` objects. | |||
|
995 | In a given IPython session there is a single active one. While | |||
|
996 | there can be many `Views` created and used by the user, | |||
|
997 | there is only one active one. The active `View` is used whenever | |||
|
998 | the magic commands %px and %autopx are used. | |||
|
999 | ||||
|
1000 | The activate() method is called on a given `View` to make it | |||
|
1001 | active. Once this has been done, the magic commands can be used. | |||
|
1002 | """ | |||
|
1003 | ||||
|
1004 | try: | |||
|
1005 | # This is injected into __builtins__. | |||
|
1006 | ip = get_ipython() | |||
|
1007 | except NameError: | |||
|
1008 | print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | |||
|
1009 | else: | |||
|
1010 | pmagic = ip.plugin_manager.get_plugin('parallelmagic') | |||
|
1011 | if pmagic is not None: | |||
|
1012 | pmagic.active_multiengine_client = self | |||
|
1013 | else: | |||
|
1014 | print "You must first load the parallelmagic extension " \ | |||
|
1015 | "by doing '%load_ext parallelmagic'" | |||
|
1016 | ||||
983 | class AsynClient(Client): |
|
1017 | class AsynClient(Client): | |
984 | """An Asynchronous client, using the Tornado Event Loop. |
|
1018 | """An Asynchronous client, using the Tornado Event Loop. | |
985 | !!!unfinished!!!""" |
|
1019 | !!!unfinished!!!""" |
@@ -11,7 +11,7 b'' | |||||
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
13 | from IPython.external.decorator import decorator |
|
13 | from IPython.external.decorator import decorator | |
14 | from IPython.zmq.parallel.remotefunction import ParallelFunction |
|
14 | from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel | |
15 |
|
15 | |||
16 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 | # Decorators |
|
17 | # Decorators | |
@@ -22,8 +22,10 b' def myblock(f, self, *args, **kwargs):' | |||||
22 | """override client.block with self.block during a call""" |
|
22 | """override client.block with self.block during a call""" | |
23 | block = self.client.block |
|
23 | block = self.client.block | |
24 | self.client.block = self.block |
|
24 | self.client.block = self.block | |
25 | ret = f(self, *args, **kwargs) |
|
25 | try: | |
26 | self.client.block = block |
|
26 | ret = f(self, *args, **kwargs) | |
|
27 | finally: | |||
|
28 | self.client.block = block | |||
27 | return ret |
|
29 | return ret | |
28 |
|
30 | |||
29 | @decorator |
|
31 | @decorator | |
@@ -65,7 +67,6 b' class View(object):' | |||||
65 | Don't use this class, use subclasses. |
|
67 | Don't use this class, use subclasses. | |
66 | """ |
|
68 | """ | |
67 | _targets = None |
|
69 | _targets = None | |
68 | _ntargets = None |
|
|||
69 | block=None |
|
70 | block=None | |
70 | bound=None |
|
71 | bound=None | |
71 | history=None |
|
72 | history=None | |
@@ -75,7 +76,7 b' class View(object):' | |||||
75 | self._targets = targets |
|
76 | self._targets = targets | |
76 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) |
|
77 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) | |
77 | self.block = client.block |
|
78 | self.block = client.block | |
78 |
self.bound= |
|
79 | self.bound=False | |
79 | self.history = [] |
|
80 | self.history = [] | |
80 | self.outstanding = set() |
|
81 | self.outstanding = set() | |
81 | self.results = {} |
|
82 | self.results = {} | |
@@ -92,7 +93,8 b' class View(object):' | |||||
92 |
|
93 | |||
93 | @targets.setter |
|
94 | @targets.setter | |
94 | def targets(self, value): |
|
95 | def targets(self, value): | |
95 | raise AttributeError("Cannot set my targets argument after construction!") |
|
96 | self._targets = value | |
|
97 | # raise AttributeError("Cannot set my targets argument after construction!") | |||
96 |
|
98 | |||
97 | @sync_results |
|
99 | @sync_results | |
98 | def spin(self): |
|
100 | def spin(self): | |
@@ -185,6 +187,10 b' class View(object):' | |||||
185 | bound=True, targets=targets) |
|
187 | bound=True, targets=targets) | |
186 | return pf.map(*sequences) |
|
188 | return pf.map(*sequences) | |
187 |
|
189 | |||
|
190 | def parallel(self, bound=True, block=True): | |||
|
191 | """Decorator for making a ParallelFunction""" | |||
|
192 | return parallel(self.client, bound=bound, targets=self.targets, block=block) | |||
|
193 | ||||
188 | def abort(self, msg_ids=None, block=None): |
|
194 | def abort(self, msg_ids=None, block=None): | |
189 | """Abort jobs on my engines. |
|
195 | """Abort jobs on my engines. | |
190 |
|
196 | |||
@@ -202,11 +208,12 b' class View(object):' | |||||
202 | """Fetch the Queue status of my engines""" |
|
208 | """Fetch the Queue status of my engines""" | |
203 | return self.client.queue_status(targets=self.targets, verbose=verbose) |
|
209 | return self.client.queue_status(targets=self.targets, verbose=verbose) | |
204 |
|
210 | |||
205 | def purge_results(self, msg_ids=[],targets=[]): |
|
211 | def purge_results(self, msg_ids=[], targets=[]): | |
206 | """Instruct the controller to forget specific results.""" |
|
212 | """Instruct the controller to forget specific results.""" | |
207 | if targets is None or targets == 'all': |
|
213 | if targets is None or targets == 'all': | |
208 | targets = self.targets |
|
214 | targets = self.targets | |
209 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) |
|
215 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) | |
|
216 | ||||
210 |
|
217 | |||
211 |
|
218 | |||
212 | class DirectView(View): |
|
219 | class DirectView(View): | |
@@ -219,8 +226,16 b' class DirectView(View):' | |||||
219 | >>> dv_even = client[::2] |
|
226 | >>> dv_even = client[::2] | |
220 | >>> dv_some = client[1:3] |
|
227 | >>> dv_some = client[1:3] | |
221 |
|
228 | |||
|
229 | This object provides dictionary access | |||
|
230 | ||||
222 | """ |
|
231 | """ | |
223 |
|
232 | |||
|
233 | @sync_results | |||
|
234 | @save_ids | |||
|
235 | def execute(self, code, block=True): | |||
|
236 | """execute some code on my targets.""" | |||
|
237 | return self.client.execute(code, block=self.block, targets=self.targets) | |||
|
238 | ||||
224 | def update(self, ns): |
|
239 | def update(self, ns): | |
225 | """update remote namespace with dict `ns`""" |
|
240 | """update remote namespace with dict `ns`""" | |
226 | return self.client.push(ns, targets=self.targets, block=self.block) |
|
241 | return self.client.push(ns, targets=self.targets, block=self.block) | |
@@ -234,6 +249,8 b' class DirectView(View):' | |||||
234 | # block = block if block is not None else self.block |
|
249 | # block = block if block is not None else self.block | |
235 | return self.client.pull(key_s, block=True, targets=self.targets) |
|
250 | return self.client.pull(key_s, block=True, targets=self.targets) | |
236 |
|
251 | |||
|
252 | @sync_results | |||
|
253 | @save_ids | |||
237 | def pull(self, key_s, block=True): |
|
254 | def pull(self, key_s, block=True): | |
238 | """get object(s) by `key_s` from remote namespace |
|
255 | """get object(s) by `key_s` from remote namespace | |
239 | will return one object if it is a key. |
|
256 | will return one object if it is a key. | |
@@ -252,6 +269,8 b' class DirectView(View):' | |||||
252 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, |
|
269 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, | |
253 | targets=targets, block=block) |
|
270 | targets=targets, block=block) | |
254 |
|
271 | |||
|
272 | @sync_results | |||
|
273 | @save_ids | |||
255 | def gather(self, key, dist='b', targets=None, block=True): |
|
274 | def gather(self, key, dist='b', targets=None, block=True): | |
256 | """ |
|
275 | """ | |
257 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
276 | Gather a partitioned sequence on a set of engines as a single local seq. | |
@@ -278,6 +297,36 b' class DirectView(View):' | |||||
278 | block = block if block is not None else self.block |
|
297 | block = block if block is not None else self.block | |
279 | return self.client.kill(targets=self.targets, block=block) |
|
298 | return self.client.kill(targets=self.targets, block=block) | |
280 |
|
299 | |||
|
300 | #---------------------------------------- | |||
|
301 | # activate for %px,%autopx magics | |||
|
302 | #---------------------------------------- | |||
|
303 | def activate(self): | |||
|
304 | """Make this `View` active for parallel magic commands. | |||
|
305 | ||||
|
306 | IPython has a magic command syntax to work with `MultiEngineClient` objects. | |||
|
307 | In a given IPython session there is a single active one. While | |||
|
308 | there can be many `Views` created and used by the user, | |||
|
309 | there is only one active one. The active `View` is used whenever | |||
|
310 | the magic commands %px and %autopx are used. | |||
|
311 | ||||
|
312 | The activate() method is called on a given `View` to make it | |||
|
313 | active. Once this has been done, the magic commands can be used. | |||
|
314 | """ | |||
|
315 | ||||
|
316 | try: | |||
|
317 | # This is injected into __builtins__. | |||
|
318 | ip = get_ipython() | |||
|
319 | except NameError: | |||
|
320 | print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | |||
|
321 | else: | |||
|
322 | pmagic = ip.plugin_manager.get_plugin('parallelmagic') | |||
|
323 | if pmagic is not None: | |||
|
324 | pmagic.active_multiengine_client = self | |||
|
325 | else: | |||
|
326 | print "You must first load the parallelmagic extension " \ | |||
|
327 | "by doing '%load_ext parallelmagic'" | |||
|
328 | ||||
|
329 | ||||
281 | class LoadBalancedView(View): |
|
330 | class LoadBalancedView(View): | |
282 | """An engine-agnostic View that only executes via the Task queue. |
|
331 | """An engine-agnostic View that only executes via the Task queue. | |
283 |
|
332 |
General Comments 0
You need to be logged in to leave comments.
Login now