##// END OF EJS Templates
API update involving map and load-balancing
MinRK -
Show More
@@ -147,9 +147,9 b' class UnSerializeIt(UnSerialized):'
147 147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
148 148 result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype'])
149 149 result.shape = self.serialized.metadata['shape']
150 # This is a hack to make the array writable. We are working with
150 # numpy arrays with frombuffer are read-only. We are working with
151 151 # the numpy folks to address this issue.
152 result = result.copy()
152 # result = result.copy()
153 153 elif typeDescriptor == 'pickle':
154 154 result = pickle.loads(self.serialized.getData())
155 155 elif typeDescriptor in ('bytes', 'buffer'):
@@ -21,7 +21,7 b' from pprint import pprint'
21 21 pjoin = os.path.join
22 22
23 23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25 25
26 26 from IPython.utils.path import get_ipython_dir
27 27 from IPython.external.decorator import decorator
@@ -203,6 +203,7 b' class Client(object):'
203 203
204 204 Attributes
205 205 ----------
206
206 207 ids : set of int engine IDs
207 208 requesting the ids attribute always synchronizes
208 209 the registration state. To request ids without synchronization,
@@ -225,17 +226,23 b' class Client(object):'
225 226
226 227 Methods
227 228 -------
228 spin : flushes incoming results and registration state changes
229 control methods spin, and requesting `ids` also ensures up to date
230
231 barrier : wait on one or more msg_ids
232 229
233 execution methods: apply/apply_bound/apply_to/apply_bound
230 spin
231 flushes incoming results and registration state changes
232 control methods spin, and requesting `ids` also ensures up to date
233
234 barrier
235 wait on one or more msg_ids
236
237 execution methods
238 apply
234 239 legacy: execute, run
235 240
236 query methods: queue_status, get_result, purge
241 query methods
242 queue_status, get_result, purge
237 243
238 control methods: abort, kill
244 control methods
245 abort, shutdown
239 246
240 247 """
241 248
@@ -265,7 +272,6 b' class Client(object):'
265 272 if context is None:
266 273 context = zmq.Context()
267 274 self.context = context
268 self.targets = 'all'
269 275
270 276 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
271 277 if self._cd is not None:
@@ -634,42 +640,18 b' class Client(object):'
634 640 #--------------------------------------------------------------------------
635 641
636 642 def __getitem__(self, key):
637 """Dict access returns DirectView multiplexer objects or,
638 if key is None, a LoadBalancedView."""
639 if key is None:
640 return LoadBalancedView(self)
641 if isinstance(key, int):
642 if key not in self.ids:
643 raise IndexError("No such engine: %i"%key)
644 return DirectView(self, key)
645
646 if isinstance(key, slice):
647 indices = range(len(self.ids))[key]
648 ids = sorted(self._ids)
649 key = [ ids[i] for i in indices ]
650 # newkeys = sorted(self._ids)[thekeys[k]]
643 """index access returns DirectView multiplexer objects
651 644
652 if isinstance(key, (tuple, list, xrange)):
653 _,targets = self._build_targets(list(key))
654 return DirectView(self, targets)
645 Must be int, slice, or list/tuple/xrange of ints"""
646 if not isinstance(key, (int, slice, tuple, list, xrange)):
647 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
655 648 else:
656 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
649 return self.view(key, balanced=False)
657 650
658 651 #--------------------------------------------------------------------------
659 652 # Begin public methods
660 653 #--------------------------------------------------------------------------
661 654
662 @property
663 def remote(self):
664 """property for convenient RemoteFunction generation.
665
666 >>> @client.remote
667 ... def getpid():
668 import os
669 return os.getpid()
670 """
671 return remote(self, block=self.block)
672
673 655 def spin(self):
674 656 """Flush any registration notifications and execution results
675 657 waiting in the ZMQ queue.
@@ -690,6 +672,7 b' class Client(object):'
690 672
691 673 Parameters
692 674 ----------
675
693 676 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
694 677 ints are indices to self.history
695 678 strs are msg_ids
@@ -700,6 +683,7 b' class Client(object):'
700 683
701 684 Returns
702 685 -------
686
703 687 True : when all msg_ids are done
704 688 False : timeout reached, some msg_ids still outstanding
705 689 """
@@ -815,6 +799,7 b' class Client(object):'
815 799
816 800 Parameters
817 801 ----------
802
818 803 code : str
819 804 the code string to be executed
820 805 targets : int/str/list of ints/strs
@@ -824,7 +809,7 b' class Client(object):'
824 809 whether or not to wait until done to return
825 810 default: self.block
826 811 """
827 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
812 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
828 813 return result
829 814
830 815 def run(self, filename, targets='all', block=None):
@@ -834,6 +819,7 b' class Client(object):'
834 819
835 820 Parameters
836 821 ----------
822
837 823 filename : str
838 824 The path to the file
839 825 targets : int/str/list of ints/strs
@@ -868,7 +854,8 b' class Client(object):'
868 854 return list(Dependency(dep))
869 855
870 856 @defaultblock
871 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
857 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
858 targets=None, balanced=None,
872 859 after=None, follow=None, timeout=None):
873 860 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
874 861
@@ -905,11 +892,34 b' class Client(object):'
905 892 if int:
906 893 Run on single engine
907 894
908 after,follow,timeout only used in `apply_balanced`. See that docstring
909 for details.
895 balanced : bool, default None
896 whether to load-balance. This will default to True
897 if targets is unspecified, or False if targets is specified.
898
899 The following arguments are only used when balanced is True:
900 after : Dependency or collection of msg_ids
901 Only for load-balanced execution (targets=None)
902 Specify a list of msg_ids as a time-based dependency.
903 This job will only be run *after* the dependencies
904 have been met.
905
906 follow : Dependency or collection of msg_ids
907 Only for load-balanced execution (targets=None)
908 Specify a list of msg_ids as a location-based dependency.
909 This job will only be run on an engine where this dependency
910 is met.
911
912 timeout : float/int or None
913 Only for load-balanced execution (targets=None)
914 Specify an amount of time (in seconds) for the scheduler to
915 wait for dependencies to be met before failing with a
916 DependencyTimeout.
917
918 after,follow,timeout only used if `balanced=True`.
910 919
911 920 Returns
912 921 -------
922
913 923 if block is False:
914 924 return AsyncResult wrapping msg_ids
915 925 output of AsyncResult.get() is identical to that of `apply(...block=True)`
@@ -921,10 +931,21 b' class Client(object):'
921 931 """
922 932
923 933 # defaults:
924 block = block if block is not None else self.block
925 934 args = args if args is not None else []
926 935 kwargs = kwargs if kwargs is not None else {}
927 936
937 if balanced is None:
938 if targets is None:
939 # default to balanced if targets unspecified
940 balanced = True
941 else:
942 # otherwise default to multiplexing
943 balanced = False
944
945 if targets is None and balanced is False:
946 # default to all if *not* balanced, and targets is unspecified
947 targets = 'all'
948
928 949 # enforce types of f,args,kwargs
929 950 if not callable(f):
930 951 raise TypeError("f must be callable, not %s"%type(f))
@@ -935,80 +956,27 b' class Client(object):'
935 956
936 957 options = dict(bound=bound, block=block, targets=targets)
937 958
938 if targets is None:
939 return self.apply_balanced(f, args, kwargs, timeout=timeout,
959 if balanced:
960 return self._apply_balanced(f, args, kwargs, timeout=timeout,
940 961 after=after, follow=follow, **options)
941 else:
942 if follow or after or timeout:
943 msg = "follow, after, and timeout args are only used for load-balanced"
944 msg += "execution."
962 elif follow or after or timeout:
963 msg = "follow, after, and timeout args are only used for"
964 msg += " load-balanced execution."
945 965 raise ValueError(msg)
966 else:
946 967 return self._apply_direct(f, args, kwargs, **options)
947 968
948 @defaultblock
949 def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
969 def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
950 970 after=None, follow=None, timeout=None):
951 971 """call f(*args, **kwargs) remotely in a load-balanced manner.
952 972
953 Parameters
954 ----------
955
956 f : function
957 The fuction to be called remotely
958 args : tuple/list
959 The positional arguments passed to `f`
960 kwargs : dict
961 The keyword arguments passed to `f`
962 bound : bool (default: True)
963 Whether to execute in the Engine(s) namespace, or in a clean
964 namespace not affecting the engine.
965 block : bool (default: self.block)
966 Whether to wait for the result, or return immediately.
967 False:
968 returns AsyncResult
969 True:
970 returns actual result(s) of f(*args, **kwargs)
971 if multiple targets:
972 list of results, matching `targets`
973 targets : int,list of ints, 'all', None
974 Specify the destination of the job.
975 if None:
976 Submit via Task queue for load-balancing.
977 if 'all':
978 Run on all active engines
979 if list:
980 Run on each specified engine
981 if int:
982 Run on single engine
983
984 after : Dependency or collection of msg_ids
985 Only for load-balanced execution (targets=None)
986 Specify a list of msg_ids as a time-based dependency.
987 This job will only be run *after* the dependencies
988 have been met.
989
990 follow : Dependency or collection of msg_ids
991 Only for load-balanced execution (targets=None)
992 Specify a list of msg_ids as a location-based dependency.
993 This job will only be run on an engine where this dependency
994 is met.
995
996 timeout : float/int or None
997 Only for load-balanced execution (targets=None)
998 Specify an amount of time (in seconds) for the scheduler to
999 wait for dependencies to be met before failing with a
1000 DependencyTimeout.
1001
1002 Returns
1003 -------
1004 if block is False:
1005 return AsyncResult wrapping msg_id
1006 output of AsyncResult.get() is identical to that of `apply(...block=True)`
1007 else:
1008 wait for, and return actual result of `f(*args, **kwargs)`
1009
973 This is a private method, see `apply` for details.
974 Not to be called directly!
1010 975 """
1011 976
977 for kwarg in (bound, block, targets):
978 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
979
1012 980 if self._task_socket is None:
1013 981 msg = "Task farming is disabled"
1014 982 if self._task_scheme == 'pure':
@@ -1025,7 +993,6 b' class Client(object):'
1025 993 if isinstance(f, dependent):
1026 994 # soft warn on functional dependencies
1027 995 warnings.warn(msg, RuntimeWarning)
1028
1029 996
1030 997 # defaults:
1031 998 args = args if args is not None else []
@@ -1036,14 +1003,6 b' class Client(object):'
1036 1003 else:
1037 1004 idents = []
1038 1005
1039 # enforce types of f,args,kwrags
1040 if not callable(f):
1041 raise TypeError("f must be callable, not %s"%type(f))
1042 if not isinstance(args, (tuple, list)):
1043 raise TypeError("args must be tuple or list, not %s"%type(args))
1044 if not isinstance(kwargs, dict):
1045 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1046
1047 1006 after = self._build_dependency(after)
1048 1007 follow = self._build_dependency(follow)
1049 1008 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
@@ -1064,13 +1023,17 b' class Client(object):'
1064 1023 else:
1065 1024 return ar
1066 1025
1067 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
1026 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1068 1028 """The underlying method for applying functions to specific engines
1069 1028 via the MUX queue.
1070 1029
1030 This is a private method, see `apply` for details.
1071 1031 Not to be called directly!
1072 1032 """
1073 1033
1034 for kwarg in (bound, block, targets):
1035 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
1036
1074 1037 idents,targets = self._build_targets(targets)
1075 1038
1076 1039 subheader = {}
@@ -1095,103 +1058,46 b' class Client(object):'
1095 1058 return ar
1096 1059
1097 1060 #--------------------------------------------------------------------------
1098 # Map and decorators
1061 # decorators
1099 1062 #--------------------------------------------------------------------------
1100 1063
1101 def map(self, f, *sequences, **kwargs):
1102 """Parallel version of builtin `map`, using all our engines.
1103
1104 `block` and `targets` can be passed as keyword arguments only.
1105
1106 There will be one task per target, so work will be chunked
1107 if the sequences are longer than `targets`.
1108
1109 Results can be iterated as they are ready, but will become available in chunks.
1110
1111 Parameters
1112 ----------
1113
1114 f : callable
1115 function to be mapped
1116 *sequences: one or more sequences of matching length
1117 the sequences to be distributed and passed to `f`
1118 block : bool
1119 whether to wait for the result or not [default self.block]
1120 targets : valid targets
1121 targets to be used [default self.targets]
1122
1123 Returns
1124 -------
1125
1126 if block=False:
1127 AsyncMapResult
1128 An object like AsyncResult, but which reassembles the sequence of results
1129 into a single list. AsyncMapResults can be iterated through before all
1130 results are complete.
1131 else:
1132 the result of map(f,*sequences)
1133
1134 """
1135 block = kwargs.get('block', self.block)
1136 targets = kwargs.get('targets', self.targets)
1137 assert len(sequences) > 0, "must have some sequences to map onto!"
1138 pf = ParallelFunction(self, f, block=block,
1139 bound=True, targets=targets)
1140 return pf.map(*sequences)
1141
1142 def imap(self, f, *sequences, **kwargs):
1143 """Parallel version of builtin `itertools.imap`, load-balanced across all engines.
1144
1145 Each element will be a separate task, and will be load-balanced. This
1146 lets individual elements be ready for iteration as soon as they come.
1147
1148 Parameters
1149 ----------
1150
1151 f : callable
1152 function to be mapped
1153 *sequences: one or more sequences of matching length
1154 the sequences to be distributed and passed to `f`
1155 block : bool
1156 whether to wait for the result or not [default self.block]
1157
1158 Returns
1159 -------
1160
1161 if block=False:
1162 AsyncMapResult
1163 An object like AsyncResult, but which reassembles the sequence of results
1164 into a single list. AsyncMapResults can be iterated through before all
1165 results are complete.
1166 else:
1167 the result of map(f,*sequences)
1168
1169 """
1170
1171 block = kwargs.get('block', self.block)
1172
1173 assert len(sequences) > 0, "must have some sequences to map onto!"
1174
1175 pf = ParallelFunction(self, f, block=self.block,
1176 bound=True, targets=None)
1177 return pf.map(*sequences)
1178
1179 def parallel(self, bound=True, targets='all', block=True):
1064 @defaultblock
1065 def parallel(self, bound=True, targets='all', block=None):
1180 1066 """Decorator for making a ParallelFunction."""
1181 1067 return parallel(self, bound=bound, targets=targets, block=block)
1182 1068
1183 def remote(self, bound=True, targets='all', block=True):
1069 @defaultblock
1070 def remote(self, bound=True, targets='all', block=None):
1184 1071 """Decorator for making a RemoteFunction."""
1185 1072 return remote(self, bound=bound, targets=targets, block=block)
1186 1073
1187 1074 def view(self, targets=None, balanced=False):
1188 1075 """Method for constructing View objects"""
1189 if not balanced:
1190 if not targets:
1076 if targets is None:
1077 if balanced:
1078 return LoadBalancedView(client=self)
1079 else:
1191 1080 targets = slice(None)
1192 return self[targets]
1081
1082 if balanced:
1083 view_class = LoadBalancedView
1084 else:
1085 view_class = DirectView
1086 if isinstance(targets, int):
1087 if targets not in self.ids:
1088 raise IndexError("No such engine: %i"%targets)
1089 return view_class(client=self, targets=targets)
1090
1091 if isinstance(targets, slice):
1092 indices = range(len(self.ids))[targets]
1093 ids = sorted(self._ids)
1094 targets = [ ids[i] for i in indices ]
1095
1096 if isinstance(targets, (tuple, list, xrange)):
1097 _,targets = self._build_targets(list(targets))
1098 return view_class(client=self, targets=targets)
1193 1099 else:
1194 return LoadBalancedView(self, targets)
1100 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1195 1101
1196 1102 #--------------------------------------------------------------------------
1197 1103 # Data movement
@@ -1202,7 +1108,7 b' class Client(object):'
1202 1108 """Push the contents of `ns` into the namespace on `target`"""
1203 1109 if not isinstance(ns, dict):
1204 1110 raise TypeError("Must be a dict, not %s"%type(ns))
1205 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
1111 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1206 1112 return result
1207 1113
1208 1114 @defaultblock
@@ -1214,14 +1120,14 b' class Client(object):'
1214 1120 for key in keys:
1215 1121 if not isinstance(key, str):
1216 1122 raise TypeError
1217 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
1123 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1218 1124 return result
1219 1125
1126 @defaultblock
1220 1127 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1221 1128 """
1222 1129 Partition a Python sequence and send the partitions to a set of engines.
1223 1130 """
1224 block = block if block is not None else self.block
1225 1131 targets = self._build_targets(targets)[-1]
1226 1132 mapObject = Map.dists[dist]()
1227 1133 nparts = len(targets)
@@ -1239,11 +1145,11 b' class Client(object):'
1239 1145 else:
1240 1146 return r
1241 1147
1148 @defaultblock
1242 1149 def gather(self, key, dist='b', targets='all', block=None):
1243 1150 """
1244 1151 Gather a partitioned sequence on a set of engines as a single local seq.
1245 1152 """
1246 block = block if block is not None else self.block
1247 1153
1248 1154 targets = self._build_targets(targets)[-1]
1249 1155 mapObject = Map.dists[dist]()
@@ -1267,6 +1173,7 b' class Client(object):'
1267 1173
1268 1174 Parameters
1269 1175 ----------
1176
1270 1177 msg_ids : list of ints or msg_ids
1271 1178 if int:
1272 1179 Passed as index to self.history for convenience.
@@ -1351,13 +1258,14 b' class Client(object):'
1351 1258 return content
1352 1259
1353 1260 @spinfirst
1354 def queue_status(self, targets=None, verbose=False):
1261 def queue_status(self, targets='all', verbose=False):
1355 1262 """Fetch the status of engine queues.
1356 1263
1357 1264 Parameters
1358 1265 ----------
1266
1359 1267 targets : int/str/list of ints/strs
1360 the engines on which to execute
1268 the engines whose states are to be queried.
1361 1269 default : all
1362 1270 verbose : bool
1363 1271 Whether to return lengths only, or lists of ids for each element
@@ -1383,6 +1291,7 b' class Client(object):'
1383 1291
1384 1292 Parameters
1385 1293 ----------
1294
1386 1295 msg_ids : str or list of strs
1387 1296 the msg_ids whose results should be forgotten.
1388 1297 targets : int/str/list of ints/strs
@@ -1404,59 +1313,6 b' class Client(object):'
1404 1313 if content['status'] != 'ok':
1405 1314 raise ss.unwrap_exception(content)
1406 1315
1407 #----------------------------------------
1408 # activate for %px,%autopx magics
1409 #----------------------------------------
1410 def activate(self):
1411 """Make this `View` active for parallel magic commands.
1412
1413 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1414 In a given IPython session there is a single active one. While
1415 there can be many `Views` created and used by the user,
1416 there is only one active one. The active `View` is used whenever
1417 the magic commands %px and %autopx are used.
1418
1419 The activate() method is called on a given `View` to make it
1420 active. Once this has been done, the magic commands can be used.
1421 """
1422
1423 try:
1424 # This is injected into __builtins__.
1425 ip = get_ipython()
1426 except NameError:
1427 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1428 else:
1429 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1430 if pmagic is not None:
1431 pmagic.active_multiengine_client = self
1432 else:
1433 print "You must first load the parallelmagic extension " \
1434 "by doing '%load_ext parallelmagic'"
1435
1436 class AsynClient(Client):
1437 """An Asynchronous client, using the Tornado Event Loop.
1438 !!!unfinished!!!"""
1439 io_loop = None
1440 _queue_stream = None
1441 _notifier_stream = None
1442 _task_stream = None
1443 _control_stream = None
1444
1445 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1446 Client.__init__(self, addr, context, username, debug)
1447 if io_loop is None:
1448 io_loop = ioloop.IOLoop.instance()
1449 self.io_loop = io_loop
1450
1451 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1452 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1453 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1454 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1455
1456 def spin(self):
1457 for stream in (self.queue_stream, self.notifier_stream,
1458 self.task_stream, self.control_stream):
1459 stream.flush()
1460 1316
1461 1317 __all__ = [ 'Client',
1462 1318 'depend',
@@ -17,7 +17,7 b' from asyncresult import AsyncMapResult'
17 17 # Decorators
18 18 #-----------------------------------------------------------------------------
19 19
20 def remote(client, bound=False, block=None, targets=None):
20 def remote(client, bound=False, block=None, targets=None, balanced=None):
21 21 """Turn a function into a remote function.
22 22
23 23 This method can be used for map:
@@ -26,10 +26,10 b' def remote(client, bound=False, block=None, targets=None):'
26 26 def func(a)
27 27 """
28 28 def remote_function(f):
29 return RemoteFunction(client, f, bound, block, targets)
29 return RemoteFunction(client, f, bound, block, targets, balanced)
30 30 return remote_function
31 31
32 def parallel(client, dist='b', bound=False, block=None, targets='all'):
32 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
33 33 """Turn a function into a parallel remote function.
34 34
35 35 This method can be used for map:
@@ -38,7 +38,7 b" def parallel(client, dist='b', bound=False, block=None, targets='all'):"
38 38 def func(a)
39 39 """
40 40 def parallel_function(f):
41 return ParallelFunction(client, f, dist, bound, block, targets)
41 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
42 42 return parallel_function
43 43
44 44 #--------------------------------------------------------------------------
@@ -62,6 +62,8 b' class RemoteFunction(object):'
62 62 to use the current `block` attribute of `client`
63 63 targets : valid target list [default: all]
64 64 The targets on which to execute.
65 balanced : bool
66 Whether to load-balance with the Task scheduler or not
65 67 """
66 68
67 69 client = None # the remote connection
@@ -69,23 +71,30 b' class RemoteFunction(object):'
69 71 block = None # whether to block
70 72 bound = None # whether to affect the namespace
71 73 targets = None # where to execute
74 balanced = None # whether to load-balance
72 75
73 def __init__(self, client, f, bound=False, block=None, targets=None):
76 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
74 77 self.client = client
75 78 self.func = f
76 79 self.block=block
77 80 self.bound=bound
78 81 self.targets=targets
82 if balanced is None:
83 if targets is None:
84 balanced = True
85 else:
86 balanced = False
87 self.balanced = balanced
79 88
80 89 def __call__(self, *args, **kwargs):
81 90 return self.client.apply(self.func, args=args, kwargs=kwargs,
82 block=self.block, targets=self.targets, bound=self.bound)
91 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
83 92
84 93
85 94 class ParallelFunction(RemoteFunction):
86 95 """Class for mapping a function to sequences."""
87 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
88 super(ParallelFunction, self).__init__(client,f,bound,block,targets)
96 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None):
97 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
89 98 mapClass = Map.dists[dist]
90 99 self.mapObject = mapClass()
91 100
@@ -93,21 +102,19 b' class ParallelFunction(RemoteFunction):'
93 102 len_0 = len(sequences[0])
94 103 for s in sequences:
95 104 if len(s)!=len_0:
96 raise ValueError('all sequences must have equal length')
105 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
106 raise ValueError(msg)
97 107
98 if self.targets is None:
99 # load-balanced:
100 engines = [None]*len_0
101 elif isinstance(self.targets, int):
102 engines = [None]*self.targets
108 if self.balanced:
109 targets = [self.targets]*len_0
103 110 else:
104 111 # multiplexed:
105 engines = self.client._build_targets(self.targets)[-1]
112 targets = self.client._build_targets(self.targets)[-1]
106 113
107 nparts = len(engines)
114 nparts = len(targets)
108 115 msg_ids = []
109 116 # my_f = lambda *a: map(self.func, *a)
110 for index, engineid in enumerate(engines):
117 for index, t in enumerate(targets):
111 118 args = []
112 119 for seq in sequences:
113 120 part = self.mapObject.getPartition(seq, index, nparts)
@@ -124,22 +131,26 b' class ParallelFunction(RemoteFunction):'
124 131 args = [self.func]+args
125 132 else:
126 133 f=self.func
127 mid = self.client.apply(f, args=args, block=False,
128 bound=self.bound,
129 targets=engineid).msg_ids[0]
130 msg_ids.append(mid)
134 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
135 targets=targets, balanced=self.balanced)
136
137 msg_ids.append(ar.msg_ids[0])
131 138
132 139 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
133 140 if self.block:
134 r.wait()
135 return r.result
141 try:
142 return r.get()
143 except KeyboardInterrupt:
144 return r
136 145 else:
137 146 return r
138 147
139 148 def map(self, *sequences):
140 149 """call a function on each element of a sequence remotely."""
141 150 self._map = True
142 ret = self.__call__(*sequences)
143 del self._map
151 try:
152 ret = self.__call__(*sequences)
153 finally:
154 del self._map
144 155 return ret
145 156
@@ -59,7 +59,7 b' def validate_url(url):'
59 59 except ValueError:
60 60 raise AssertionError("Invalid port %r in url: %r"%(port, url))
61 61
62 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
63 63
64 64 else:
65 65 # only validate tcp urls currently
@@ -10,7 +10,11 b''
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14
13 15 from IPython.external.decorator import decorator
16 from IPython.zmq.parallel.asyncresult import AsyncResult
17 from IPython.zmq.parallel.dependency import Dependency
14 18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15 19
16 20 #-----------------------------------------------------------------------------
@@ -61,30 +65,29 b' def spin_after(f, self, *args, **kwargs):'
61 65 # Classes
62 66 #-----------------------------------------------------------------------------
63 67
64 class View(object):
68 class View(HasTraits):
65 69 """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.
66 70
67 71 Don't use this class, use subclasses.
68 72 """
69 block=None
70 bound=None
71 history=None
72 outstanding = set()
73 results = {}
74
73 block=Bool(False)
74 bound=Bool(False)
75 history=List()
76 outstanding = Set()
77 results = Dict()
78 client = Instance('IPython.zmq.parallel.client.Client')
79
80 _ntargets = Int(1)
81 _balanced = Bool(False)
82 _default_names = List(['block', 'bound'])
75 83 _targets = None
76 _apply_name = 'apply'
77 _default_names = ['targets', 'block']
78 84
79 def __init__(self, client, targets=None):
80 self.client = client
85 def __init__(self, client=None, targets=None):
86 super(View, self).__init__(client=client)
81 87 self._targets = targets
82 88 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
83 89 self.block = client.block
84 self.bound=False
85 self.history = []
86 self.outstanding = set()
87 self.results = {}
90
88 91 for name in self._default_names:
89 92 setattr(self, name, getattr(self, name, None))
90 93
@@ -101,26 +104,46 b' class View(object):'
101 104
102 105 @targets.setter
103 106 def targets(self, value):
104 self._targets = value
105 # raise AttributeError("Cannot set my targets argument after construction!")
107 raise AttributeError("Cannot set View `targets` after construction!")
106 108
107 109 def _defaults(self, *excludes):
108 110 """return dict of our default attributes, excluding names given."""
109 d = {}
111 d = dict(balanced=self._balanced, targets=self.targets)
110 112 for name in self._default_names:
111 113 if name not in excludes:
112 114 d[name] = getattr(self, name)
113 115 return d
114 116
def set_flags(self, **kwargs):
    """Set this View's attribute flags by keyword.

    A View is a wrapper for the Client's apply method, but
    with attributes that specify keyword arguments; those attributes
    can be set by keyword argument with this method.

    Parameters
    ----------

    block : bool
        whether to wait for results
    bound : bool
        whether to execute in the engine's namespace

    Raises
    ------

    KeyError
        if any keyword is not listed in `self._default_names`
    """
    # Validate every name before assigning anything, so that a bad
    # keyword leaves all flags untouched.
    for key in kwargs:
        if key not in self._default_names:
            raise KeyError("Invalid name: %r"%key)
    for name in ('block', 'bound'):
        if name in kwargs:
            # Store the value given for this flag -- not the whole kwargs
            # dict, which would always be truthy.
            setattr(self, name, kwargs[name])
138
139 #----------------------------------------------------------------
140 # wrappers for client methods:
141 #----------------------------------------------------------------
115 142 @sync_results
116 143 def spin(self):
117 144 """spin the client, and sync"""
118 145 self.client.spin()
119 146
120 @property
121 def _apply(self):
122 return getattr(self.client, self._apply_name)
123
124 147 @sync_results
125 148 @save_ids
126 149 def apply(self, f, *args, **kwargs):
@@ -133,7 +156,7 b' class View(object):'
133 156 else:
134 157 returns actual result of f(*args, **kwargs)
135 158 """
136 return self._apply(f, args, kwargs, **self._defaults())
159 return self.client.apply(f, args, kwargs, **self._defaults())
137 160
138 161 @save_ids
139 162 def apply_async(self, f, *args, **kwargs):
@@ -144,7 +167,7 b' class View(object):'
144 167 returns msg_id
145 168 """
146 169 d = self._defaults('block', 'bound')
147 return self._apply(f,args,kwargs, block=False, bound=False, **d)
170 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
148 171
149 172 @spin_after
150 173 @save_ids
@@ -157,7 +180,7 b' class View(object):'
157 180 returns: actual result of f(*args, **kwargs)
158 181 """
159 182 d = self._defaults('block', 'bound')
160 return self._apply(f,args,kwargs, block=True, bound=False, **d)
183 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
161 184
162 185 @sync_results
163 186 @save_ids
@@ -173,7 +196,7 b' class View(object):'
173 196
174 197 """
175 198 d = self._defaults('bound')
176 return self._apply(f, args, kwargs, bound=True, **d)
199 return self.client.apply(f, args, kwargs, bound=True, **d)
177 200
178 201 @sync_results
179 202 @save_ids
@@ -187,7 +210,7 b' class View(object):'
187 210
188 211 """
189 212 d = self._defaults('block', 'bound')
190 return self._apply(f, args, kwargs, block=False, bound=True, **d)
213 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
191 214
192 215 @spin_after
193 216 @save_ids
@@ -200,23 +223,7 b' class View(object):'
200 223
201 224 """
202 225 d = self._defaults('block', 'bound')
203 return self._apply(f, args, kwargs, block=True, bound=True, **d)
204
205 @spin_after
206 @save_ids
207 def map(self, f, *sequences):
208 """Parallel version of builtin `map`, using this view's engines."""
209 if isinstance(self.targets, int):
210 targets = [self.targets]
211 else:
212 targets = self.targets
213 pf = ParallelFunction(self.client, f, block=self.block,
214 bound=True, targets=targets)
215 return pf.map(*sequences)
216
217 def parallel(self, bound=True, block=True):
218 """Decorator for making a ParallelFunction"""
219 return parallel(self.client, bound=bound, targets=self.targets, block=block)
226 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
220 227
221 228 def abort(self, msg_ids=None, block=None):
222 229 """Abort jobs on my engines.
@@ -240,6 +247,17 b' class View(object):'
240 247 if targets is None or targets == 'all':
241 248 targets = self.targets
242 249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
250
251 #-------------------------------------------------------------------
252 # Decorators
253 #-------------------------------------------------------------------
254 def parallel(self, bound=True, block=True):
255 """Decorator for making a ParallelFunction"""
256 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
257
258 def remote(self, bound=True, block=True):
259 """Decorator for making a RemoteFunction"""
260 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
243 261
244 262
245 263
@@ -262,6 +280,62 b' class DirectView(View):'
262 280
263 281 """
264 282
283 def __init__(self, client=None, targets=None):
284 super(DirectView, self).__init__(client=client, targets=targets)
285 self._balanced = False
286
@spin_after
@save_ids
def map(self, f, *sequences, **kwargs):
    """Parallel version of builtin `map`, using this View's `targets`.

    There will be one task per target, so work will be chunked
    if the sequences are longer than `targets`.

    Results can be iterated as they are ready, but will become available in chunks.

    Parameters
    ----------

    f : callable
        function to be mapped
    *sequences: one or more sequences of matching length
        the sequences to be distributed and passed to `f`
    block : bool
        whether to wait for the result or not [default self.block]
    bound : bool
        whether to use the engine's namespace [default self.bound]

    Returns
    -------

    if block=False:
        AsyncMapResult
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
    else:
        the result of map(f,*sequences)

    Raises
    ------

    TypeError
        if a keyword other than `block` or `bound` is given.
    """
    # reject unknown keywords before reading any of them
    for k in kwargs:
        if k not in ('block', 'bound'):
            raise TypeError("invalid keyword arg, %r"%k)

    assert len(sequences) > 0, "must have some sequences to map onto!"

    # per-call keywords override the view's own attributes
    block = kwargs.get('block', self.block)
    bound = kwargs.get('bound', self.bound)

    pf = ParallelFunction(self.client, f, block=block,
                    bound=bound, targets=self.targets, balanced=False)
    return pf.map(*sequences)
331
def map_async(self, f, *sequences, **kwargs):
    """Non-blocking parallel version of builtin `map`, using this view's engines.

    This is equivalent to map(..., block=False); all other arguments
    are passed through to `self.map` unchanged.

    Raises
    ------

    TypeError
        if a `block` keyword argument is given.
    """
    if 'block' in kwargs:
        raise TypeError("map_async doesn't take a `block` keyword argument.")
    # async means never wait for the result
    kwargs['block'] = False
    return self.map(f,*sequences,**kwargs)
338
265 339 @sync_results
266 340 @save_ids
267 341 def execute(self, code, block=True):
@@ -358,14 +432,13 b' class DirectView(View):'
358 432
359 433
360 434 class LoadBalancedView(View):
361 """An engine-agnostic View that only executes via the Task queue.
435 """A load-balancing View that only executes via the Task scheduler.
362 436
363 Typically created via:
437 Load-balanced views can be created with the client's `view` method:
364 438
365 >>> v = client[None]
366 <LoadBalancedView None>
439 >>> v = client.view(balanced=True)
367 440
368 but can also be created with:
441 or targets can be specified, to restrict the potential destinations:
369 442
370 443 >>> v = client.view([1,3],balanced=True)
371 444
@@ -374,10 +447,126 b' class LoadBalancedView(View):'
374 447 """
375 448
376 449 _apply_name = 'apply_balanced'
377 _default_names = ['targets', 'block', 'bound', 'follow', 'after', 'timeout']
450 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
378 451
379 def __init__(self, client, targets=None):
380 super(LoadBalancedView, self).__init__(client, targets)
452 def __init__(self, client=None, targets=None):
453 super(LoadBalancedView, self).__init__(client=client, targets=targets)
381 454 self._ntargets = 1
382 self._apply_name = 'apply_balanced'
455
def _validate_dependency(self, dep):
    """Validate a dependency.

    For use in `set_flags`.

    A dependency is accepted when it is one of:

    * None, a str, an AsyncResult, or a Dependency
    * a list, set, or tuple whose elements are each a str or AsyncResult
    * a dict with exactly the keys of `Dependency().as_dict()`, whose
      'msg_ids' entry is a list of str

    Returns
    -------

    bool : True if `dep` is acceptable, False otherwise.
    """
    if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
        return True
    elif isinstance(dep, (list,set, tuple)):
        for d in dep:
            # isinstance takes the accepted types as a single tuple
            if not isinstance(d, (str, AsyncResult)):
                return False
    elif isinstance(dep, dict):
        if set(dep.keys()) != set(Dependency().as_dict().keys()):
            return False
        if not isinstance(dep['msg_ids'], list):
            return False
        for d in dep['msg_ids']:
            if not isinstance(d, str):
                return False
    else:
        return False
    # every check of the container branches passed
    return True
477
def set_flags(self, **kwargs):
    """Set this view's attribute flags by keyword.

    A View is a wrapper for the Client's apply method, but with
    attributes that specify keyword arguments; those attributes
    can be set by keyword argument with this method.

    Parameters
    ----------

    block : bool
        whether to wait for results
    bound : bool
        whether to use the engine's namespace
    follow : Dependency, list, msg_id, AsyncResult
        the location dependencies of tasks
    after : Dependency, list, msg_id, AsyncResult
        the time dependencies of tasks
    timeout : int,None
        the timeout to be used for tasks

    Raises
    ------

    ValueError
        if `follow` or `after` fails `_validate_dependency`, or if
        `timeout` is negative.
    TypeError
        if `timeout` is neither None nor a number.
    """

    # the parent class handles name validation and the block/bound flags
    super(LoadBalancedView, self).set_flags(**kwargs)
    for name in ('follow', 'after'):
        if name in kwargs:
            value = kwargs[name]
            if self._validate_dependency(value):
                setattr(self, name, value)
            else:
                raise ValueError("Invalid dependency: %r"%value)
    if 'timeout' in kwargs:
        t = kwargs['timeout']
        # None is a value, not a type, so test it by identity
        # rather than listing it in the isinstance tuple
        if t is not None:
            if not isinstance(t, (int, long, float)):
                raise TypeError("Invalid type for timeout: %r"%type(t))
            if t < 0:
                raise ValueError("Invalid timeout: %s"%t)
        self.timeout = t
516
@spin_after
@save_ids
def map(self, f, *sequences, **kwargs):
    """Parallel version of builtin `map`, load-balanced by this View.

    Each element will be a separate task, and will be load-balanced. This
    lets individual elements be available for iteration as soon as they arrive.

    Parameters
    ----------

    f : callable
        function to be mapped
    *sequences: one or more sequences of matching length
        the sequences to be distributed and passed to `f`
    block : bool
        whether to wait for the result or not [default self.block]
    bound : bool
        whether to use the engine's namespace [default self.bound]

    Returns
    -------

    if block=False:
        AsyncMapResult
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
    else:
        the result of map(f,*sequences)

    Raises
    ------

    TypeError
        if a keyword other than `block` or `bound` is given.
    """
    # reject unknown keywords instead of silently ignoring them,
    # matching the keyword handling of DirectView.map
    for k in kwargs:
        if k not in ('block', 'bound'):
            raise TypeError("invalid keyword arg, %r"%k)

    assert len(sequences) > 0, "must have some sequences to map onto!"

    # per-call keywords override the view's own attributes
    block = kwargs.get('block', self.block)
    bound = kwargs.get('bound', self.bound)

    pf = ParallelFunction(self.client, f, block=block, bound=bound,
                    targets=self.targets, balanced=True)
    return pf.map(*sequences)
558
def map_async(self, f, *sequences, **kwargs):
    """Non-blocking parallel version of builtin `map`, using this view's engines.

    This is equivalent to map(..., block=False)

    See `map` for details.

    Raises
    ------

    TypeError
        if a `block` keyword argument is given.
    """

    if 'block' in kwargs:
        raise TypeError("map_async doesn't take a `block` keyword argument.")
    # async means never wait for the result
    kwargs['block'] = False
    return self.map(f,*sequences,**kwargs)
571
383 572
@@ -37,7 +37,7 b' module and then create a :class:`.Client` instance:'
37 37 In [2]: rc = client.Client()
38 38
39 39 This form assumes that the default connection information (stored in
40 :file:`ipcontroller-client.json` found in `~/.ipython/clusterz_default/security`) is
40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is
41 41 accurate. If the controller was started on a remote machine, you must copy that connection
42 42 file to the client machine, or enter its contents as arguments to the Client constructor:
43 43
@@ -45,17 +45,17 b' file to the client machine, or enter its contents as arguments to the Client con'
45 45
46 46 # If you have copied the json connector file from the controller:
47 47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
48 # for a remote controller at 10.0.1.5, visible from my.server.com:
48 # or for a remote controller at 10.0.1.5, visible from my.server.com:
49 49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
50 50
51 51
52 To make sure there are engines connected to the controller, use can get a list
52 To make sure there are engines connected to the controller, users can get a list
53 53 of engine ids:
54 54
55 55 .. sourcecode:: ipython
56 56
57 57 In [3]: rc.ids
58 Out[3]: set([0, 1, 2, 3])
58 Out[3]: [0, 1, 2, 3]
59 59
60 60 Here we see that there are four engines ready to do work for us.
61 61
@@ -73,24 +73,25 b' Parallel map'
73 73 Python's builtin :func:`map` function allows a function to be applied to a
74 74 sequence element-by-element. This type of code is typically trivial to
75 75 parallelize. In fact, since IPython's interface is all about functions anyway,
76 you can just use the builtin :func:`map`, or a client's :meth:`map` method:
76 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
77 DirectView's :meth:`map` method:
77 78
78 79 .. sourcecode:: ipython
79 80
80 81 In [62]: serial_result = map(lambda x:x**10, range(32))
81 82
82 In [66]: parallel_result = rc.map(lambda x: x**10, range(32))
83 In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
83 84
84 In [67]: serial_result==parallel_result
85 In [67]: serial_result==parallel_result.get()
85 86 Out[67]: True
86 87
87 88
88 89 .. note::
89 90
90 The client's own version of :meth:`map` or that of :class:`.DirectView` do
91 The :class:`DirectView`'s version of :meth:`map` does
91 92 not do any load balancing. For a load balanced version, use a
92 93 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
93 `targets=None`.
94 `balanced=True`.
94 95
95 96 .. seealso::
96 97
@@ -105,16 +106,18 b' some decorators:'
105 106
106 107 .. sourcecode:: ipython
107 108
108 In [10]: @rc.remote(block=True)
109 In [10]: @rc.remote(block=True, targets=0)
109 110 ....: def f(x):
110 111 ....: return 10.0*x**4
111 112 ....:
112 113
113 In [11]: map(f, range(32)) # this is done in parallel
114 In [11]: map(f, range(32)) # this is done on engine 0
114 115 Out[11]: [0.0,10.0,160.0,...]
115 116
116 See the docstring for the :func:`parallel` and :func:`remote` decorators for
117 options.
117 .. seealso::
118
119 See the docstring for the :func:`parallel` and :func:`remote` decorators for
120 options.
118 121
119 122 Calling Python functions
120 123 ========================
@@ -141,7 +144,7 b' Instead, they provide the signature::'
141 144 In order to provide the nicer interface, we have :class:`View` classes, which wrap
142 145 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
143 146 the extra arguments. For instance, performing index-access on a client creates a
144 :class:`.LoadBalancedView`.
147 :class:`.DirectView`.
145 148
146 149 .. sourcecode:: ipython
147 150
@@ -218,7 +221,7 b' unbound, unless called by the :meth:`apply_bound` method:'
218 221
219 222 .. sourcecode:: ipython
220 223
221 In [9]: rc[:]['b'] = 5 # assign b to 5 everywhere
224 In [9]: dview['b'] = 5 # assign b to 5 everywhere
222 225
223 226 In [10]: v0 = rc[0]
224 227
@@ -277,14 +280,14 b' local Python/IPython session:'
277 280 ...: return time.time()-tic
278 281
279 282 # In non-blocking mode
280 In [7]: pr = rc[:].apply_async(wait, 2)
283 In [7]: pr = dview.apply_async(wait, 2)
281 284
282 285 # Now block for the result
283 286 In [8]: pr.get()
284 287 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
285 288
286 289 # Again in non-blocking mode
287 In [9]: pr = rc[:].apply_async(wait, 10)
290 In [9]: pr = dview.apply_async(wait, 10)
288 291
289 292 # Poll to see if the result is ready
290 293 In [10]: pr.ready()
@@ -321,7 +324,7 b' associated results are ready:'
321 324 In [72]: rc.block=False
322 325
323 326 # A trivial list of AsyncResults objects
324 In [73]: pr_list = [rc[:].apply_async(wait, 3) for i in range(10)]
327 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
325 328
326 329 # Wait until all of them are done
327 330 In [74]: rc.barrier(pr_list)
@@ -332,63 +335,38 b' associated results are ready:'
332 335
333 336
334 337
335 The ``block`` and ``targets`` keyword arguments and attributes
336 --------------------------------------------------------------
337
338 .. warning::
339
340 This is different now, I haven't updated this section.
341 -MinRK
338 The ``block`` keyword argument and attributes
339 ---------------------------------------------
342 340
343 341 Most methods(like :meth:`apply`) accept
344 ``block`` and ``targets`` as keyword arguments. As we have seen above, these
345 keyword arguments control the blocking mode and which engines the command is
346 applied to. The :class:`Client` class also has :attr:`block` and
347 :attr:`targets` attributes that control the default behavior when the keyword
348 arguments are not provided. Thus the following logic is used for :attr:`block`
349 and :attr:`targets`:
342 ``block`` as a keyword argument. As we have seen above, this
343 keyword argument controls the blocking mode. The :class:`Client` class also has
344 a :attr:`block` attribute that controls the default behavior when the keyword
345 argument is not provided. Thus the following logic is used for :attr:`block`:
350 346
351 347 * If no keyword argument is provided, the instance attributes are used.
352 * Keyword argument, if provided override the instance attributes.
348 * A keyword argument, if provided, overrides the instance attribute for
349 the duration of a single call.
353 350
354 351 The following examples demonstrate how to use the instance attributes:
355 352
356 353 .. sourcecode:: ipython
357 354
358 In [16]: rc.targets = [0,2]
359
360 355 In [17]: rc.block = False
361 356
362 In [18]: pr = rc.execute('a=5')
363
364 In [19]: pr.r
365 Out[19]:
366 <Results List>
367 [0] In [6]: a=5
368 [2] In [6]: a=5
357 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
369 358
370 # Note targets='all' means all engines
371 In [20]: rc.targets = 'all'
359 In [19]: ar.get()
360 Out[19]: [10,10]
372 361
373 362 In [21]: rc.block = True
374 363
375 In [22]: rc.execute('b=10; print b')
376 Out[22]:
377 <Results List>
378 [0] In [7]: b=10; print b
379 [0] Out[7]: 10
380
381 [1] In [6]: b=10; print b
382 [1] Out[6]: 10
383
384 [2] In [7]: b=10; print b
385 [2] Out[7]: 10
386
387 [3] In [6]: b=10; print b
388 [3] Out[6]: 10
364 # Note targets='all' means all engines
365 In [22]: rc.apply(lambda : 42, targets='all')
366 Out[22]: [42, 42, 42, 42]
389 367
390 The :attr:`block` and :attr:`targets` instance attributes also determine the
391 behavior of the parallel magic commands.
368 The :attr:`block` and :attr:`targets` instance attributes of the
369 :class:`.DirectView` also determine the behavior of the parallel magic commands.
392 370
393 371
394 372 Parallel magic commands
@@ -567,9 +545,9 b' appear as a local dictionary. Underneath, this uses :meth:`push` and'
567 545
568 546 In [50]: rc.block=True
569 547
570 In [51]: rc[:]['a']=['foo','bar']
548 In [51]: dview['a']=['foo','bar']
571 549
572 In [52]: rc[:]['a']
550 In [52]: dview['a']
573 551 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
574 552
575 553 Scatter and gather
@@ -585,13 +563,13 b' between engines, MPI should be used:'
585 563
586 564 .. sourcecode:: ipython
587 565
588 In [58]: rc.scatter('a',range(16))
566 In [58]: dview.scatter('a',range(16))
589 567 Out[58]: [None,None,None,None]
590 568
591 In [59]: rc[:]['a']
569 In [59]: dview['a']
592 570 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
593 571
594 In [60]: rc.gather('a')
572 In [60]: dview.gather('a')
595 573 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
596 574
597 575 Other things to look at
@@ -606,14 +584,14 b' basic effect using :meth:`scatter` and :meth:`gather`:'
606 584
607 585 .. sourcecode:: ipython
608 586
609 In [66]: rc.scatter('x',range(64))
587 In [66]: dview.scatter('x',range(64))
610 588 Out[66]: [None,None,None,None]
611 589
612 590 In [67]: px y = [i**10 for i in x]
613 591 Parallel execution on engines: [0, 1, 2, 3]
614 592 Out[67]:
615 593
616 In [68]: y = rc.gather('y')
594 In [68]: y = dview.gather('y')
617 595
618 596 In [69]: print y
619 597 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
@@ -41,8 +41,8 b' module and then create a :class:`.Client` instance:'
41 41
42 42 In [2]: rc = client.Client()
43 43
44 In [3]: lview = rc[None]
45 Out[3]: <LoadBalancedView tcp://127.0.0.1:10101>
44 In [3]: lview = rc.view(balanced=True)
45 Out[3]: <LoadBalancedView None>
46 46
47 47
48 48 This form assumes that the controller was started on localhost with default
@@ -66,7 +66,7 b' objects, but *in parallel*. Like the multiengine interface, these can be'
66 66 implemented via the task interface. The exact same tools can perform these
67 67 actions in load-balanced ways as well as multiplexed ways: a parallel version
68 68 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
69 argument `targets=None`, then they are dynamically load balanced. Thus, if the
69 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
70 70 execution time per item varies significantly, you should use the versions in
71 71 the task interface.
72 72
@@ -80,7 +80,7 b' for the ``None`` element:'
80 80
81 81 In [63]: serial_result = map(lambda x:x**10, range(32))
82 82
83 In [64]: parallel_result = tc[None].map(lambda x:x**10, range(32))
83 In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
84 84
85 85 In [65]: serial_result==parallel_result
86 86 Out[65]: True
@@ -258,13 +258,13 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
258 258
259 259 In [14]: client.block=False
260 260
261 In [15]: ar = client.apply(f, args, kwargs, targets=None)
261 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
262 262
263 In [16]: ar2 = client.apply(f2, targets=None)
263 In [16]: ar2 = client.apply(f2, balanced=True)
264 264
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2])
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
266 266
267 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5)
267 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
268 268
269 269
270 270 .. seealso::
@@ -383,7 +383,7 b' The following is an overview of how to use these classes together:'
383 383 1. Create a :class:`Client`.
384 384 2. Define some functions to be run as tasks
385 385 3. Submit your tasks using the :meth:`apply` method of your
386 :class:`Client` instance, specifying `targets=None`. This signals
386 :class:`Client` instance, specifying `balanced=True`. This signals
387 387 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
388 388 4. Use :meth:`Client.get_results` to get the results of the
389 389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
General Comments 0
You need to be logged in to leave comments. Login now