##// END OF EJS Templates
tweaks related to docs + add activate() for magics
MinRK -
Show More
@@ -10,8 +10,6 b''
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from __future__ import print_function
14
15 import os
13 import os
16 import time
14 import time
17 from getpass import getpass
15 from getpass import getpass
@@ -57,7 +55,7 b' def _clear():'
57 """helper method for implementing `client.clear` via `client.apply`"""
55 """helper method for implementing `client.clear` via `client.apply`"""
58 globals().clear()
56 globals().clear()
59
57
60 def execute(code):
58 def _execute(code):
61 """helper method for implementing `client.execute` via `client.apply`"""
59 """helper method for implementing `client.execute` via `client.apply`"""
62 exec code in globals()
60 exec code in globals()
63
61
@@ -79,8 +77,10 b' def defaultblock(f, self, *args, **kwargs):'
79 block = self.block if block is None else block
77 block = self.block if block is None else block
80 saveblock = self.block
78 saveblock = self.block
81 self.block = block
79 self.block = block
82 ret = f(self, *args, **kwargs)
80 try:
83 self.block = saveblock
81 ret = f(self, *args, **kwargs)
82 finally:
83 self.block = saveblock
84 return ret
84 return ret
85
85
86
86
@@ -198,6 +198,7 b' class Client(object):'
198 results = None
198 results = None
199 history = None
199 history = None
200 debug = False
200 debug = False
201 targets = None
201
202
202 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
203 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
203 sshserver=None, sshkey=None, password=None, paramiko=None,
204 sshserver=None, sshkey=None, password=None, paramiko=None,
@@ -205,6 +206,7 b' class Client(object):'
205 if context is None:
206 if context is None:
206 context = zmq.Context()
207 context = zmq.Context()
207 self.context = context
208 self.context = context
209 self.targets = 'all'
208 self._addr = addr
210 self._addr = addr
209 self._ssh = bool(sshserver or sshkey or password)
211 self._ssh = bool(sshserver or sshkey or password)
210 if self._ssh and sshserver is None:
212 if self._ssh and sshserver is None:
@@ -478,7 +480,7 b' class Client(object):'
478
480
479 Parameters
481 Parameters
480 ----------
482 ----------
481 msg_ids : int, str, or list of ints and/or strs
483 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
482 ints are indices to self.history
484 ints are indices to self.history
483 strs are msg_ids
485 strs are msg_ids
484 default: wait on all outstanding messages
486 default: wait on all outstanding messages
@@ -495,13 +497,18 b' class Client(object):'
495 if msg_ids is None:
497 if msg_ids is None:
496 theids = self.outstanding
498 theids = self.outstanding
497 else:
499 else:
498 if isinstance(msg_ids, (int, str)):
500 if isinstance(msg_ids, (int, str, AsyncResult)):
499 msg_ids = [msg_ids]
501 msg_ids = [msg_ids]
500 theids = set()
502 theids = set()
501 for msg_id in msg_ids:
503 for msg_id in msg_ids:
502 if isinstance(msg_id, int):
504 if isinstance(msg_id, int):
503 msg_id = self.history[msg_id]
505 msg_id = self.history[msg_id]
506 elif isinstance(msg_id, AsyncResult):
507 map(theids.add, msg_id._msg_ids)
508 continue
504 theids.add(msg_id)
509 theids.add(msg_id)
510 if not theids.intersection(self.outstanding):
511 return True
505 self.spin()
512 self.spin()
506 while theids.intersection(self.outstanding):
513 while theids.intersection(self.outstanding):
507 if timeout >= 0 and ( time.time()-tic ) > timeout:
514 if timeout >= 0 and ( time.time()-tic ) > timeout:
@@ -607,7 +614,7 b' class Client(object):'
607 whether or not to wait until done to return
614 whether or not to wait until done to return
608 default: self.block
615 default: self.block
609 """
616 """
610 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
617 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
611 return result
618 return result
612
619
613 def run(self, code, block=None):
620 def run(self, code, block=None):
@@ -796,11 +803,11 b' class Client(object):'
796 return pf.map(*sequences)
803 return pf.map(*sequences)
797
804
798 def parallel(self, bound=True, targets='all', block=True):
805 def parallel(self, bound=True, targets='all', block=True):
799 """Decorator for making a ParallelFunction"""
806 """Decorator for making a ParallelFunction."""
800 return parallel(self, bound=bound, targets=targets, block=block)
807 return parallel(self, bound=bound, targets=targets, block=block)
801
808
802 def remote(self, bound=True, targets='all', block=True):
809 def remote(self, bound=True, targets='all', block=True):
803 """Decorator for making a RemoteFunction"""
810 """Decorator for making a RemoteFunction."""
804 return remote(self, bound=bound, targets=targets, block=block)
811 return remote(self, bound=bound, targets=targets, block=block)
805
812
806 #--------------------------------------------------------------------------
813 #--------------------------------------------------------------------------
@@ -816,7 +823,7 b' class Client(object):'
816 return result
823 return result
817
824
818 @defaultblock
825 @defaultblock
819 def pull(self, keys, targets='all', block=True):
826 def pull(self, keys, targets='all', block=None):
820 """Pull objects from `target`'s namespace by `keys`"""
827 """Pull objects from `target`'s namespace by `keys`"""
821 if isinstance(keys, str):
828 if isinstance(keys, str):
822 pass
829 pass
@@ -827,11 +834,11 b' class Client(object):'
827 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
834 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
828 return result
835 return result
829
836
830 @defaultblock
831 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
837 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
832 """
838 """
833 Partition a Python sequence and send the partitions to a set of engines.
839 Partition a Python sequence and send the partitions to a set of engines.
834 """
840 """
841 block = block if block is not None else self.block
835 targets = self._build_targets(targets)[-1]
842 targets = self._build_targets(targets)[-1]
836 mapObject = Map.dists[dist]()
843 mapObject = Map.dists[dist]()
837 nparts = len(targets)
844 nparts = len(targets)
@@ -839,33 +846,31 b' class Client(object):'
839 for index, engineid in enumerate(targets):
846 for index, engineid in enumerate(targets):
840 partition = mapObject.getPartition(seq, index, nparts)
847 partition = mapObject.getPartition(seq, index, nparts)
841 if flatten and len(partition) == 1:
848 if flatten and len(partition) == 1:
842 mid = self.push({key: partition[0]}, targets=engineid, block=False)
849 r = self.push({key: partition[0]}, targets=engineid, block=False)
843 else:
850 else:
844 mid = self.push({key: partition}, targets=engineid, block=False)
851 r = self.push({key: partition}, targets=engineid, block=False)
845 msg_ids.append(mid)
852 msg_ids.extend(r._msg_ids)
846 r = AsyncResult(self, msg_ids)
853 r = AsyncResult(self, msg_ids)
847 if block:
854 if block:
848 r.wait()
855 return r.get()
849 return
850 else:
856 else:
851 return r
857 return r
852
858
853 @defaultblock
859 def gather(self, key, dist='b', targets='all', block=None):
854 def gather(self, key, dist='b', targets='all', block=True):
855 """
860 """
856 Gather a partitioned sequence on a set of engines as a single local seq.
861 Gather a partitioned sequence on a set of engines as a single local seq.
857 """
862 """
863 block = block if block is not None else self.block
858
864
859 targets = self._build_targets(targets)[-1]
865 targets = self._build_targets(targets)[-1]
860 mapObject = Map.dists[dist]()
866 mapObject = Map.dists[dist]()
861 msg_ids = []
867 msg_ids = []
862 for index, engineid in enumerate(targets):
868 for index, engineid in enumerate(targets):
863 msg_ids.append(self.pull(key, targets=engineid,block=False))
869 msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
864
870
865 r = AsyncMapResult(self, msg_ids, mapObject)
871 r = AsyncMapResult(self, msg_ids, mapObject)
866 if block:
872 if block:
867 r.wait()
873 return r.get()
868 return r.result
869 else:
874 else:
870 return r
875 return r
871
876
@@ -980,6 +985,35 b' class Client(object):'
980 if content['status'] != 'ok':
985 if content['status'] != 'ok':
981 raise ss.unwrap_exception(content)
986 raise ss.unwrap_exception(content)
982
987
988 #----------------------------------------
989 # activate for %px,%autopx magics
990 #----------------------------------------
991 def activate(self):
992 """Make this `View` active for parallel magic commands.
993
994 IPython has a magic command syntax to work with `MultiEngineClient` objects.
995 In a given IPython session there is a single active one. While
996 there can be many `Views` created and used by the user,
997 there is only one active one. The active `View` is used whenever
998 the magic commands %px and %autopx are used.
999
1000 The activate() method is called on a given `View` to make it
1001 active. Once this has been done, the magic commands can be used.
1002 """
1003
1004 try:
1005 # This is injected into __builtins__.
1006 ip = get_ipython()
1007 except NameError:
1008 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1009 else:
1010 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1011 if pmagic is not None:
1012 pmagic.active_multiengine_client = self
1013 else:
1014 print "You must first load the parallelmagic extension " \
1015 "by doing '%load_ext parallelmagic'"
1016
983 class AsynClient(Client):
1017 class AsynClient(Client):
984 """An Asynchronous client, using the Tornado Event Loop.
1018 """An Asynchronous client, using the Tornado Event Loop.
985 !!!unfinished!!!"""
1019 !!!unfinished!!!"""
@@ -11,7 +11,7 b''
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 from IPython.external.decorator import decorator
14 from IPython.zmq.parallel.remotefunction import ParallelFunction
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Decorators
17 # Decorators
@@ -22,8 +22,10 b' def myblock(f, self, *args, **kwargs):'
22 """override client.block with self.block during a call"""
22 """override client.block with self.block during a call"""
23 block = self.client.block
23 block = self.client.block
24 self.client.block = self.block
24 self.client.block = self.block
25 ret = f(self, *args, **kwargs)
25 try:
26 self.client.block = block
26 ret = f(self, *args, **kwargs)
27 finally:
28 self.client.block = block
27 return ret
29 return ret
28
30
29 @decorator
31 @decorator
@@ -65,7 +67,6 b' class View(object):'
65 Don't use this class, use subclasses.
67 Don't use this class, use subclasses.
66 """
68 """
67 _targets = None
69 _targets = None
68 _ntargets = None
69 block=None
70 block=None
70 bound=None
71 bound=None
71 history=None
72 history=None
@@ -75,7 +76,7 b' class View(object):'
75 self._targets = targets
76 self._targets = targets
76 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 self.block = client.block
78 self.block = client.block
78 self.bound=True
79 self.bound=False
79 self.history = []
80 self.history = []
80 self.outstanding = set()
81 self.outstanding = set()
81 self.results = {}
82 self.results = {}
@@ -92,7 +93,8 b' class View(object):'
92
93
93 @targets.setter
94 @targets.setter
94 def targets(self, value):
95 def targets(self, value):
95 raise AttributeError("Cannot set my targets argument after construction!")
96 self._targets = value
97 # raise AttributeError("Cannot set my targets argument after construction!")
96
98
97 @sync_results
99 @sync_results
98 def spin(self):
100 def spin(self):
@@ -185,6 +187,10 b' class View(object):'
185 bound=True, targets=targets)
187 bound=True, targets=targets)
186 return pf.map(*sequences)
188 return pf.map(*sequences)
187
189
190 def parallel(self, bound=True, block=True):
191 """Decorator for making a ParallelFunction"""
192 return parallel(self.client, bound=bound, targets=self.targets, block=block)
193
188 def abort(self, msg_ids=None, block=None):
194 def abort(self, msg_ids=None, block=None):
189 """Abort jobs on my engines.
195 """Abort jobs on my engines.
190
196
@@ -202,11 +208,12 b' class View(object):'
202 """Fetch the Queue status of my engines"""
208 """Fetch the Queue status of my engines"""
203 return self.client.queue_status(targets=self.targets, verbose=verbose)
209 return self.client.queue_status(targets=self.targets, verbose=verbose)
204
210
205 def purge_results(self, msg_ids=[],targets=[]):
211 def purge_results(self, msg_ids=[], targets=[]):
206 """Instruct the controller to forget specific results."""
212 """Instruct the controller to forget specific results."""
207 if targets is None or targets == 'all':
213 if targets is None or targets == 'all':
208 targets = self.targets
214 targets = self.targets
209 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
215 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
216
210
217
211
218
212 class DirectView(View):
219 class DirectView(View):
@@ -219,8 +226,16 b' class DirectView(View):'
219 >>> dv_even = client[::2]
226 >>> dv_even = client[::2]
220 >>> dv_some = client[1:3]
227 >>> dv_some = client[1:3]
221
228
229 This object provides dictionary access
230
222 """
231 """
223
232
233 @sync_results
234 @save_ids
235 def execute(self, code, block=True):
236 """execute some code on my targets."""
237 return self.client.execute(code, block=self.block, targets=self.targets)
238
224 def update(self, ns):
239 def update(self, ns):
225 """update remote namespace with dict `ns`"""
240 """update remote namespace with dict `ns`"""
226 return self.client.push(ns, targets=self.targets, block=self.block)
241 return self.client.push(ns, targets=self.targets, block=self.block)
@@ -234,6 +249,8 b' class DirectView(View):'
234 # block = block if block is not None else self.block
249 # block = block if block is not None else self.block
235 return self.client.pull(key_s, block=True, targets=self.targets)
250 return self.client.pull(key_s, block=True, targets=self.targets)
236
251
252 @sync_results
253 @save_ids
237 def pull(self, key_s, block=True):
254 def pull(self, key_s, block=True):
238 """get object(s) by `key_s` from remote namespace
255 """get object(s) by `key_s` from remote namespace
239 will return one object if it is a key.
256 will return one object if it is a key.
@@ -252,6 +269,8 b' class DirectView(View):'
252 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
269 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
253 targets=targets, block=block)
270 targets=targets, block=block)
254
271
272 @sync_results
273 @save_ids
255 def gather(self, key, dist='b', targets=None, block=True):
274 def gather(self, key, dist='b', targets=None, block=True):
256 """
275 """
257 Gather a partitioned sequence on a set of engines as a single local seq.
276 Gather a partitioned sequence on a set of engines as a single local seq.
@@ -278,6 +297,36 b' class DirectView(View):'
278 block = block if block is not None else self.block
297 block = block if block is not None else self.block
279 return self.client.kill(targets=self.targets, block=block)
298 return self.client.kill(targets=self.targets, block=block)
280
299
300 #----------------------------------------
301 # activate for %px,%autopx magics
302 #----------------------------------------
303 def activate(self):
304 """Make this `View` active for parallel magic commands.
305
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 In a given IPython session there is a single active one. While
308 there can be many `Views` created and used by the user,
309 there is only one active one. The active `View` is used whenever
310 the magic commands %px and %autopx are used.
311
312 The activate() method is called on a given `View` to make it
313 active. Once this has been done, the magic commands can be used.
314 """
315
316 try:
317 # This is injected into __builtins__.
318 ip = get_ipython()
319 except NameError:
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 else:
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 if pmagic is not None:
324 pmagic.active_multiengine_client = self
325 else:
326 print "You must first load the parallelmagic extension " \
327 "by doing '%load_ext parallelmagic'"
328
329
281 class LoadBalancedView(View):
330 class LoadBalancedView(View):
282 """An engine-agnostic View that only executes via the Task queue.
331 """An engine-agnostic View that only executes via the Task queue.
283
332
General Comments 0
You need to be logged in to leave comments. Login now