##// END OF EJS Templates
tweaks related to docs + add activate() for magics
MinRK -
Show More
@@ -10,8 +10,6 b''
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 from __future__ import print_function
14
15 13 import os
16 14 import time
17 15 from getpass import getpass
@@ -57,7 +55,7 b' def _clear():'
57 55 """helper method for implementing `client.clear` via `client.apply`"""
58 56 globals().clear()
59 57
60 def execute(code):
58 def _execute(code):
61 59 """helper method for implementing `client.execute` via `client.apply`"""
62 60 exec code in globals()
63 61
@@ -79,8 +77,10 b' def defaultblock(f, self, *args, **kwargs):'
79 77 block = self.block if block is None else block
80 78 saveblock = self.block
81 79 self.block = block
82 ret = f(self, *args, **kwargs)
83 self.block = saveblock
80 try:
81 ret = f(self, *args, **kwargs)
82 finally:
83 self.block = saveblock
84 84 return ret
85 85
86 86
@@ -198,6 +198,7 b' class Client(object):'
198 198 results = None
199 199 history = None
200 200 debug = False
201 targets = None
201 202
202 203 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
203 204 sshserver=None, sshkey=None, password=None, paramiko=None,
@@ -205,6 +206,7 b' class Client(object):'
205 206 if context is None:
206 207 context = zmq.Context()
207 208 self.context = context
209 self.targets = 'all'
208 210 self._addr = addr
209 211 self._ssh = bool(sshserver or sshkey or password)
210 212 if self._ssh and sshserver is None:
@@ -478,7 +480,7 b' class Client(object):'
478 480
479 481 Parameters
480 482 ----------
481 msg_ids : int, str, or list of ints and/or strs
483 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
482 484 ints are indices to self.history
483 485 strs are msg_ids
484 486 default: wait on all outstanding messages
@@ -495,13 +497,18 b' class Client(object):'
495 497 if msg_ids is None:
496 498 theids = self.outstanding
497 499 else:
498 if isinstance(msg_ids, (int, str)):
500 if isinstance(msg_ids, (int, str, AsyncResult)):
499 501 msg_ids = [msg_ids]
500 502 theids = set()
501 503 for msg_id in msg_ids:
502 504 if isinstance(msg_id, int):
503 505 msg_id = self.history[msg_id]
506 elif isinstance(msg_id, AsyncResult):
507 map(theids.add, msg_id._msg_ids)
508 continue
504 509 theids.add(msg_id)
510 if not theids.intersection(self.outstanding):
511 return True
505 512 self.spin()
506 513 while theids.intersection(self.outstanding):
507 514 if timeout >= 0 and ( time.time()-tic ) > timeout:
@@ -607,7 +614,7 b' class Client(object):'
607 614 whether or not to wait until done to return
608 615 default: self.block
609 616 """
610 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
617 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
611 618 return result
612 619
613 620 def run(self, code, block=None):
@@ -796,11 +803,11 b' class Client(object):'
796 803 return pf.map(*sequences)
797 804
798 805 def parallel(self, bound=True, targets='all', block=True):
799 """Decorator for making a ParallelFunction"""
806 """Decorator for making a ParallelFunction."""
800 807 return parallel(self, bound=bound, targets=targets, block=block)
801 808
802 809 def remote(self, bound=True, targets='all', block=True):
803 """Decorator for making a RemoteFunction"""
810 """Decorator for making a RemoteFunction."""
804 811 return remote(self, bound=bound, targets=targets, block=block)
805 812
806 813 #--------------------------------------------------------------------------
@@ -816,7 +823,7 b' class Client(object):'
816 823 return result
817 824
818 825 @defaultblock
819 def pull(self, keys, targets='all', block=True):
826 def pull(self, keys, targets='all', block=None):
820 827 """Pull objects from `target`'s namespace by `keys`"""
821 828 if isinstance(keys, str):
822 829 pass
@@ -827,11 +834,11 b' class Client(object):'
827 834 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
828 835 return result
829 836
830 @defaultblock
831 837 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
832 838 """
833 839 Partition a Python sequence and send the partitions to a set of engines.
834 840 """
841 block = block if block is not None else self.block
835 842 targets = self._build_targets(targets)[-1]
836 843 mapObject = Map.dists[dist]()
837 844 nparts = len(targets)
@@ -839,33 +846,31 b' class Client(object):'
839 846 for index, engineid in enumerate(targets):
840 847 partition = mapObject.getPartition(seq, index, nparts)
841 848 if flatten and len(partition) == 1:
842 mid = self.push({key: partition[0]}, targets=engineid, block=False)
849 r = self.push({key: partition[0]}, targets=engineid, block=False)
843 850 else:
844 mid = self.push({key: partition}, targets=engineid, block=False)
845 msg_ids.append(mid)
851 r = self.push({key: partition}, targets=engineid, block=False)
852 msg_ids.extend(r._msg_ids)
846 853 r = AsyncResult(self, msg_ids)
847 854 if block:
848 r.wait()
849 return
855 return r.get()
850 856 else:
851 857 return r
852 858
853 @defaultblock
854 def gather(self, key, dist='b', targets='all', block=True):
859 def gather(self, key, dist='b', targets='all', block=None):
855 860 """
856 861 Gather a partitioned sequence on a set of engines as a single local seq.
857 862 """
863 block = block if block is not None else self.block
858 864
859 865 targets = self._build_targets(targets)[-1]
860 866 mapObject = Map.dists[dist]()
861 867 msg_ids = []
862 868 for index, engineid in enumerate(targets):
863 msg_ids.append(self.pull(key, targets=engineid,block=False))
869 msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
864 870
865 871 r = AsyncMapResult(self, msg_ids, mapObject)
866 872 if block:
867 r.wait()
868 return r.result
873 return r.get()
869 874 else:
870 875 return r
871 876
@@ -980,6 +985,35 b' class Client(object):'
980 985 if content['status'] != 'ok':
981 986 raise ss.unwrap_exception(content)
982 987
988 #----------------------------------------
989 # activate for %px,%autopx magics
990 #----------------------------------------
991 def activate(self):
992 """Make this `View` active for parallel magic commands.
993
994 IPython has a magic command syntax to work with `MultiEngineClient` objects.
995 In a given IPython session there is a single active one. While
996 there can be many `Views` created and used by the user,
997 there is only one active one. The active `View` is used whenever
998 the magic commands %px and %autopx are used.
999
1000 The activate() method is called on a given `View` to make it
1001 active. Once this has been done, the magic commands can be used.
1002 """
1003
1004 try:
1005 # This is injected into __builtins__.
1006 ip = get_ipython()
1007 except NameError:
1008 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1009 else:
1010 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1011 if pmagic is not None:
1012 pmagic.active_multiengine_client = self
1013 else:
1014 print "You must first load the parallelmagic extension " \
1015 "by doing '%load_ext parallelmagic'"
1016
983 1017 class AsynClient(Client):
984 1018 """An Asynchronous client, using the Tornado Event Loop.
985 1019 !!!unfinished!!!"""
@@ -11,7 +11,7 b''
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from IPython.external.decorator import decorator
14 from IPython.zmq.parallel.remotefunction import ParallelFunction
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Decorators
@@ -22,8 +22,10 b' def myblock(f, self, *args, **kwargs):'
22 22 """override client.block with self.block during a call"""
23 23 block = self.client.block
24 24 self.client.block = self.block
25 ret = f(self, *args, **kwargs)
26 self.client.block = block
25 try:
26 ret = f(self, *args, **kwargs)
27 finally:
28 self.client.block = block
27 29 return ret
28 30
29 31 @decorator
@@ -65,7 +67,6 b' class View(object):'
65 67 Don't use this class, use subclasses.
66 68 """
67 69 _targets = None
68 _ntargets = None
69 70 block=None
70 71 bound=None
71 72 history=None
@@ -75,7 +76,7 b' class View(object):'
75 76 self._targets = targets
76 77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 78 self.block = client.block
78 self.bound=True
79 self.bound=False
79 80 self.history = []
80 81 self.outstanding = set()
81 82 self.results = {}
@@ -92,7 +93,8 b' class View(object):'
92 93
93 94 @targets.setter
94 95 def targets(self, value):
95 raise AttributeError("Cannot set my targets argument after construction!")
96 self._targets = value
97 # raise AttributeError("Cannot set my targets argument after construction!")
96 98
97 99 @sync_results
98 100 def spin(self):
@@ -185,6 +187,10 b' class View(object):'
185 187 bound=True, targets=targets)
186 188 return pf.map(*sequences)
187 189
190 def parallel(self, bound=True, block=True):
191 """Decorator for making a ParallelFunction"""
192 return parallel(self.client, bound=bound, targets=self.targets, block=block)
193
188 194 def abort(self, msg_ids=None, block=None):
189 195 """Abort jobs on my engines.
190 196
@@ -202,11 +208,12 b' class View(object):'
202 208 """Fetch the Queue status of my engines"""
203 209 return self.client.queue_status(targets=self.targets, verbose=verbose)
204 210
205 def purge_results(self, msg_ids=[],targets=[]):
211 def purge_results(self, msg_ids=[], targets=[]):
206 212 """Instruct the controller to forget specific results."""
207 213 if targets is None or targets == 'all':
208 214 targets = self.targets
209 215 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
216
210 217
211 218
212 219 class DirectView(View):
@@ -219,8 +226,16 b' class DirectView(View):'
219 226 >>> dv_even = client[::2]
220 227 >>> dv_some = client[1:3]
221 228
229 This object provides dictionary access
230
222 231 """
223 232
233 @sync_results
234 @save_ids
235 def execute(self, code, block=True):
236 """execute some code on my targets."""
237 return self.client.execute(code, block=self.block, targets=self.targets)
238
224 239 def update(self, ns):
225 240 """update remote namespace with dict `ns`"""
226 241 return self.client.push(ns, targets=self.targets, block=self.block)
@@ -234,6 +249,8 b' class DirectView(View):'
234 249 # block = block if block is not None else self.block
235 250 return self.client.pull(key_s, block=True, targets=self.targets)
236 251
252 @sync_results
253 @save_ids
237 254 def pull(self, key_s, block=True):
238 255 """get object(s) by `key_s` from remote namespace
239 256 will return one object if it is a key.
@@ -252,6 +269,8 b' class DirectView(View):'
252 269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
253 270 targets=targets, block=block)
254 271
272 @sync_results
273 @save_ids
255 274 def gather(self, key, dist='b', targets=None, block=True):
256 275 """
257 276 Gather a partitioned sequence on a set of engines as a single local seq.
@@ -278,6 +297,36 b' class DirectView(View):'
278 297 block = block if block is not None else self.block
279 298 return self.client.kill(targets=self.targets, block=block)
280 299
300 #----------------------------------------
301 # activate for %px,%autopx magics
302 #----------------------------------------
303 def activate(self):
304 """Make this `View` active for parallel magic commands.
305
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 In a given IPython session there is a single active one. While
308 there can be many `Views` created and used by the user,
309 there is only one active one. The active `View` is used whenever
310 the magic commands %px and %autopx are used.
311
312 The activate() method is called on a given `View` to make it
313 active. Once this has been done, the magic commands can be used.
314 """
315
316 try:
317 # This is injected into __builtins__.
318 ip = get_ipython()
319 except NameError:
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 else:
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 if pmagic is not None:
324 pmagic.active_multiengine_client = self
325 else:
326 print "You must first load the parallelmagic extension " \
327 "by doing '%load_ext parallelmagic'"
328
329
281 330 class LoadBalancedView(View):
282 331 """An engine-agnostic View that only executes via the Task queue.
283 332
General Comments 0
You need to be logged in to leave comments. Login now