##// END OF EJS Templates
API update involving map and load-balancing
MinRK -
Show More
@@ -147,9 +147,9 b' class UnSerializeIt(UnSerialized):'
147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
148 result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype'])
148 result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype'])
149 result.shape = self.serialized.metadata['shape']
149 result.shape = self.serialized.metadata['shape']
150 # This is a hack to make the array writable. We are working with
150 # numpy arrays with frombuffer are read-only. We are working with
151 # the numpy folks to address this issue.
151 # the numpy folks to address this issue.
152 result = result.copy()
152 # result = result.copy()
153 elif typeDescriptor == 'pickle':
153 elif typeDescriptor == 'pickle':
154 result = pickle.loads(self.serialized.getData())
154 result = pickle.loads(self.serialized.getData())
155 elif typeDescriptor in ('bytes', 'buffer'):
155 elif typeDescriptor in ('bytes', 'buffer'):
@@ -21,7 +21,7 b' from pprint import pprint'
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
@@ -203,6 +203,7 b' class Client(object):'
203
203
204 Attributes
204 Attributes
205 ----------
205 ----------
206
206 ids : set of int engine IDs
207 ids : set of int engine IDs
207 requesting the ids attribute always synchronizes
208 requesting the ids attribute always synchronizes
208 the registration state. To request ids without synchronization,
209 the registration state. To request ids without synchronization,
@@ -225,17 +226,23 b' class Client(object):'
225
226
226 Methods
227 Methods
227 -------
228 -------
228 spin : flushes incoming results and registration state changes
229 control methods spin, and requesting `ids` also ensures up to date
230
231 barrier : wait on one or more msg_ids
232
229
233 execution methods: apply/apply_bound/apply_to/apply_bound
230 spin
231 flushes incoming results and registration state changes
232 control methods spin, and requesting `ids` also ensures up to date
233
234 barrier
235 wait on one or more msg_ids
236
237 execution methods
238 apply
234 legacy: execute, run
239 legacy: execute, run
235
240
236 query methods: queue_status, get_result, purge
241 query methods
242 queue_status, get_result, purge
237
243
238 control methods: abort, kill
244 control methods
245 abort, shutdown
239
246
240 """
247 """
241
248
@@ -265,7 +272,6 b' class Client(object):'
265 if context is None:
272 if context is None:
266 context = zmq.Context()
273 context = zmq.Context()
267 self.context = context
274 self.context = context
268 self.targets = 'all'
269
275
270 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
276 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
271 if self._cd is not None:
277 if self._cd is not None:
@@ -634,42 +640,18 b' class Client(object):'
634 #--------------------------------------------------------------------------
640 #--------------------------------------------------------------------------
635
641
636 def __getitem__(self, key):
642 def __getitem__(self, key):
637 """Dict access returns DirectView multiplexer objects or,
643 """index access returns DirectView multiplexer objects
638 if key is None, a LoadBalancedView."""
639 if key is None:
640 return LoadBalancedView(self)
641 if isinstance(key, int):
642 if key not in self.ids:
643 raise IndexError("No such engine: %i"%key)
644 return DirectView(self, key)
645
646 if isinstance(key, slice):
647 indices = range(len(self.ids))[key]
648 ids = sorted(self._ids)
649 key = [ ids[i] for i in indices ]
650 # newkeys = sorted(self._ids)[thekeys[k]]
651
644
652 if isinstance(key, (tuple, list, xrange)):
645 Must be int, slice, or list/tuple/xrange of ints"""
653 _,targets = self._build_targets(list(key))
646 if not isinstance(key, (int, slice, tuple, list, xrange)):
654 return DirectView(self, targets)
647 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
655 else:
648 else:
656 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
649 return self.view(key, balanced=False)
657
650
658 #--------------------------------------------------------------------------
651 #--------------------------------------------------------------------------
659 # Begin public methods
652 # Begin public methods
660 #--------------------------------------------------------------------------
653 #--------------------------------------------------------------------------
661
654
662 @property
663 def remote(self):
664 """property for convenient RemoteFunction generation.
665
666 >>> @client.remote
667 ... def getpid():
668 import os
669 return os.getpid()
670 """
671 return remote(self, block=self.block)
672
673 def spin(self):
655 def spin(self):
674 """Flush any registration notifications and execution results
656 """Flush any registration notifications and execution results
675 waiting in the ZMQ queue.
657 waiting in the ZMQ queue.
@@ -690,6 +672,7 b' class Client(object):'
690
672
691 Parameters
673 Parameters
692 ----------
674 ----------
675
693 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
676 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
694 ints are indices to self.history
677 ints are indices to self.history
695 strs are msg_ids
678 strs are msg_ids
@@ -700,6 +683,7 b' class Client(object):'
700
683
701 Returns
684 Returns
702 -------
685 -------
686
703 True : when all msg_ids are done
687 True : when all msg_ids are done
704 False : timeout reached, some msg_ids still outstanding
688 False : timeout reached, some msg_ids still outstanding
705 """
689 """
@@ -815,6 +799,7 b' class Client(object):'
815
799
816 Parameters
800 Parameters
817 ----------
801 ----------
802
818 code : str
803 code : str
819 the code string to be executed
804 the code string to be executed
820 targets : int/str/list of ints/strs
805 targets : int/str/list of ints/strs
@@ -824,7 +809,7 b' class Client(object):'
824 whether or not to wait until done to return
809 whether or not to wait until done to return
825 default: self.block
810 default: self.block
826 """
811 """
827 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
812 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
828 return result
813 return result
829
814
830 def run(self, filename, targets='all', block=None):
815 def run(self, filename, targets='all', block=None):
@@ -834,6 +819,7 b' class Client(object):'
834
819
835 Parameters
820 Parameters
836 ----------
821 ----------
822
837 filename : str
823 filename : str
838 The path to the file
824 The path to the file
839 targets : int/str/list of ints/strs
825 targets : int/str/list of ints/strs
@@ -868,7 +854,8 b' class Client(object):'
868 return list(Dependency(dep))
854 return list(Dependency(dep))
869
855
870 @defaultblock
856 @defaultblock
871 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
857 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
858 targets=None, balanced=None,
872 after=None, follow=None, timeout=None):
859 after=None, follow=None, timeout=None):
873 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
860 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
874
861
@@ -905,11 +892,34 b' class Client(object):'
905 if int:
892 if int:
906 Run on single engine
893 Run on single engine
907
894
908 after,follow,timeout only used in `apply_balanced`. See that docstring
895 balanced : bool, default None
909 for details.
896 whether to load-balance. This will default to True
897 if targets is unspecified, or False if targets is specified.
898
899 The following arguments are only used when balanced is True:
900 after : Dependency or collection of msg_ids
901 Only for load-balanced execution (targets=None)
902 Specify a list of msg_ids as a time-based dependency.
903 This job will only be run *after* the dependencies
904 have been met.
905
906 follow : Dependency or collection of msg_ids
907 Only for load-balanced execution (targets=None)
908 Specify a list of msg_ids as a location-based dependency.
909 This job will only be run on an engine where this dependency
910 is met.
911
912 timeout : float/int or None
913 Only for load-balanced execution (targets=None)
914 Specify an amount of time (in seconds) for the scheduler to
915 wait for dependencies to be met before failing with a
916 DependencyTimeout.
917
918 after,follow,timeout only used if `balanced=True`.
910
919
911 Returns
920 Returns
912 -------
921 -------
922
913 if block is False:
923 if block is False:
914 return AsyncResult wrapping msg_ids
924 return AsyncResult wrapping msg_ids
915 output of AsyncResult.get() is identical to that of `apply(...block=True)`
925 output of AsyncResult.get() is identical to that of `apply(...block=True)`
@@ -921,10 +931,21 b' class Client(object):'
921 """
931 """
922
932
923 # defaults:
933 # defaults:
924 block = block if block is not None else self.block
925 args = args if args is not None else []
934 args = args if args is not None else []
926 kwargs = kwargs if kwargs is not None else {}
935 kwargs = kwargs if kwargs is not None else {}
927
936
937 if balanced is None:
938 if targets is None:
939 # default to balanced if targets unspecified
940 balanced = True
941 else:
942 # otherwise default to multiplexing
943 balanced = False
944
945 if targets is None and balanced is False:
946 # default to all if *not* balanced, and targets is unspecified
947 targets = 'all'
948
928 # enforce types of f,args,kwrags
949 # enforce types of f,args,kwrags
929 if not callable(f):
950 if not callable(f):
930 raise TypeError("f must be callable, not %s"%type(f))
951 raise TypeError("f must be callable, not %s"%type(f))
@@ -935,80 +956,27 b' class Client(object):'
935
956
936 options = dict(bound=bound, block=block, targets=targets)
957 options = dict(bound=bound, block=block, targets=targets)
937
958
938 if targets is None:
959 if balanced:
939 return self.apply_balanced(f, args, kwargs, timeout=timeout,
960 return self._apply_balanced(f, args, kwargs, timeout=timeout,
940 after=after, follow=follow, **options)
961 after=after, follow=follow, **options)
941 else:
962 elif follow or after or timeout:
942 if follow or after or timeout:
963 msg = "follow, after, and timeout args are only used for"
943 msg = "follow, after, and timeout args are only used for load-balanced"
964 msg += " load-balanced execution."
944 msg += "execution."
945 raise ValueError(msg)
965 raise ValueError(msg)
966 else:
946 return self._apply_direct(f, args, kwargs, **options)
967 return self._apply_direct(f, args, kwargs, **options)
947
968
948 @defaultblock
969 def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
949 def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
950 after=None, follow=None, timeout=None):
970 after=None, follow=None, timeout=None):
951 """call f(*args, **kwargs) remotely in a load-balanced manner.
971 """call f(*args, **kwargs) remotely in a load-balanced manner.
952
972
953 Parameters
973 This is a private method, see `apply` for details.
954 ----------
974 Not to be called directly!
955
956 f : function
957 The fuction to be called remotely
958 args : tuple/list
959 The positional arguments passed to `f`
960 kwargs : dict
961 The keyword arguments passed to `f`
962 bound : bool (default: True)
963 Whether to execute in the Engine(s) namespace, or in a clean
964 namespace not affecting the engine.
965 block : bool (default: self.block)
966 Whether to wait for the result, or return immediately.
967 False:
968 returns AsyncResult
969 True:
970 returns actual result(s) of f(*args, **kwargs)
971 if multiple targets:
972 list of results, matching `targets`
973 targets : int,list of ints, 'all', None
974 Specify the destination of the job.
975 if None:
976 Submit via Task queue for load-balancing.
977 if 'all':
978 Run on all active engines
979 if list:
980 Run on each specified engine
981 if int:
982 Run on single engine
983
984 after : Dependency or collection of msg_ids
985 Only for load-balanced execution (targets=None)
986 Specify a list of msg_ids as a time-based dependency.
987 This job will only be run *after* the dependencies
988 have been met.
989
990 follow : Dependency or collection of msg_ids
991 Only for load-balanced execution (targets=None)
992 Specify a list of msg_ids as a location-based dependency.
993 This job will only be run on an engine where this dependency
994 is met.
995
996 timeout : float/int or None
997 Only for load-balanced execution (targets=None)
998 Specify an amount of time (in seconds) for the scheduler to
999 wait for dependencies to be met before failing with a
1000 DependencyTimeout.
1001
1002 Returns
1003 -------
1004 if block is False:
1005 return AsyncResult wrapping msg_id
1006 output of AsyncResult.get() is identical to that of `apply(...block=True)`
1007 else:
1008 wait for, and return actual result of `f(*args, **kwargs)`
1009
1010 """
975 """
1011
976
977 for kwarg in (bound, block, targets):
978 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
979
1012 if self._task_socket is None:
980 if self._task_socket is None:
1013 msg = "Task farming is disabled"
981 msg = "Task farming is disabled"
1014 if self._task_scheme == 'pure':
982 if self._task_scheme == 'pure':
@@ -1025,7 +993,6 b' class Client(object):'
1025 if isinstance(f, dependent):
993 if isinstance(f, dependent):
1026 # soft warn on functional dependencies
994 # soft warn on functional dependencies
1027 warnings.warn(msg, RuntimeWarning)
995 warnings.warn(msg, RuntimeWarning)
1028
1029
996
1030 # defaults:
997 # defaults:
1031 args = args if args is not None else []
998 args = args if args is not None else []
@@ -1036,14 +1003,6 b' class Client(object):'
1036 else:
1003 else:
1037 idents = []
1004 idents = []
1038
1005
1039 # enforce types of f,args,kwrags
1040 if not callable(f):
1041 raise TypeError("f must be callable, not %s"%type(f))
1042 if not isinstance(args, (tuple, list)):
1043 raise TypeError("args must be tuple or list, not %s"%type(args))
1044 if not isinstance(kwargs, dict):
1045 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1046
1047 after = self._build_dependency(after)
1006 after = self._build_dependency(after)
1048 follow = self._build_dependency(follow)
1007 follow = self._build_dependency(follow)
1049 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1008 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
@@ -1064,13 +1023,17 b' class Client(object):'
1064 else:
1023 else:
1065 return ar
1024 return ar
1066
1025
1067 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
1026 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1068 """Then underlying method for applying functions to specific engines
1027 """Then underlying method for applying functions to specific engines
1069 via the MUX queue.
1028 via the MUX queue.
1070
1029
1030 This is a private method, see `apply` for details.
1071 Not to be called directly!
1031 Not to be called directly!
1072 """
1032 """
1073
1033
1034 for kwarg in (bound, block, targets):
1035 assert kwarg is not None, "kwarg %r must be specified!"%kwarg
1036
1074 idents,targets = self._build_targets(targets)
1037 idents,targets = self._build_targets(targets)
1075
1038
1076 subheader = {}
1039 subheader = {}
@@ -1095,103 +1058,46 b' class Client(object):'
1095 return ar
1058 return ar
1096
1059
1097 #--------------------------------------------------------------------------
1060 #--------------------------------------------------------------------------
1098 # Map and decorators
1061 # decorators
1099 #--------------------------------------------------------------------------
1062 #--------------------------------------------------------------------------
1100
1063
1101 def map(self, f, *sequences, **kwargs):
1064 @defaultblock
1102 """Parallel version of builtin `map`, using all our engines.
1065 def parallel(self, bound=True, targets='all', block=None):
1103
1104 `block` and `targets` can be passed as keyword arguments only.
1105
1106 There will be one task per target, so work will be chunked
1107 if the sequences are longer than `targets`.
1108
1109 Results can be iterated as they are ready, but will become available in chunks.
1110
1111 Parameters
1112 ----------
1113
1114 f : callable
1115 function to be mapped
1116 *sequences: one or more sequences of matching length
1117 the sequences to be distributed and passed to `f`
1118 block : bool
1119 whether to wait for the result or not [default self.block]
1120 targets : valid targets
1121 targets to be used [default self.targets]
1122
1123 Returns
1124 -------
1125
1126 if block=False:
1127 AsyncMapResult
1128 An object like AsyncResult, but which reassembles the sequence of results
1129 into a single list. AsyncMapResults can be iterated through before all
1130 results are complete.
1131 else:
1132 the result of map(f,*sequences)
1133
1134 """
1135 block = kwargs.get('block', self.block)
1136 targets = kwargs.get('targets', self.targets)
1137 assert len(sequences) > 0, "must have some sequences to map onto!"
1138 pf = ParallelFunction(self, f, block=block,
1139 bound=True, targets=targets)
1140 return pf.map(*sequences)
1141
1142 def imap(self, f, *sequences, **kwargs):
1143 """Parallel version of builtin `itertools.imap`, load-balanced across all engines.
1144
1145 Each element will be a separate task, and will be load-balanced. This
1146 lets individual elements be ready for iteration as soon as they come.
1147
1148 Parameters
1149 ----------
1150
1151 f : callable
1152 function to be mapped
1153 *sequences: one or more sequences of matching length
1154 the sequences to be distributed and passed to `f`
1155 block : bool
1156 whether to wait for the result or not [default self.block]
1157
1158 Returns
1159 -------
1160
1161 if block=False:
1162 AsyncMapResult
1163 An object like AsyncResult, but which reassembles the sequence of results
1164 into a single list. AsyncMapResults can be iterated through before all
1165 results are complete.
1166 else:
1167 the result of map(f,*sequences)
1168
1169 """
1170
1171 block = kwargs.get('block', self.block)
1172
1173 assert len(sequences) > 0, "must have some sequences to map onto!"
1174
1175 pf = ParallelFunction(self, f, block=self.block,
1176 bound=True, targets=None)
1177 return pf.map(*sequences)
1178
1179 def parallel(self, bound=True, targets='all', block=True):
1180 """Decorator for making a ParallelFunction."""
1066 """Decorator for making a ParallelFunction."""
1181 return parallel(self, bound=bound, targets=targets, block=block)
1067 return parallel(self, bound=bound, targets=targets, block=block)
1182
1068
1183 def remote(self, bound=True, targets='all', block=True):
1069 @defaultblock
1070 def remote(self, bound=True, targets='all', block=None):
1184 """Decorator for making a RemoteFunction."""
1071 """Decorator for making a RemoteFunction."""
1185 return remote(self, bound=bound, targets=targets, block=block)
1072 return remote(self, bound=bound, targets=targets, block=block)
1186
1073
1187 def view(self, targets=None, balanced=False):
1074 def view(self, targets=None, balanced=False):
1188 """Method for constructing View objects"""
1075 """Method for constructing View objects"""
1189 if not balanced:
1076 if targets is None:
1190 if not targets:
1077 if balanced:
1078 return LoadBalancedView(client=self)
1079 else:
1191 targets = slice(None)
1080 targets = slice(None)
1192 return self[targets]
1081
1082 if balanced:
1083 view_class = LoadBalancedView
1084 else:
1085 view_class = DirectView
1086 if isinstance(targets, int):
1087 if targets not in self.ids:
1088 raise IndexError("No such engine: %i"%targets)
1089 return view_class(client=self, targets=targets)
1090
1091 if isinstance(targets, slice):
1092 indices = range(len(self.ids))[targets]
1093 ids = sorted(self._ids)
1094 targets = [ ids[i] for i in indices ]
1095
1096 if isinstance(targets, (tuple, list, xrange)):
1097 _,targets = self._build_targets(list(targets))
1098 return view_class(client=self, targets=targets)
1193 else:
1099 else:
1194 return LoadBalancedView(self, targets)
1100 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1195
1101
1196 #--------------------------------------------------------------------------
1102 #--------------------------------------------------------------------------
1197 # Data movement
1103 # Data movement
@@ -1202,7 +1108,7 b' class Client(object):'
1202 """Push the contents of `ns` into the namespace on `target`"""
1108 """Push the contents of `ns` into the namespace on `target`"""
1203 if not isinstance(ns, dict):
1109 if not isinstance(ns, dict):
1204 raise TypeError("Must be a dict, not %s"%type(ns))
1110 raise TypeError("Must be a dict, not %s"%type(ns))
1205 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
1111 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1206 return result
1112 return result
1207
1113
1208 @defaultblock
1114 @defaultblock
@@ -1214,14 +1120,14 b' class Client(object):'
1214 for key in keys:
1120 for key in keys:
1215 if not isinstance(key, str):
1121 if not isinstance(key, str):
1216 raise TypeError
1122 raise TypeError
1217 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
1123 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1218 return result
1124 return result
1219
1125
1126 @defaultblock
1220 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1127 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1221 """
1128 """
1222 Partition a Python sequence and send the partitions to a set of engines.
1129 Partition a Python sequence and send the partitions to a set of engines.
1223 """
1130 """
1224 block = block if block is not None else self.block
1225 targets = self._build_targets(targets)[-1]
1131 targets = self._build_targets(targets)[-1]
1226 mapObject = Map.dists[dist]()
1132 mapObject = Map.dists[dist]()
1227 nparts = len(targets)
1133 nparts = len(targets)
@@ -1239,11 +1145,11 b' class Client(object):'
1239 else:
1145 else:
1240 return r
1146 return r
1241
1147
1148 @defaultblock
1242 def gather(self, key, dist='b', targets='all', block=None):
1149 def gather(self, key, dist='b', targets='all', block=None):
1243 """
1150 """
1244 Gather a partitioned sequence on a set of engines as a single local seq.
1151 Gather a partitioned sequence on a set of engines as a single local seq.
1245 """
1152 """
1246 block = block if block is not None else self.block
1247
1153
1248 targets = self._build_targets(targets)[-1]
1154 targets = self._build_targets(targets)[-1]
1249 mapObject = Map.dists[dist]()
1155 mapObject = Map.dists[dist]()
@@ -1267,6 +1173,7 b' class Client(object):'
1267
1173
1268 Parameters
1174 Parameters
1269 ----------
1175 ----------
1176
1270 msg_ids : list of ints or msg_ids
1177 msg_ids : list of ints or msg_ids
1271 if int:
1178 if int:
1272 Passed as index to self.history for convenience.
1179 Passed as index to self.history for convenience.
@@ -1351,13 +1258,14 b' class Client(object):'
1351 return content
1258 return content
1352
1259
1353 @spinfirst
1260 @spinfirst
1354 def queue_status(self, targets=None, verbose=False):
1261 def queue_status(self, targets='all', verbose=False):
1355 """Fetch the status of engine queues.
1262 """Fetch the status of engine queues.
1356
1263
1357 Parameters
1264 Parameters
1358 ----------
1265 ----------
1266
1359 targets : int/str/list of ints/strs
1267 targets : int/str/list of ints/strs
1360 the engines on which to execute
1268 the engines whose states are to be queried.
1361 default : all
1269 default : all
1362 verbose : bool
1270 verbose : bool
1363 Whether to return lengths only, or lists of ids for each element
1271 Whether to return lengths only, or lists of ids for each element
@@ -1383,6 +1291,7 b' class Client(object):'
1383
1291
1384 Parameters
1292 Parameters
1385 ----------
1293 ----------
1294
1386 msg_ids : str or list of strs
1295 msg_ids : str or list of strs
1387 the msg_ids whose results should be forgotten.
1296 the msg_ids whose results should be forgotten.
1388 targets : int/str/list of ints/strs
1297 targets : int/str/list of ints/strs
@@ -1404,59 +1313,6 b' class Client(object):'
1404 if content['status'] != 'ok':
1313 if content['status'] != 'ok':
1405 raise ss.unwrap_exception(content)
1314 raise ss.unwrap_exception(content)
1406
1315
1407 #----------------------------------------
1408 # activate for %px,%autopx magics
1409 #----------------------------------------
1410 def activate(self):
1411 """Make this `View` active for parallel magic commands.
1412
1413 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1414 In a given IPython session there is a single active one. While
1415 there can be many `Views` created and used by the user,
1416 there is only one active one. The active `View` is used whenever
1417 the magic commands %px and %autopx are used.
1418
1419 The activate() method is called on a given `View` to make it
1420 active. Once this has been done, the magic commands can be used.
1421 """
1422
1423 try:
1424 # This is injected into __builtins__.
1425 ip = get_ipython()
1426 except NameError:
1427 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1428 else:
1429 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1430 if pmagic is not None:
1431 pmagic.active_multiengine_client = self
1432 else:
1433 print "You must first load the parallelmagic extension " \
1434 "by doing '%load_ext parallelmagic'"
1435
1436 class AsynClient(Client):
1437 """An Asynchronous client, using the Tornado Event Loop.
1438 !!!unfinished!!!"""
1439 io_loop = None
1440 _queue_stream = None
1441 _notifier_stream = None
1442 _task_stream = None
1443 _control_stream = None
1444
1445 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1446 Client.__init__(self, addr, context, username, debug)
1447 if io_loop is None:
1448 io_loop = ioloop.IOLoop.instance()
1449 self.io_loop = io_loop
1450
1451 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1452 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1453 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1454 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1455
1456 def spin(self):
1457 for stream in (self.queue_stream, self.notifier_stream,
1458 self.task_stream, self.control_stream):
1459 stream.flush()
1460
1316
1461 __all__ = [ 'Client',
1317 __all__ = [ 'Client',
1462 'depend',
1318 'depend',
@@ -17,7 +17,7 b' from asyncresult import AsyncMapResult'
17 # Decorators
17 # Decorators
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 def remote(client, bound=False, block=None, targets=None):
20 def remote(client, bound=False, block=None, targets=None, balanced=None):
21 """Turn a function into a remote function.
21 """Turn a function into a remote function.
22
22
23 This method can be used for map:
23 This method can be used for map:
@@ -26,10 +26,10 b' def remote(client, bound=False, block=None, targets=None):'
26 def func(a)
26 def func(a)
27 """
27 """
28 def remote_function(f):
28 def remote_function(f):
29 return RemoteFunction(client, f, bound, block, targets)
29 return RemoteFunction(client, f, bound, block, targets, balanced)
30 return remote_function
30 return remote_function
31
31
32 def parallel(client, dist='b', bound=False, block=None, targets='all'):
32 def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
33 """Turn a function into a parallel remote function.
33 """Turn a function into a parallel remote function.
34
34
35 This method can be used for map:
35 This method can be used for map:
@@ -38,7 +38,7 b" def parallel(client, dist='b', bound=False, block=None, targets='all'):"
38 def func(a)
38 def func(a)
39 """
39 """
40 def parallel_function(f):
40 def parallel_function(f):
41 return ParallelFunction(client, f, dist, bound, block, targets)
41 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
42 return parallel_function
42 return parallel_function
43
43
44 #--------------------------------------------------------------------------
44 #--------------------------------------------------------------------------
@@ -62,6 +62,8 b' class RemoteFunction(object):'
62 to use the current `block` attribute of `client`
62 to use the current `block` attribute of `client`
63 targets : valid target list [default: all]
63 targets : valid target list [default: all]
64 The targets on which to execute.
64 The targets on which to execute.
65 balanced : bool
66 Whether to load-balance with the Task scheduler or not
65 """
67 """
66
68
67 client = None # the remote connection
69 client = None # the remote connection
@@ -69,23 +71,30 b' class RemoteFunction(object):'
69 block = None # whether to block
71 block = None # whether to block
70 bound = None # whether to affect the namespace
72 bound = None # whether to affect the namespace
71 targets = None # where to execute
73 targets = None # where to execute
74 balanced = None # whether to load-balance
72
75
73 def __init__(self, client, f, bound=False, block=None, targets=None):
76 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
74 self.client = client
77 self.client = client
75 self.func = f
78 self.func = f
76 self.block=block
79 self.block=block
77 self.bound=bound
80 self.bound=bound
78 self.targets=targets
81 self.targets=targets
82 if balanced is None:
83 if targets is None:
84 balanced = True
85 else:
86 balanced = False
87 self.balanced = balanced
79
88
80 def __call__(self, *args, **kwargs):
89 def __call__(self, *args, **kwargs):
81 return self.client.apply(self.func, args=args, kwargs=kwargs,
90 return self.client.apply(self.func, args=args, kwargs=kwargs,
82 block=self.block, targets=self.targets, bound=self.bound)
91 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
83
92
84
93
85 class ParallelFunction(RemoteFunction):
94 class ParallelFunction(RemoteFunction):
86 """Class for mapping a function to sequences."""
95 """Class for mapping a function to sequences."""
87 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
96 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None):
88 super(ParallelFunction, self).__init__(client,f,bound,block,targets)
97 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
89 mapClass = Map.dists[dist]
98 mapClass = Map.dists[dist]
90 self.mapObject = mapClass()
99 self.mapObject = mapClass()
91
100
@@ -93,21 +102,19 b' class ParallelFunction(RemoteFunction):'
93 len_0 = len(sequences[0])
102 len_0 = len(sequences[0])
94 for s in sequences:
103 for s in sequences:
95 if len(s)!=len_0:
104 if len(s)!=len_0:
96 raise ValueError('all sequences must have equal length')
105 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
106 raise ValueError(msg)
97
107
98 if self.targets is None:
108 if self.balanced:
99 # load-balanced:
109 targets = [self.targets]*len_0
100 engines = [None]*len_0
101 elif isinstance(self.targets, int):
102 engines = [None]*self.targets
103 else:
110 else:
104 # multiplexed:
111 # multiplexed:
105 engines = self.client._build_targets(self.targets)[-1]
112 targets = self.client._build_targets(self.targets)[-1]
106
113
107 nparts = len(engines)
114 nparts = len(targets)
108 msg_ids = []
115 msg_ids = []
109 # my_f = lambda *a: map(self.func, *a)
116 # my_f = lambda *a: map(self.func, *a)
110 for index, engineid in enumerate(engines):
117 for index, t in enumerate(targets):
111 args = []
118 args = []
112 for seq in sequences:
119 for seq in sequences:
113 part = self.mapObject.getPartition(seq, index, nparts)
120 part = self.mapObject.getPartition(seq, index, nparts)
@@ -124,22 +131,26 b' class ParallelFunction(RemoteFunction):'
124 args = [self.func]+args
131 args = [self.func]+args
125 else:
132 else:
126 f=self.func
133 f=self.func
127 mid = self.client.apply(f, args=args, block=False,
134 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
128 bound=self.bound,
135 targets=targets, balanced=self.balanced)
129 targets=engineid).msg_ids[0]
136
130 msg_ids.append(mid)
137 msg_ids.append(ar.msg_ids[0])
131
138
132 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
139 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
133 if self.block:
140 if self.block:
134 r.wait()
141 try:
135 return r.result
142 return r.get()
143 except KeyboardInterrupt:
144 return r
136 else:
145 else:
137 return r
146 return r
138
147
139 def map(self, *sequences):
148 def map(self, *sequences):
140 """call a function on each element of a sequence remotely."""
149 """call a function on each element of a sequence remotely."""
141 self._map = True
150 self._map = True
142 ret = self.__call__(*sequences)
151 try:
143 del self._map
152 ret = self.__call__(*sequences)
153 finally:
154 del self._map
144 return ret
155 return ret
145
156
@@ -59,7 +59,7 b' def validate_url(url):'
59 except ValueError:
59 except ValueError:
60 raise AssertionError("Invalid port %r in url: %r"%(port, url))
60 raise AssertionError("Invalid port %r in url: %r"%(port, url))
61
61
62 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
63
63
64 else:
64 else:
65 # only validate tcp urls currently
65 # only validate tcp urls currently
@@ -10,7 +10,11 b''
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14
13 from IPython.external.decorator import decorator
15 from IPython.external.decorator import decorator
16 from IPython.zmq.parallel.asyncresult import AsyncResult
17 from IPython.zmq.parallel.dependency import Dependency
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15
19
16 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
@@ -61,30 +65,29 b' def spin_after(f, self, *args, **kwargs):'
61 # Classes
65 # Classes
62 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
63
67
64 class View(object):
68 class View(HasTraits):
65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
66
70
67 Don't use this class, use subclasses.
71 Don't use this class, use subclasses.
68 """
72 """
69 block=None
73 block=Bool(False)
70 bound=None
74 bound=Bool(False)
71 history=None
75 history=List()
72 outstanding = set()
76 outstanding = Set()
73 results = {}
77 results = Dict()
74
78 client = Instance('IPython.zmq.parallel.client.Client')
79
80 _ntargets = Int(1)
81 _balanced = Bool(False)
82 _default_names = List(['block', 'bound'])
75 _targets = None
83 _targets = None
76 _apply_name = 'apply'
77 _default_names = ['targets', 'block']
78
84
79 def __init__(self, client, targets=None):
85 def __init__(self, client=None, targets=None):
80 self.client = client
86 super(View, self).__init__(client=client)
81 self._targets = targets
87 self._targets = targets
82 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
88 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
83 self.block = client.block
89 self.block = client.block
84 self.bound=False
90
85 self.history = []
86 self.outstanding = set()
87 self.results = {}
88 for name in self._default_names:
91 for name in self._default_names:
89 setattr(self, name, getattr(self, name, None))
92 setattr(self, name, getattr(self, name, None))
90
93
@@ -101,26 +104,46 b' class View(object):'
101
104
102 @targets.setter
105 @targets.setter
103 def targets(self, value):
106 def targets(self, value):
104 self._targets = value
107 raise AttributeError("Cannot set View `targets` after construction!")
105 # raise AttributeError("Cannot set my targets argument after construction!")
106
108
107 def _defaults(self, *excludes):
109 def _defaults(self, *excludes):
108 """return dict of our default attributes, excluding names given."""
110 """return dict of our default attributes, excluding names given."""
109 d = {}
111 d = dict(balanced=self._balanced, targets=self.targets)
110 for name in self._default_names:
112 for name in self._default_names:
111 if name not in excludes:
113 if name not in excludes:
112 d[name] = getattr(self, name)
114 d[name] = getattr(self, name)
113 return d
115 return d
114
116
117 def set_flags(self, **kwargs):
118 """set my attribute flags by keyword.
119
120 A View is a wrapper for the Client's apply method, but
121 with attributes that specify keyword arguments, those attributes
122 can be set by keyword argument with this method.
123
124 Parameters
125 ----------
126
127 block : bool
128 whether to wait for results
129 bound : bool
130 whether to use the client's namespace
131 """
132 for key in kwargs:
133 if key not in self._default_names:
134 raise KeyError("Invalid name: %r"%key)
135 for name in ('block', 'bound'):
136 if name in kwargs:
137 setattr(self, name, kwargs)
138
139 #----------------------------------------------------------------
140 # wrappers for client methods:
141 #----------------------------------------------------------------
115 @sync_results
142 @sync_results
116 def spin(self):
143 def spin(self):
117 """spin the client, and sync"""
144 """spin the client, and sync"""
118 self.client.spin()
145 self.client.spin()
119
146
120 @property
121 def _apply(self):
122 return getattr(self.client, self._apply_name)
123
124 @sync_results
147 @sync_results
125 @save_ids
148 @save_ids
126 def apply(self, f, *args, **kwargs):
149 def apply(self, f, *args, **kwargs):
@@ -133,7 +156,7 b' class View(object):'
133 else:
156 else:
134 returns actual result of f(*args, **kwargs)
157 returns actual result of f(*args, **kwargs)
135 """
158 """
136 return self._apply(f, args, kwargs, **self._defaults())
159 return self.client.apply(f, args, kwargs, **self._defaults())
137
160
138 @save_ids
161 @save_ids
139 def apply_async(self, f, *args, **kwargs):
162 def apply_async(self, f, *args, **kwargs):
@@ -144,7 +167,7 b' class View(object):'
144 returns msg_id
167 returns msg_id
145 """
168 """
146 d = self._defaults('block', 'bound')
169 d = self._defaults('block', 'bound')
147 return self._apply(f,args,kwargs, block=False, bound=False, **d)
170 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
148
171
149 @spin_after
172 @spin_after
150 @save_ids
173 @save_ids
@@ -157,7 +180,7 b' class View(object):'
157 returns: actual result of f(*args, **kwargs)
180 returns: actual result of f(*args, **kwargs)
158 """
181 """
159 d = self._defaults('block', 'bound')
182 d = self._defaults('block', 'bound')
160 return self._apply(f,args,kwargs, block=True, bound=False, **d)
183 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
161
184
162 @sync_results
185 @sync_results
163 @save_ids
186 @save_ids
@@ -173,7 +196,7 b' class View(object):'
173
196
174 """
197 """
175 d = self._defaults('bound')
198 d = self._defaults('bound')
176 return self._apply(f, args, kwargs, bound=True, **d)
199 return self.client.apply(f, args, kwargs, bound=True, **d)
177
200
178 @sync_results
201 @sync_results
179 @save_ids
202 @save_ids
@@ -187,7 +210,7 b' class View(object):'
187
210
188 """
211 """
189 d = self._defaults('block', 'bound')
212 d = self._defaults('block', 'bound')
190 return self._apply(f, args, kwargs, block=False, bound=True, **d)
213 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
191
214
192 @spin_after
215 @spin_after
193 @save_ids
216 @save_ids
@@ -200,23 +223,7 b' class View(object):'
200
223
201 """
224 """
202 d = self._defaults('block', 'bound')
225 d = self._defaults('block', 'bound')
203 return self._apply(f, args, kwargs, block=True, bound=True, **d)
226 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
204
205 @spin_after
206 @save_ids
207 def map(self, f, *sequences):
208 """Parallel version of builtin `map`, using this view's engines."""
209 if isinstance(self.targets, int):
210 targets = [self.targets]
211 else:
212 targets = self.targets
213 pf = ParallelFunction(self.client, f, block=self.block,
214 bound=True, targets=targets)
215 return pf.map(*sequences)
216
217 def parallel(self, bound=True, block=True):
218 """Decorator for making a ParallelFunction"""
219 return parallel(self.client, bound=bound, targets=self.targets, block=block)
220
227
221 def abort(self, msg_ids=None, block=None):
228 def abort(self, msg_ids=None, block=None):
222 """Abort jobs on my engines.
229 """Abort jobs on my engines.
@@ -240,6 +247,17 b' class View(object):'
240 if targets is None or targets == 'all':
247 if targets is None or targets == 'all':
241 targets = self.targets
248 targets = self.targets
242 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
250
251 #-------------------------------------------------------------------
252 # Decorators
253 #-------------------------------------------------------------------
254 def parallel(self, bound=True, block=True):
255 """Decorator for making a ParallelFunction"""
256 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
257
258 def remote(self, bound=True, block=True):
259 """Decorator for making a RemoteFunction"""
260 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
243
261
244
262
245
263
@@ -262,6 +280,62 b' class DirectView(View):'
262
280
263 """
281 """
264
282
283 def __init__(self, client=None, targets=None):
284 super(DirectView, self).__init__(client=client, targets=targets)
285 self._balanced = False
286
287 @spin_after
288 @save_ids
289 def map(self, f, *sequences, **kwargs):
290 """Parallel version of builtin `map`, using this View's `targets`.
291
292 There will be one task per target, so work will be chunked
293 if the sequences are longer than `targets`.
294
295 Results can be iterated as they are ready, but will become available in chunks.
296
297 Parameters
298 ----------
299
300 f : callable
301 function to be mapped
302 *sequences: one or more sequences of matching length
303 the sequences to be distributed and passed to `f`
304 block : bool
305 whether to wait for the result or not [default self.block]
306 bound : bool
307 whether to wait for the result or not [default self.bound]
308
309 Returns
310 -------
311
312 if block=False:
313 AsyncMapResult
314 An object like AsyncResult, but which reassembles the sequence of results
315 into a single list. AsyncMapResults can be iterated through before all
316 results are complete.
317 else:
318 the result of map(f,*sequences)
319 """
320
321 block = kwargs.get('block', self.block)
322 bound = kwargs.get('bound', self.bound)
323 for k in kwargs.keys():
324 if k not in ['block', 'bound']:
325 raise TypeError("invalid keyword arg, %r"%k)
326
327 assert len(sequences) > 0, "must have some sequences to map onto!"
328 pf = ParallelFunction(self.client, f, block=block,
329 bound=bound, targets=self.targets, balanced=False)
330 return pf.map(*sequences)
331
332 def map_async(self, f, *sequences, **kwargs):
333 """Parallel version of builtin `map`, using this view's engines."""
334 if 'block' in kwargs:
335 raise TypeError("map_async doesn't take a `block` keyword argument.")
336 kwargs['block'] = True
337 return self.map(f,*sequences,**kwargs)
338
265 @sync_results
339 @sync_results
266 @save_ids
340 @save_ids
267 def execute(self, code, block=True):
341 def execute(self, code, block=True):
@@ -358,14 +432,13 b' class DirectView(View):'
358
432
359
433
360 class LoadBalancedView(View):
434 class LoadBalancedView(View):
361 """An engine-agnostic View that only executes via the Task queue.
435 """An load-balancing View that only executes via the Task scheduler.
362
436
363 Typically created via:
437 Load-balanced views can be created with the client's `view` method:
364
438
365 >>> v = client[None]
439 >>> v = client.view(balanced=True)
366 <LoadBalancedView None>
367
440
368 but can also be created with:
441 or targets can be specified, to restrict the potential destinations:
369
442
370 >>> v = client.view([1,3],balanced=True)
443 >>> v = client.view([1,3],balanced=True)
371
444
@@ -374,10 +447,126 b' class LoadBalancedView(View):'
374 """
447 """
375
448
376 _apply_name = 'apply_balanced'
449 _apply_name = 'apply_balanced'
377 _default_names = ['targets', 'block', 'bound', 'follow', 'after', 'timeout']
450 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
378
451
379 def __init__(self, client, targets=None):
452 def __init__(self, client=None, targets=None):
380 super(LoadBalancedView, self).__init__(client, targets)
453 super(LoadBalancedView, self).__init__(client=client, targets=targets)
381 self._ntargets = 1
454 self._ntargets = 1
382 self._apply_name = 'apply_balanced'
455
456 def _validate_dependency(self, dep):
457 """validate a dependency.
458
459 For use in `set_flags`.
460 """
461 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
462 return True
463 elif isinstance(dep, (list,set, tuple)):
464 for d in dep:
465 if not isinstance(d, str, AsyncResult):
466 return False
467 elif isinstance(dep, dict):
468 if set(dep.keys()) != set(Dependency().as_dict().keys()):
469 return False
470 if not isinstance(dep['msg_ids'], list):
471 return False
472 for d in dep['msg_ids']:
473 if not isinstance(d, str):
474 return False
475 else:
476 return False
477
478 def set_flags(self, **kwargs):
479 """set my attribute flags by keyword.
480
481 A View is a wrapper for the Client's apply method, but
482 with attributes that specify keyword arguments, those attributes
483 can be set by keyword argument with this method.
484
485 Parameters
486 ----------
487
488 block : bool
489 whether to wait for results
490 bound : bool
491 whether to use the engine's namespace
492 follow : Dependency, list, msg_id, AsyncResult
493 the location dependencies of tasks
494 after : Dependency, list, msg_id, AsyncResult
495 the time dependencies of tasks
496 timeout : int,None
497 the timeout to be used for tasks
498 """
499
500 super(LoadBalancedView, self).set_flags(**kwargs)
501 for name in ('follow', 'after'):
502 if name in kwargs:
503 value = kwargs[name]
504 if self._validate_dependency(value):
505 setattr(self, name, value)
506 else:
507 raise ValueError("Invalid dependency: %r"%value)
508 if 'timeout' in kwargs:
509 t = kwargs['timeout']
510 if not isinstance(t, (int, long, float, None)):
511 raise TypeError("Invalid type for timeout: %r"%type(t))
512 if t is not None:
513 if t < 0:
514 raise ValueError("Invalid timeout: %s"%t)
515 self.timeout = t
516
517 @spin_after
518 @save_ids
519 def map(self, f, *sequences, **kwargs):
520 """Parallel version of builtin `map`, load-balanced by this View.
521
522 Each element will be a separate task, and will be load-balanced. This
523 lets individual elements be available for iteration as soon as they arrive.
524
525 Parameters
526 ----------
527
528 f : callable
529 function to be mapped
530 *sequences: one or more sequences of matching length
531 the sequences to be distributed and passed to `f`
532 block : bool
533 whether to wait for the result or not [default self.block]
534 bound : bool
535 whether to use the engine's namespace
536
537 Returns
538 -------
539
540 if block=False:
541 AsyncMapResult
542 An object like AsyncResult, but which reassembles the sequence of results
543 into a single list. AsyncMapResults can be iterated through before all
544 results are complete.
545 else:
546 the result of map(f,*sequences)
547
548 """
549
550 block = kwargs.get('block', self.block)
551 bound = kwargs.get('bound', self.bound)
552
553 assert len(sequences) > 0, "must have some sequences to map onto!"
554
555 pf = ParallelFunction(self.client, f, block=block, bound=bound,
556 targets=self.targets, balanced=True)
557 return pf.map(*sequences)
558
559 def map_async(self, f, *sequences, **kwargs):
560 """Parallel version of builtin `map`, using this view's engines.
561
562 This is equivalent to map(...block=False)
563
564 See `map` for details.
565 """
566
567 if 'block' in kwargs:
568 raise TypeError("map_async doesn't take a `block` keyword argument.")
569 kwargs['block'] = True
570 return self.map(f,*sequences,**kwargs)
571
383
572
@@ -37,7 +37,7 b' module and then create a :class:`.Client` instance:'
37 In [2]: rc = client.Client()
37 In [2]: rc = client.Client()
38
38
39 This form assumes that the default connection information (stored in
39 This form assumes that the default connection information (stored in
40 :file:`ipcontroller-client.json` found in `~/.ipython/clusterz_default/security`) is
40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is
41 accurate. If the controller was started on a remote machine, you must copy that connection
41 accurate. If the controller was started on a remote machine, you must copy that connection
42 file to the client machine, or enter its contents as arguments to the Client constructor:
42 file to the client machine, or enter its contents as arguments to the Client constructor:
43
43
@@ -45,17 +45,17 b' file to the client machine, or enter its contents as arguments to the Client con'
45
45
46 # If you have copied the json connector file from the controller:
46 # If you have copied the json connector file from the controller:
47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
48 # for a remote controller at 10.0.1.5, visible from my.server.com:
48 # or for a remote controller at 10.0.1.5, visible from my.server.com:
49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
50
50
51
51
52 To make sure there are engines connected to the controller, use can get a list
52 To make sure there are engines connected to the controller, users can get a list
53 of engine ids:
53 of engine ids:
54
54
55 .. sourcecode:: ipython
55 .. sourcecode:: ipython
56
56
57 In [3]: rc.ids
57 In [3]: rc.ids
58 Out[3]: set([0, 1, 2, 3])
58 Out[3]: [0, 1, 2, 3]
59
59
60 Here we see that there are four engines ready to do work for us.
60 Here we see that there are four engines ready to do work for us.
61
61
@@ -73,24 +73,25 b' Parallel map'
73 Python's builtin :func:`map` functions allows a function to be applied to a
73 Python's builtin :func:`map` functions allows a function to be applied to a
74 sequence element-by-element. This type of code is typically trivial to
74 sequence element-by-element. This type of code is typically trivial to
75 parallelize. In fact, since IPython's interface is all about functions anyway,
75 parallelize. In fact, since IPython's interface is all about functions anyway,
76 you can just use the builtin :func:`map`, or a client's :meth:`map` method:
76 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
77 DirectView's :meth:`map` method:
77
78
78 .. sourcecode:: ipython
79 .. sourcecode:: ipython
79
80
80 In [62]: serial_result = map(lambda x:x**10, range(32))
81 In [62]: serial_result = map(lambda x:x**10, range(32))
81
82
82 In [66]: parallel_result = rc.map(lambda x: x**10, range(32))
83 In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
83
84
84 In [67]: serial_result==parallel_result
85 In [67]: serial_result==parallel_result.get()
85 Out[67]: True
86 Out[67]: True
86
87
87
88
88 .. note::
89 .. note::
89
90
90 The client's own version of :meth:`map` or that of :class:`.DirectView` do
91 The :class:`DirectView`'s version of :meth:`map` does
91 not do any load balancing. For a load balanced version, use a
92 not do any load balancing. For a load balanced version, use a
92 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
93 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
93 `targets=None`.
94 `balanced=True`.
94
95
95 .. seealso::
96 .. seealso::
96
97
@@ -105,16 +106,18 b' some decorators:'
105
106
106 .. sourcecode:: ipython
107 .. sourcecode:: ipython
107
108
108 In [10]: @rc.remote(block=True)
109 In [10]: @rc.remote(block=True, targets=0)
109 ....: def f(x):
110 ....: def f(x):
110 ....: return 10.0*x**4
111 ....: return 10.0*x**4
111 ....:
112 ....:
112
113
113 In [11]: map(f, range(32)) # this is done in parallel
114 In [11]: map(f, range(32)) # this is done on engine 0
114 Out[11]: [0.0,10.0,160.0,...]
115 Out[11]: [0.0,10.0,160.0,...]
115
116
116 See the docstring for the :func:`parallel` and :func:`remote` decorators for
117 .. seealso::
117 options.
118
119 See the docstring for the :func:`parallel` and :func:`remote` decorators for
120 options.
118
121
119 Calling Python functions
122 Calling Python functions
120 ========================
123 ========================
@@ -141,7 +144,7 b' Instead, they provide the signature::'
141 In order to provide the nicer interface, we have :class:`View` classes, which wrap
144 In order to provide the nicer interface, we have :class:`View` classes, which wrap
142 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
145 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
143 the extra arguments. For instance, performing index-access on a client creates a
146 the extra arguments. For instance, performing index-access on a client creates a
144 :class:`.LoadBalancedView`.
147 :class:`.DirectView`.
145
148
146 .. sourcecode:: ipython
149 .. sourcecode:: ipython
147
150
@@ -218,7 +221,7 b' unbound, unless called by the :meth:`apply_bound` method:'
218
221
219 .. sourcecode:: ipython
222 .. sourcecode:: ipython
220
223
221 In [9]: rc[:]['b'] = 5 # assign b to 5 everywhere
224 In [9]: dview['b'] = 5 # assign b to 5 everywhere
222
225
223 In [10]: v0 = rc[0]
226 In [10]: v0 = rc[0]
224
227
@@ -277,14 +280,14 b' local Python/IPython session:'
277 ...: return time.time()-tic
280 ...: return time.time()-tic
278
281
279 # In non-blocking mode
282 # In non-blocking mode
280 In [7]: pr = rc[:].apply_async(wait, 2)
283 In [7]: pr = dview.apply_async(wait, 2)
281
284
282 # Now block for the result
285 # Now block for the result
283 In [8]: pr.get()
286 In [8]: pr.get()
284 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
287 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
285
288
286 # Again in non-blocking mode
289 # Again in non-blocking mode
287 In [9]: pr = rc[:].apply_async(wait, 10)
290 In [9]: pr = dview.apply_async(wait, 10)
288
291
289 # Poll to see if the result is ready
292 # Poll to see if the result is ready
290 In [10]: pr.ready()
293 In [10]: pr.ready()
@@ -321,7 +324,7 b' associated results are ready:'
321 In [72]: rc.block=False
324 In [72]: rc.block=False
322
325
323 # A trivial list of AsyncResults objects
326 # A trivial list of AsyncResults objects
324 In [73]: pr_list = [rc[:].apply_async(wait, 3) for i in range(10)]
327 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
325
328
326 # Wait until all of them are done
329 # Wait until all of them are done
327 In [74]: rc.barrier(pr_list)
330 In [74]: rc.barrier(pr_list)
@@ -332,63 +335,38 b' associated results are ready:'
332
335
333
336
334
337
335 The ``block`` and ``targets`` keyword arguments and attributes
338 The ``block`` keyword argument and attributes
336 --------------------------------------------------------------
339 ---------------------------------------------
337
338 .. warning::
339
340 This is different now, I haven't updated this section.
341 -MinRK
342
340
343 Most methods(like :meth:`apply`) accept
341 Most methods(like :meth:`apply`) accept
344 ``block`` and ``targets`` as keyword arguments. As we have seen above, these
342 ``block`` as a keyword argument. As we have seen above, these
345 keyword arguments control the blocking mode and which engines the command is
343 keyword arguments control the blocking mode . The :class:`Client` class also has
346 applied to. The :class:`Client` class also has :attr:`block` and
344 a :attr:`block` attribute that controls the default behavior when the keyword
347 :attr:`targets` attributes that control the default behavior when the keyword
345 argument is not provided. Thus the following logic is used for :attr:`block`:
348 arguments are not provided. Thus the following logic is used for :attr:`block`
349 and :attr:`targets`:
350
346
351 * If no keyword argument is provided, the instance attributes are used.
347 * If no keyword argument is provided, the instance attributes are used.
352 * Keyword argument, if provided override the instance attributes.
348 * Keyword argument, if provided override the instance attributes for
349 the duration of a single call.
353
350
354 The following examples demonstrate how to use the instance attributes:
351 The following examples demonstrate how to use the instance attributes:
355
352
356 .. sourcecode:: ipython
353 .. sourcecode:: ipython
357
354
358 In [16]: rc.targets = [0,2]
359
360 In [17]: rc.block = False
355 In [17]: rc.block = False
361
356
362 In [18]: pr = rc.execute('a=5')
357 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
363
364 In [19]: pr.r
365 Out[19]:
366 <Results List>
367 [0] In [6]: a=5
368 [2] In [6]: a=5
369
358
370 # Note targets='all' means all engines
359 In [19]: ar.get()
371 In [20]: rc.targets = 'all'
360 Out[19]: [10,10]
372
361
373 In [21]: rc.block = True
362 In [21]: rc.block = True
374
363
375 In [22]: rc.execute('b=10; print b')
364 # Note targets='all' means all engines
376 Out[22]:
365 In [22]: rc.apply(lambda : 42, targets='all')
377 <Results List>
366 Out[22]: [42, 42, 42, 42]
378 [0] In [7]: b=10; print b
379 [0] Out[7]: 10
380
381 [1] In [6]: b=10; print b
382 [1] Out[6]: 10
383
384 [2] In [7]: b=10; print b
385 [2] Out[7]: 10
386
387 [3] In [6]: b=10; print b
388 [3] Out[6]: 10
389
367
390 The :attr:`block` and :attr:`targets` instance attributes also determine the
368 The :attr:`block` and :attr:`targets` instance attributes of the
391 behavior of the parallel magic commands.
369 :class:`.DirectView` also determine the behavior of the parallel magic commands.
392
370
393
371
394 Parallel magic commands
372 Parallel magic commands
@@ -567,9 +545,9 b' appear as a local dictionary. Underneath, this uses :meth:`push` and'
567
545
568 In [50]: rc.block=True
546 In [50]: rc.block=True
569
547
570 In [51]: rc[:]['a']=['foo','bar']
548 In [51]: dview['a']=['foo','bar']
571
549
572 In [52]: rc[:]['a']
550 In [52]: dview['a']
573 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
551 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
574
552
575 Scatter and gather
553 Scatter and gather
@@ -585,13 +563,13 b' between engines, MPI should be used:'
585
563
586 .. sourcecode:: ipython
564 .. sourcecode:: ipython
587
565
588 In [58]: rc.scatter('a',range(16))
566 In [58]: dview.scatter('a',range(16))
589 Out[58]: [None,None,None,None]
567 Out[58]: [None,None,None,None]
590
568
591 In [59]: rc[:]['a']
569 In [59]: dview['a']
592 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
570 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
593
571
594 In [60]: rc.gather('a')
572 In [60]: dview.gather('a')
595 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
573 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
596
574
597 Other things to look at
575 Other things to look at
@@ -606,14 +584,14 b' basic effect using :meth:`scatter` and :meth:`gather`:'
606
584
607 .. sourcecode:: ipython
585 .. sourcecode:: ipython
608
586
609 In [66]: rc.scatter('x',range(64))
587 In [66]: dview.scatter('x',range(64))
610 Out[66]: [None,None,None,None]
588 Out[66]: [None,None,None,None]
611
589
612 In [67]: px y = [i**10 for i in x]
590 In [67]: px y = [i**10 for i in x]
613 Parallel execution on engines: [0, 1, 2, 3]
591 Parallel execution on engines: [0, 1, 2, 3]
614 Out[67]:
592 Out[67]:
615
593
616 In [68]: y = rc.gather('y')
594 In [68]: y = dview.gather('y')
617
595
618 In [69]: print y
596 In [69]: print y
619 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
597 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
@@ -41,8 +41,8 b' module and then create a :class:`.Client` instance:'
41
41
42 In [2]: rc = client.Client()
42 In [2]: rc = client.Client()
43
43
44 In [3]: lview = rc[None]
44 In [3]: lview = rc.view(balanced=True)
45 Out[3]: <LoadBalancedView tcp://127.0.0.1:10101>
45 Out[3]: <LoadBalancedView None>
46
46
47
47
48 This form assumes that the controller was started on localhost with default
48 This form assumes that the controller was started on localhost with default
@@ -66,7 +66,7 b' objects, but *in parallel*. Like the multiengine interface, these can be'
66 implemented via the task interface. The exact same tools can perform these
66 implemented via the task interface. The exact same tools can perform these
67 actions in load-balanced ways as well as multiplexed ways: a parallel version
67 actions in load-balanced ways as well as multiplexed ways: a parallel version
68 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
68 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
69 argument `targets=None`, then they are dynamically load balanced. Thus, if the
69 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
70 execution time per item varies significantly, you should use the versions in
70 execution time per item varies significantly, you should use the versions in
71 the task interface.
71 the task interface.
72
72
@@ -80,7 +80,7 b' for the ``None`` element:'
80
80
81 In [63]: serial_result = map(lambda x:x**10, range(32))
81 In [63]: serial_result = map(lambda x:x**10, range(32))
82
82
83 In [64]: parallel_result = tc[None].map(lambda x:x**10, range(32))
83 In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
84
84
85 In [65]: serial_result==parallel_result
85 In [65]: serial_result==parallel_result
86 Out[65]: True
86 Out[65]: True
@@ -258,13 +258,13 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
258
258
259 In [14]: client.block=False
259 In [14]: client.block=False
260
260
261 In [15]: ar = client.apply(f, args, kwargs, targets=None)
261 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
262
262
263 In [16]: ar2 = client.apply(f2, targets=None)
263 In [16]: ar2 = client.apply(f2, balanced=True)
264
264
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2])
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
266
266
267 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5)
267 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
268
268
269
269
270 .. seealso::
270 .. seealso::
@@ -383,7 +383,7 b' The following is an overview of how to use these classes together:'
383 1. Create a :class:`Client`.
383 1. Create a :class:`Client`.
384 2. Define some functions to be run as tasks
384 2. Define some functions to be run as tasks
385 3. Submit your tasks to using the :meth:`apply` method of your
385 3. Submit your tasks to using the :meth:`apply` method of your
386 :class:`Client` instance, specifying `targets=None`. This signals
386 :class:`Client` instance, specifying `balanced=True`. This signals
387 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
387 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
388 4. Use :meth:`Client.get_results` to get the results of the
388 4. Use :meth:`Client.get_results` to get the results of the
389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
General Comments 0
You need to be logged in to leave comments. Login now