Show More
@@ -147,9 +147,9 b' class UnSerializeIt(UnSerialized):' | |||
|
147 | 147 | if globals().has_key('numpy') and typeDescriptor == 'ndarray': |
|
148 | 148 | result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype']) |
|
149 | 149 | result.shape = self.serialized.metadata['shape'] |
|
150 |
# |
|
|
150 | # numpy arrays with frombuffer are read-only. We are working with | |
|
151 | 151 | # the numpy folks to address this issue. |
|
152 | result = result.copy() | |
|
152 | # result = result.copy() | |
|
153 | 153 | elif typeDescriptor == 'pickle': |
|
154 | 154 | result = pickle.loads(self.serialized.getData()) |
|
155 | 155 | elif typeDescriptor in ('bytes', 'buffer'): |
@@ -21,7 +21,7 b' from pprint import pprint' | |||
|
21 | 21 | pjoin = os.path.join |
|
22 | 22 | |
|
23 | 23 | import zmq |
|
24 | from zmq.eventloop import ioloop, zmqstream | |
|
24 | # from zmq.eventloop import ioloop, zmqstream | |
|
25 | 25 | |
|
26 | 26 | from IPython.utils.path import get_ipython_dir |
|
27 | 27 | from IPython.external.decorator import decorator |
@@ -203,6 +203,7 b' class Client(object):' | |||
|
203 | 203 | |
|
204 | 204 | Attributes |
|
205 | 205 | ---------- |
|
206 | ||
|
206 | 207 | ids : set of int engine IDs |
|
207 | 208 | requesting the ids attribute always synchronizes |
|
208 | 209 | the registration state. To request ids without synchronization, |
@@ -225,17 +226,23 b' class Client(object):' | |||
|
225 | 226 | |
|
226 | 227 | Methods |
|
227 | 228 | ------- |
|
228 | spin : flushes incoming results and registration state changes | |
|
229 | control methods spin, and requesting `ids` also ensures up to date | |
|
230 | ||
|
231 | barrier : wait on one or more msg_ids | |
|
232 | 229 | |
|
233 | execution methods: apply/apply_bound/apply_to/apply_bound | |
|
230 | spin | |
|
231 | flushes incoming results and registration state changes | |
|
232 | control methods spin, and requesting `ids` also ensures up to date | |
|
233 | ||
|
234 | barrier | |
|
235 | wait on one or more msg_ids | |
|
236 | ||
|
237 | execution methods | |
|
238 | apply | |
|
234 | 239 | legacy: execute, run |
|
235 | 240 | |
|
236 | query methods: queue_status, get_result, purge | |
|
241 | query methods | |
|
242 | queue_status, get_result, purge | |
|
237 | 243 | |
|
238 |
control methods |
|
|
244 | control methods | |
|
245 | abort, shutdown | |
|
239 | 246 | |
|
240 | 247 | """ |
|
241 | 248 | |
@@ -265,7 +272,6 b' class Client(object):' | |||
|
265 | 272 | if context is None: |
|
266 | 273 | context = zmq.Context() |
|
267 | 274 | self.context = context |
|
268 | self.targets = 'all' | |
|
269 | 275 | |
|
270 | 276 | self._setup_cluster_dir(profile, cluster_dir, ipython_dir) |
|
271 | 277 | if self._cd is not None: |
@@ -634,42 +640,18 b' class Client(object):' | |||
|
634 | 640 | #-------------------------------------------------------------------------- |
|
635 | 641 | |
|
636 | 642 | def __getitem__(self, key): |
|
637 |
""" |
|
|
638 | if key is None, a LoadBalancedView.""" | |
|
639 | if key is None: | |
|
640 | return LoadBalancedView(self) | |
|
641 | if isinstance(key, int): | |
|
642 | if key not in self.ids: | |
|
643 | raise IndexError("No such engine: %i"%key) | |
|
644 | return DirectView(self, key) | |
|
645 | ||
|
646 | if isinstance(key, slice): | |
|
647 | indices = range(len(self.ids))[key] | |
|
648 | ids = sorted(self._ids) | |
|
649 | key = [ ids[i] for i in indices ] | |
|
650 | # newkeys = sorted(self._ids)[thekeys[k]] | |
|
643 | """index access returns DirectView multiplexer objects | |
|
651 | 644 |
|
|
652 | if isinstance(key, (tuple, list, xrange)): | |
|
653 | _,targets = self._build_targets(list(key)) | |
|
654 | return DirectView(self, targets) | |
|
645 | Must be int, slice, or list/tuple/xrange of ints""" | |
|
646 | if not isinstance(key, (int, slice, tuple, list, xrange)): | |
|
647 | raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key))) | |
|
655 | 648 | else: |
|
656 | raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) | |
|
649 | return self.view(key, balanced=False) | |
|
657 | 650 | |
|
658 | 651 | #-------------------------------------------------------------------------- |
|
659 | 652 | # Begin public methods |
|
660 | 653 | #-------------------------------------------------------------------------- |
|
661 | 654 | |
|
662 | @property | |
|
663 | def remote(self): | |
|
664 | """property for convenient RemoteFunction generation. | |
|
665 | ||
|
666 | >>> @client.remote | |
|
667 | ... def getpid(): | |
|
668 | import os | |
|
669 | return os.getpid() | |
|
670 | """ | |
|
671 | return remote(self, block=self.block) | |
|
672 | ||
|
673 | 655 | def spin(self): |
|
674 | 656 | """Flush any registration notifications and execution results |
|
675 | 657 | waiting in the ZMQ queue. |
@@ -690,6 +672,7 b' class Client(object):' | |||
|
690 | 672 | |
|
691 | 673 | Parameters |
|
692 | 674 | ---------- |
|
675 | ||
|
693 | 676 | msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects |
|
694 | 677 | ints are indices to self.history |
|
695 | 678 | strs are msg_ids |
@@ -700,6 +683,7 b' class Client(object):' | |||
|
700 | 683 | |
|
701 | 684 | Returns |
|
702 | 685 | ------- |
|
686 | ||
|
703 | 687 | True : when all msg_ids are done |
|
704 | 688 | False : timeout reached, some msg_ids still outstanding |
|
705 | 689 | """ |
@@ -815,6 +799,7 b' class Client(object):' | |||
|
815 | 799 | |
|
816 | 800 | Parameters |
|
817 | 801 | ---------- |
|
802 | ||
|
818 | 803 | code : str |
|
819 | 804 | the code string to be executed |
|
820 | 805 | targets : int/str/list of ints/strs |
@@ -824,7 +809,7 b' class Client(object):' | |||
|
824 | 809 | whether or not to wait until done to return |
|
825 | 810 | default: self.block |
|
826 | 811 | """ |
|
827 | result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True) | |
|
812 | result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False) | |
|
828 | 813 | return result |
|
829 | 814 | |
|
830 | 815 | def run(self, filename, targets='all', block=None): |
@@ -834,6 +819,7 b' class Client(object):' | |||
|
834 | 819 | |
|
835 | 820 | Parameters |
|
836 | 821 | ---------- |
|
822 | ||
|
837 | 823 | filename : str |
|
838 | 824 | The path to the file |
|
839 | 825 | targets : int/str/list of ints/strs |
@@ -868,7 +854,8 b' class Client(object):' | |||
|
868 | 854 | return list(Dependency(dep)) |
|
869 | 855 | |
|
870 | 856 | @defaultblock |
|
871 |
def apply(self, f, args=None, kwargs=None, bound=True, block=None, |
|
|
857 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, | |
|
858 | targets=None, balanced=None, | |
|
872 | 859 | after=None, follow=None, timeout=None): |
|
873 | 860 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. |
|
874 | 861 | |
@@ -905,11 +892,34 b' class Client(object):' | |||
|
905 | 892 | if int: |
|
906 | 893 | Run on single engine |
|
907 | 894 | |
|
908 | after,follow,timeout only used in `apply_balanced`. See that docstring | |
|
909 | for details. | |
|
895 | balanced : bool, default None | |
|
896 | whether to load-balance. This will default to True | |
|
897 | if targets is unspecified, or False if targets is specified. | |
|
898 | ||
|
899 | The following arguments are only used when balanced is True: | |
|
900 | after : Dependency or collection of msg_ids | |
|
901 | Only for load-balanced execution (targets=None) | |
|
902 | Specify a list of msg_ids as a time-based dependency. | |
|
903 | This job will only be run *after* the dependencies | |
|
904 | have been met. | |
|
905 | ||
|
906 | follow : Dependency or collection of msg_ids | |
|
907 | Only for load-balanced execution (targets=None) | |
|
908 | Specify a list of msg_ids as a location-based dependency. | |
|
909 | This job will only be run on an engine where this dependency | |
|
910 | is met. | |
|
911 | ||
|
912 | timeout : float/int or None | |
|
913 | Only for load-balanced execution (targets=None) | |
|
914 | Specify an amount of time (in seconds) for the scheduler to | |
|
915 | wait for dependencies to be met before failing with a | |
|
916 | DependencyTimeout. | |
|
917 | ||
|
918 | after,follow,timeout only used if `balanced=True`. | |
|
910 | 919 | |
|
911 | 920 | Returns |
|
912 | 921 | ------- |
|
922 | ||
|
913 | 923 | if block is False: |
|
914 | 924 | return AsyncResult wrapping msg_ids |
|
915 | 925 | output of AsyncResult.get() is identical to that of `apply(...block=True)` |
@@ -921,10 +931,21 b' class Client(object):' | |||
|
921 | 931 | """ |
|
922 | 932 | |
|
923 | 933 | # defaults: |
|
924 | block = block if block is not None else self.block | |
|
925 | 934 | args = args if args is not None else [] |
|
926 | 935 | kwargs = kwargs if kwargs is not None else {} |
|
927 | 936 | |
|
937 | if balanced is None: | |
|
938 | if targets is None: | |
|
939 | # default to balanced if targets unspecified | |
|
940 | balanced = True | |
|
941 | else: | |
|
942 | # otherwise default to multiplexing | |
|
943 | balanced = False | |
|
944 | ||
|
945 | if targets is None and balanced is False: | |
|
946 | # default to all if *not* balanced, and targets is unspecified | |
|
947 | targets = 'all' | |
|
948 | ||
|
928 | 949 | # enforce types of f,args,kwrags |
|
929 | 950 | if not callable(f): |
|
930 | 951 | raise TypeError("f must be callable, not %s"%type(f)) |
@@ -935,80 +956,27 b' class Client(object):' | |||
|
935 | 956 | |
|
936 | 957 | options = dict(bound=bound, block=block, targets=targets) |
|
937 | 958 | |
|
938 | if targets is None: | |
|
939 | return self.apply_balanced(f, args, kwargs, timeout=timeout, | |
|
959 | if balanced: | |
|
960 | return self._apply_balanced(f, args, kwargs, timeout=timeout, | |
|
940 | 961 | after=after, follow=follow, **options) |
|
941 | else: | |
|
942 | if follow or after or timeout: | |
|
943 | msg = "follow, after, and timeout args are only used for load-balanced" | |
|
944 | msg += "execution." | |
|
962 | elif follow or after or timeout: | |
|
963 | msg = "follow, after, and timeout args are only used for" | |
|
964 | msg += " load-balanced execution." | |
|
945 | 965 | raise ValueError(msg) |
|
966 | else: | |
|
946 | 967 | return self._apply_direct(f, args, kwargs, **options) |
|
947 | 968 | |
|
948 | @defaultblock | |
|
949 | def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None, | |
|
969 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None, | |
|
950 | 970 | after=None, follow=None, timeout=None): |
|
951 | 971 | """call f(*args, **kwargs) remotely in a load-balanced manner. |
|
952 | 972 | |
|
953 | Parameters | |
|
954 | ---------- | |
|
955 | ||
|
956 | f : function | |
|
957 | The fuction to be called remotely | |
|
958 | args : tuple/list | |
|
959 | The positional arguments passed to `f` | |
|
960 | kwargs : dict | |
|
961 | The keyword arguments passed to `f` | |
|
962 | bound : bool (default: True) | |
|
963 | Whether to execute in the Engine(s) namespace, or in a clean | |
|
964 | namespace not affecting the engine. | |
|
965 | block : bool (default: self.block) | |
|
966 | Whether to wait for the result, or return immediately. | |
|
967 | False: | |
|
968 | returns AsyncResult | |
|
969 | True: | |
|
970 | returns actual result(s) of f(*args, **kwargs) | |
|
971 | if multiple targets: | |
|
972 | list of results, matching `targets` | |
|
973 | targets : int,list of ints, 'all', None | |
|
974 | Specify the destination of the job. | |
|
975 | if None: | |
|
976 | Submit via Task queue for load-balancing. | |
|
977 | if 'all': | |
|
978 | Run on all active engines | |
|
979 | if list: | |
|
980 | Run on each specified engine | |
|
981 | if int: | |
|
982 | Run on single engine | |
|
983 | ||
|
984 | after : Dependency or collection of msg_ids | |
|
985 | Only for load-balanced execution (targets=None) | |
|
986 | Specify a list of msg_ids as a time-based dependency. | |
|
987 | This job will only be run *after* the dependencies | |
|
988 | have been met. | |
|
989 | ||
|
990 | follow : Dependency or collection of msg_ids | |
|
991 | Only for load-balanced execution (targets=None) | |
|
992 | Specify a list of msg_ids as a location-based dependency. | |
|
993 | This job will only be run on an engine where this dependency | |
|
994 | is met. | |
|
995 | ||
|
996 | timeout : float/int or None | |
|
997 | Only for load-balanced execution (targets=None) | |
|
998 | Specify an amount of time (in seconds) for the scheduler to | |
|
999 | wait for dependencies to be met before failing with a | |
|
1000 | DependencyTimeout. | |
|
1001 | ||
|
1002 | Returns | |
|
1003 | ------- | |
|
1004 | if block is False: | |
|
1005 | return AsyncResult wrapping msg_id | |
|
1006 | output of AsyncResult.get() is identical to that of `apply(...block=True)` | |
|
1007 | else: | |
|
1008 | wait for, and return actual result of `f(*args, **kwargs)` | |
|
1009 | ||
|
973 | This is a private method, see `apply` for details. | |
|
974 | Not to be called directly! | |
|
1010 | 975 | """ |
|
1011 | 976 | |
|
977 | for kwarg in (bound, block, targets): | |
|
978 | assert kwarg is not None, "kwarg %r must be specified!"%kwarg | |
|
979 | ||
|
1012 | 980 | if self._task_socket is None: |
|
1013 | 981 | msg = "Task farming is disabled" |
|
1014 | 982 | if self._task_scheme == 'pure': |
@@ -1025,7 +993,6 b' class Client(object):' | |||
|
1025 | 993 | if isinstance(f, dependent): |
|
1026 | 994 | # soft warn on functional dependencies |
|
1027 | 995 | warnings.warn(msg, RuntimeWarning) |
|
1028 | ||
|
1029 | 996 | |
|
1030 | 997 | # defaults: |
|
1031 | 998 | args = args if args is not None else [] |
@@ -1036,14 +1003,6 b' class Client(object):' | |||
|
1036 | 1003 | else: |
|
1037 | 1004 | idents = [] |
|
1038 | 1005 | |
|
1039 | # enforce types of f,args,kwrags | |
|
1040 | if not callable(f): | |
|
1041 | raise TypeError("f must be callable, not %s"%type(f)) | |
|
1042 | if not isinstance(args, (tuple, list)): | |
|
1043 | raise TypeError("args must be tuple or list, not %s"%type(args)) | |
|
1044 | if not isinstance(kwargs, dict): | |
|
1045 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |
|
1046 | ||
|
1047 | 1006 | after = self._build_dependency(after) |
|
1048 | 1007 | follow = self._build_dependency(follow) |
|
1049 | 1008 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) |
@@ -1064,13 +1023,17 b' class Client(object):' | |||
|
1064 | 1023 | else: |
|
1065 | 1024 | return ar |
|
1066 | 1025 | |
|
1067 |
def _apply_direct(self, f, args, kwargs, bound= |
|
|
1026 | def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None): | |
|
1068 | 1027 | """Then underlying method for applying functions to specific engines |
|
1069 | 1028 | via the MUX queue. |
|
1070 | 1029 | |
|
1030 | This is a private method, see `apply` for details. | |
|
1071 | 1031 | Not to be called directly! |
|
1072 | 1032 | """ |
|
1073 | 1033 | |
|
1034 | for kwarg in (bound, block, targets): | |
|
1035 | assert kwarg is not None, "kwarg %r must be specified!"%kwarg | |
|
1036 | ||
|
1074 | 1037 | idents,targets = self._build_targets(targets) |
|
1075 | 1038 | |
|
1076 | 1039 | subheader = {} |
@@ -1095,103 +1058,46 b' class Client(object):' | |||
|
1095 | 1058 | return ar |
|
1096 | 1059 | |
|
1097 | 1060 | #-------------------------------------------------------------------------- |
|
1098 |
# |
|
|
1061 | # decorators | |
|
1099 | 1062 | #-------------------------------------------------------------------------- |
|
1100 | 1063 | |
|
1101 | def map(self, f, *sequences, **kwargs): | |
|
1102 | """Parallel version of builtin `map`, using all our engines. | |
|
1103 | ||
|
1104 | `block` and `targets` can be passed as keyword arguments only. | |
|
1105 | ||
|
1106 | There will be one task per target, so work will be chunked | |
|
1107 | if the sequences are longer than `targets`. | |
|
1108 | ||
|
1109 | Results can be iterated as they are ready, but will become available in chunks. | |
|
1110 | ||
|
1111 | Parameters | |
|
1112 | ---------- | |
|
1113 | ||
|
1114 | f : callable | |
|
1115 | function to be mapped | |
|
1116 | *sequences: one or more sequences of matching length | |
|
1117 | the sequences to be distributed and passed to `f` | |
|
1118 | block : bool | |
|
1119 | whether to wait for the result or not [default self.block] | |
|
1120 | targets : valid targets | |
|
1121 | targets to be used [default self.targets] | |
|
1122 | ||
|
1123 | Returns | |
|
1124 | ------- | |
|
1125 | ||
|
1126 | if block=False: | |
|
1127 | AsyncMapResult | |
|
1128 | An object like AsyncResult, but which reassembles the sequence of results | |
|
1129 | into a single list. AsyncMapResults can be iterated through before all | |
|
1130 | results are complete. | |
|
1131 | else: | |
|
1132 | the result of map(f,*sequences) | |
|
1133 | ||
|
1134 | """ | |
|
1135 | block = kwargs.get('block', self.block) | |
|
1136 | targets = kwargs.get('targets', self.targets) | |
|
1137 | assert len(sequences) > 0, "must have some sequences to map onto!" | |
|
1138 | pf = ParallelFunction(self, f, block=block, | |
|
1139 | bound=True, targets=targets) | |
|
1140 | return pf.map(*sequences) | |
|
1141 | ||
|
1142 | def imap(self, f, *sequences, **kwargs): | |
|
1143 | """Parallel version of builtin `itertools.imap`, load-balanced across all engines. | |
|
1144 | ||
|
1145 | Each element will be a separate task, and will be load-balanced. This | |
|
1146 | lets individual elements be ready for iteration as soon as they come. | |
|
1147 | ||
|
1148 | Parameters | |
|
1149 | ---------- | |
|
1150 | ||
|
1151 | f : callable | |
|
1152 | function to be mapped | |
|
1153 | *sequences: one or more sequences of matching length | |
|
1154 | the sequences to be distributed and passed to `f` | |
|
1155 | block : bool | |
|
1156 | whether to wait for the result or not [default self.block] | |
|
1157 | ||
|
1158 | Returns | |
|
1159 | ------- | |
|
1160 | ||
|
1161 | if block=False: | |
|
1162 | AsyncMapResult | |
|
1163 | An object like AsyncResult, but which reassembles the sequence of results | |
|
1164 | into a single list. AsyncMapResults can be iterated through before all | |
|
1165 | results are complete. | |
|
1166 | else: | |
|
1167 | the result of map(f,*sequences) | |
|
1168 | ||
|
1169 | """ | |
|
1170 | ||
|
1171 | block = kwargs.get('block', self.block) | |
|
1172 | ||
|
1173 | assert len(sequences) > 0, "must have some sequences to map onto!" | |
|
1174 | ||
|
1175 | pf = ParallelFunction(self, f, block=self.block, | |
|
1176 | bound=True, targets=None) | |
|
1177 | return pf.map(*sequences) | |
|
1178 | ||
|
1179 | def parallel(self, bound=True, targets='all', block=True): | |
|
1064 | @defaultblock | |
|
1065 | def parallel(self, bound=True, targets='all', block=None): | |
|
1180 | 1066 | """Decorator for making a ParallelFunction.""" |
|
1181 | 1067 | return parallel(self, bound=bound, targets=targets, block=block) |
|
1182 | 1068 | |
|
1183 | def remote(self, bound=True, targets='all', block=True): | |
|
1069 | @defaultblock | |
|
1070 | def remote(self, bound=True, targets='all', block=None): | |
|
1184 | 1071 | """Decorator for making a RemoteFunction.""" |
|
1185 | 1072 | return remote(self, bound=bound, targets=targets, block=block) |
|
1186 | 1073 | |
|
1187 | 1074 | def view(self, targets=None, balanced=False): |
|
1188 | 1075 | """Method for constructing View objects""" |
|
1189 | if not balanced: | |
|
1190 |
if |
|
|
1076 | if targets is None: | |
|
1077 | if balanced: | |
|
1078 | return LoadBalancedView(client=self) | |
|
1079 | else: | |
|
1191 | 1080 | targets = slice(None) |
|
1192 | return self[targets] | |
|
1081 | ||
|
1082 | if balanced: | |
|
1083 | view_class = LoadBalancedView | |
|
1084 | else: | |
|
1085 | view_class = DirectView | |
|
1086 | if isinstance(targets, int): | |
|
1087 | if targets not in self.ids: | |
|
1088 | raise IndexError("No such engine: %i"%targets) | |
|
1089 | return view_class(client=self, targets=targets) | |
|
1090 | ||
|
1091 | if isinstance(targets, slice): | |
|
1092 | indices = range(len(self.ids))[targets] | |
|
1093 | ids = sorted(self._ids) | |
|
1094 | targets = [ ids[i] for i in indices ] | |
|
1095 | ||
|
1096 | if isinstance(targets, (tuple, list, xrange)): | |
|
1097 | _,targets = self._build_targets(list(targets)) | |
|
1098 | return view_class(client=self, targets=targets) | |
|
1193 | 1099 | else: |
|
1194 | return LoadBalancedView(self, targets) | |
|
1100 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) | |
|
1195 | 1101 | |
|
1196 | 1102 | #-------------------------------------------------------------------------- |
|
1197 | 1103 | # Data movement |
@@ -1202,7 +1108,7 b' class Client(object):' | |||
|
1202 | 1108 | """Push the contents of `ns` into the namespace on `target`""" |
|
1203 | 1109 | if not isinstance(ns, dict): |
|
1204 | 1110 | raise TypeError("Must be a dict, not %s"%type(ns)) |
|
1205 | result = self.apply(_push, (ns,), targets=targets, block=block, bound=True) | |
|
1111 | result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False) | |
|
1206 | 1112 | return result |
|
1207 | 1113 | |
|
1208 | 1114 | @defaultblock |
@@ -1214,14 +1120,14 b' class Client(object):' | |||
|
1214 | 1120 | for key in keys: |
|
1215 | 1121 | if not isinstance(key, str): |
|
1216 | 1122 | raise TypeError |
|
1217 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) | |
|
1123 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False) | |
|
1218 | 1124 | return result |
|
1219 | 1125 | |
|
1126 | @defaultblock | |
|
1220 | 1127 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): |
|
1221 | 1128 | """ |
|
1222 | 1129 | Partition a Python sequence and send the partitions to a set of engines. |
|
1223 | 1130 | """ |
|
1224 | block = block if block is not None else self.block | |
|
1225 | 1131 | targets = self._build_targets(targets)[-1] |
|
1226 | 1132 | mapObject = Map.dists[dist]() |
|
1227 | 1133 | nparts = len(targets) |
@@ -1239,11 +1145,11 b' class Client(object):' | |||
|
1239 | 1145 | else: |
|
1240 | 1146 | return r |
|
1241 | 1147 | |
|
1148 | @defaultblock | |
|
1242 | 1149 | def gather(self, key, dist='b', targets='all', block=None): |
|
1243 | 1150 | """ |
|
1244 | 1151 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
1245 | 1152 | """ |
|
1246 | block = block if block is not None else self.block | |
|
1247 | 1153 | |
|
1248 | 1154 | targets = self._build_targets(targets)[-1] |
|
1249 | 1155 | mapObject = Map.dists[dist]() |
@@ -1267,6 +1173,7 b' class Client(object):' | |||
|
1267 | 1173 | |
|
1268 | 1174 | Parameters |
|
1269 | 1175 | ---------- |
|
1176 | ||
|
1270 | 1177 | msg_ids : list of ints or msg_ids |
|
1271 | 1178 | if int: |
|
1272 | 1179 | Passed as index to self.history for convenience. |
@@ -1351,13 +1258,14 b' class Client(object):' | |||
|
1351 | 1258 | return content |
|
1352 | 1259 | |
|
1353 | 1260 | @spinfirst |
|
1354 |
def queue_status(self, targets= |
|
|
1261 | def queue_status(self, targets='all', verbose=False): | |
|
1355 | 1262 | """Fetch the status of engine queues. |
|
1356 | 1263 | |
|
1357 | 1264 | Parameters |
|
1358 | 1265 | ---------- |
|
1266 | ||
|
1359 | 1267 | targets : int/str/list of ints/strs |
|
1360 |
the engines |
|
|
1268 | the engines whose states are to be queried. | |
|
1361 | 1269 | default : all |
|
1362 | 1270 | verbose : bool |
|
1363 | 1271 | Whether to return lengths only, or lists of ids for each element |
@@ -1383,6 +1291,7 b' class Client(object):' | |||
|
1383 | 1291 | |
|
1384 | 1292 | Parameters |
|
1385 | 1293 | ---------- |
|
1294 | ||
|
1386 | 1295 | msg_ids : str or list of strs |
|
1387 | 1296 | the msg_ids whose results should be forgotten. |
|
1388 | 1297 | targets : int/str/list of ints/strs |
@@ -1404,59 +1313,6 b' class Client(object):' | |||
|
1404 | 1313 | if content['status'] != 'ok': |
|
1405 | 1314 | raise ss.unwrap_exception(content) |
|
1406 | 1315 | |
|
1407 | #---------------------------------------- | |
|
1408 | # activate for %px,%autopx magics | |
|
1409 | #---------------------------------------- | |
|
1410 | def activate(self): | |
|
1411 | """Make this `View` active for parallel magic commands. | |
|
1412 | ||
|
1413 | IPython has a magic command syntax to work with `MultiEngineClient` objects. | |
|
1414 | In a given IPython session there is a single active one. While | |
|
1415 | there can be many `Views` created and used by the user, | |
|
1416 | there is only one active one. The active `View` is used whenever | |
|
1417 | the magic commands %px and %autopx are used. | |
|
1418 | ||
|
1419 | The activate() method is called on a given `View` to make it | |
|
1420 | active. Once this has been done, the magic commands can be used. | |
|
1421 | """ | |
|
1422 | ||
|
1423 | try: | |
|
1424 | # This is injected into __builtins__. | |
|
1425 | ip = get_ipython() | |
|
1426 | except NameError: | |
|
1427 | print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | |
|
1428 | else: | |
|
1429 | pmagic = ip.plugin_manager.get_plugin('parallelmagic') | |
|
1430 | if pmagic is not None: | |
|
1431 | pmagic.active_multiengine_client = self | |
|
1432 | else: | |
|
1433 | print "You must first load the parallelmagic extension " \ | |
|
1434 | "by doing '%load_ext parallelmagic'" | |
|
1435 | ||
|
1436 | class AsynClient(Client): | |
|
1437 | """An Asynchronous client, using the Tornado Event Loop. | |
|
1438 | !!!unfinished!!!""" | |
|
1439 | io_loop = None | |
|
1440 | _queue_stream = None | |
|
1441 | _notifier_stream = None | |
|
1442 | _task_stream = None | |
|
1443 | _control_stream = None | |
|
1444 | ||
|
1445 | def __init__(self, addr, context=None, username=None, debug=False, io_loop=None): | |
|
1446 | Client.__init__(self, addr, context, username, debug) | |
|
1447 | if io_loop is None: | |
|
1448 | io_loop = ioloop.IOLoop.instance() | |
|
1449 | self.io_loop = io_loop | |
|
1450 | ||
|
1451 | self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop) | |
|
1452 | self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop) | |
|
1453 | self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop) | |
|
1454 | self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop) | |
|
1455 | ||
|
1456 | def spin(self): | |
|
1457 | for stream in (self.queue_stream, self.notifier_stream, | |
|
1458 | self.task_stream, self.control_stream): | |
|
1459 | stream.flush() | |
|
1460 | 1316 | |
|
1461 | 1317 | __all__ = [ 'Client', |
|
1462 | 1318 | 'depend', |
@@ -17,7 +17,7 b' from asyncresult import AsyncMapResult' | |||
|
17 | 17 | # Decorators |
|
18 | 18 | #----------------------------------------------------------------------------- |
|
19 | 19 | |
|
20 | def remote(client, bound=False, block=None, targets=None): | |
|
20 | def remote(client, bound=False, block=None, targets=None, balanced=None): | |
|
21 | 21 | """Turn a function into a remote function. |
|
22 | 22 | |
|
23 | 23 | This method can be used for map: |
@@ -26,10 +26,10 b' def remote(client, bound=False, block=None, targets=None):' | |||
|
26 | 26 | def func(a) |
|
27 | 27 | """ |
|
28 | 28 | def remote_function(f): |
|
29 | return RemoteFunction(client, f, bound, block, targets) | |
|
29 | return RemoteFunction(client, f, bound, block, targets, balanced) | |
|
30 | 30 | return remote_function |
|
31 | 31 | |
|
32 | def parallel(client, dist='b', bound=False, block=None, targets='all'): | |
|
32 | def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None): | |
|
33 | 33 | """Turn a function into a parallel remote function. |
|
34 | 34 | |
|
35 | 35 | This method can be used for map: |
@@ -38,7 +38,7 b" def parallel(client, dist='b', bound=False, block=None, targets='all'):" | |||
|
38 | 38 | def func(a) |
|
39 | 39 | """ |
|
40 | 40 | def parallel_function(f): |
|
41 | return ParallelFunction(client, f, dist, bound, block, targets) | |
|
41 | return ParallelFunction(client, f, dist, bound, block, targets, balanced) | |
|
42 | 42 | return parallel_function |
|
43 | 43 | |
|
44 | 44 | #-------------------------------------------------------------------------- |
@@ -62,6 +62,8 b' class RemoteFunction(object):' | |||
|
62 | 62 | to use the current `block` attribute of `client` |
|
63 | 63 | targets : valid target list [default: all] |
|
64 | 64 | The targets on which to execute. |
|
65 | balanced : bool | |
|
66 | Whether to load-balance with the Task scheduler or not | |
|
65 | 67 | """ |
|
66 | 68 | |
|
67 | 69 | client = None # the remote connection |
@@ -69,23 +71,30 b' class RemoteFunction(object):' | |||
|
69 | 71 | block = None # whether to block |
|
70 | 72 | bound = None # whether to affect the namespace |
|
71 | 73 | targets = None # where to execute |
|
74 | balanced = None # whether to load-balance | |
|
72 | 75 | |
|
73 | def __init__(self, client, f, bound=False, block=None, targets=None): | |
|
76 | def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None): | |
|
74 | 77 | self.client = client |
|
75 | 78 | self.func = f |
|
76 | 79 | self.block=block |
|
77 | 80 | self.bound=bound |
|
78 | 81 | self.targets=targets |
|
82 | if balanced is None: | |
|
83 | if targets is None: | |
|
84 | balanced = True | |
|
85 | else: | |
|
86 | balanced = False | |
|
87 | self.balanced = balanced | |
|
79 | 88 | |
|
80 | 89 | def __call__(self, *args, **kwargs): |
|
81 | 90 | return self.client.apply(self.func, args=args, kwargs=kwargs, |
|
82 | block=self.block, targets=self.targets, bound=self.bound) | |
|
91 | block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced) | |
|
83 | 92 | |
|
84 | 93 | |
|
85 | 94 | class ParallelFunction(RemoteFunction): |
|
86 | 95 | """Class for mapping a function to sequences.""" |
|
87 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): | |
|
88 | super(ParallelFunction, self).__init__(client,f,bound,block,targets) | |
|
96 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None): | |
|
97 | super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) | |
|
89 | 98 | mapClass = Map.dists[dist] |
|
90 | 99 | self.mapObject = mapClass() |
|
91 | 100 | |
@@ -93,21 +102,19 b' class ParallelFunction(RemoteFunction):' | |||
|
93 | 102 | len_0 = len(sequences[0]) |
|
94 | 103 | for s in sequences: |
|
95 | 104 | if len(s)!=len_0: |
|
96 |
|
|
|
105 | msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) | |
|
106 | raise ValueError(msg) | |
|
97 | 107 | |
|
98 |
if self. |
|
|
99 | # load-balanced: | |
|
100 | engines = [None]*len_0 | |
|
101 | elif isinstance(self.targets, int): | |
|
102 | engines = [None]*self.targets | |
|
108 | if self.balanced: | |
|
109 | targets = [self.targets]*len_0 | |
|
103 | 110 | else: |
|
104 | 111 | # multiplexed: |
|
105 |
|
|
|
112 | targets = self.client._build_targets(self.targets)[-1] | |
|
106 | 113 | |
|
107 |
nparts = len( |
|
|
114 | nparts = len(targets) | |
|
108 | 115 | msg_ids = [] |
|
109 | 116 | # my_f = lambda *a: map(self.func, *a) |
|
110 |
for index, |
|
|
117 | for index, t in enumerate(targets): | |
|
111 | 118 | args = [] |
|
112 | 119 | for seq in sequences: |
|
113 | 120 | part = self.mapObject.getPartition(seq, index, nparts) |
@@ -124,22 +131,26 b' class ParallelFunction(RemoteFunction):' | |||
|
124 | 131 | args = [self.func]+args |
|
125 | 132 | else: |
|
126 | 133 | f=self.func |
|
127 |
|
|
|
128 |
|
|
|
129 | targets=engineid).msg_ids[0] | |
|
130 | msg_ids.append(mid) | |
|
134 | ar = self.client.apply(f, args=args, block=False, bound=self.bound, | |
|
135 | targets=targets, balanced=self.balanced) | |
|
136 | ||
|
137 | msg_ids.append(ar.msg_ids[0]) | |
|
131 | 138 | |
|
132 | 139 | r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) |
|
133 | 140 | if self.block: |
|
134 |
|
|
|
135 |
return r. |
|
|
141 | try: | |
|
142 | return r.get() | |
|
143 | except KeyboardInterrupt: | |
|
144 | return r | |
|
136 | 145 | else: |
|
137 | 146 | return r |
|
138 | 147 | |
|
139 | 148 | def map(self, *sequences): |
|
140 | 149 | """call a function on each element of a sequence remotely.""" |
|
141 | 150 | self._map = True |
|
142 | ret = self.__call__(*sequences) | |
|
143 | del self._map | |
|
151 | try: | |
|
152 | ret = self.__call__(*sequences) | |
|
153 | finally: | |
|
154 | del self._map | |
|
144 | 155 | return ret |
|
145 | 156 |
@@ -59,7 +59,7 b' def validate_url(url):' | |||
|
59 | 59 | except ValueError: |
|
60 | 60 | raise AssertionError("Invalid port %r in url: %r"%(port, url)) |
|
61 | 61 | |
|
62 | assert pat.match(addr) is not None, 'Invalid url: %r'%url | |
|
62 | assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url | |
|
63 | 63 | |
|
64 | 64 | else: |
|
65 | 65 | # only validate tcp urls currently |
@@ -10,7 +10,11 b'' | |||
|
10 | 10 | # Imports |
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance | |
|
14 | ||
|
13 | 15 | from IPython.external.decorator import decorator |
|
16 | from IPython.zmq.parallel.asyncresult import AsyncResult | |
|
17 | from IPython.zmq.parallel.dependency import Dependency | |
|
14 | 18 | from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel |
|
15 | 19 | |
|
16 | 20 | #----------------------------------------------------------------------------- |
@@ -61,30 +65,29 b' def spin_after(f, self, *args, **kwargs):' | |||
|
61 | 65 | # Classes |
|
62 | 66 | #----------------------------------------------------------------------------- |
|
63 | 67 | |
|
64 |
class View( |
|
|
68 | class View(HasTraits): | |
|
65 | 69 | """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. |
|
66 | 70 | |
|
67 | 71 | Don't use this class, use subclasses. |
|
68 | 72 | """ |
|
69 |
block= |
|
|
70 |
bound= |
|
|
71 |
history= |
|
|
72 |
outstanding = |
|
|
73 |
results = |
|
|
74 | ||
|
73 | block=Bool(False) | |
|
74 | bound=Bool(False) | |
|
75 | history=List() | |
|
76 | outstanding = Set() | |
|
77 | results = Dict() | |
|
78 | client = Instance('IPython.zmq.parallel.client.Client') | |
|
79 | ||
|
80 | _ntargets = Int(1) | |
|
81 | _balanced = Bool(False) | |
|
82 | _default_names = List(['block', 'bound']) | |
|
75 | 83 | _targets = None |
|
76 | _apply_name = 'apply' | |
|
77 | _default_names = ['targets', 'block'] | |
|
78 | 84 | |
|
79 | def __init__(self, client, targets=None): | |
|
80 |
self.client |
|
|
85 | def __init__(self, client=None, targets=None): | |
|
86 | super(View, self).__init__(client=client) | |
|
81 | 87 | self._targets = targets |
|
82 | 88 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) |
|
83 | 89 | self.block = client.block |
|
84 | self.bound=False | |
|
85 | self.history = [] | |
|
86 | self.outstanding = set() | |
|
87 | self.results = {} | |
|
90 | ||
|
88 | 91 | for name in self._default_names: |
|
89 | 92 | setattr(self, name, getattr(self, name, None)) |
|
90 | 93 | |
@@ -101,26 +104,46 b' class View(object):' | |||
|
101 | 104 | |
|
102 | 105 | @targets.setter |
|
103 | 106 | def targets(self, value): |
|
104 | self._targets = value | |
|
105 | # raise AttributeError("Cannot set my targets argument after construction!") | |
|
107 | raise AttributeError("Cannot set View `targets` after construction!") | |
|
106 | 108 | |
|
107 | 109 | def _defaults(self, *excludes): |
|
108 | 110 | """return dict of our default attributes, excluding names given.""" |
|
109 | d = {} | |
|
111 | d = dict(balanced=self._balanced, targets=self.targets) | |
|
110 | 112 | for name in self._default_names: |
|
111 | 113 | if name not in excludes: |
|
112 | 114 | d[name] = getattr(self, name) |
|
113 | 115 | return d |
|
114 | 116 | |
|
117 | def set_flags(self, **kwargs): | |
|
118 | """set my attribute flags by keyword. | |
|
119 | ||
|
120 | A View is a wrapper for the Client's apply method, but | |
|
121 | with attributes that specify keyword arguments, those attributes | |
|
122 | can be set by keyword argument with this method. | |
|
123 | ||
|
124 | Parameters | |
|
125 | ---------- | |
|
126 | ||
|
127 | block : bool | |
|
128 | whether to wait for results | |
|
129 | bound : bool | |
|
130 | whether to use the client's namespace | |
|
131 | """ | |
|
132 | for key in kwargs: | |
|
133 | if key not in self._default_names: | |
|
134 | raise KeyError("Invalid name: %r"%key) | |
|
135 | for name in ('block', 'bound'): | |
|
136 | if name in kwargs: | |
|
137 | setattr(self, name, kwargs) | |
|
138 | ||
|
139 | #---------------------------------------------------------------- | |
|
140 | # wrappers for client methods: | |
|
141 | #---------------------------------------------------------------- | |
|
115 | 142 | @sync_results |
|
116 | 143 | def spin(self): |
|
117 | 144 | """spin the client, and sync""" |
|
118 | 145 | self.client.spin() |
|
119 | 146 | |
|
120 | @property | |
|
121 | def _apply(self): | |
|
122 | return getattr(self.client, self._apply_name) | |
|
123 | ||
|
124 | 147 | @sync_results |
|
125 | 148 | @save_ids |
|
126 | 149 | def apply(self, f, *args, **kwargs): |
@@ -133,7 +156,7 b' class View(object):' | |||
|
133 | 156 | else: |
|
134 | 157 | returns actual result of f(*args, **kwargs) |
|
135 | 158 | """ |
|
136 |
return self. |
|
|
159 | return self.client.apply(f, args, kwargs, **self._defaults()) | |
|
137 | 160 | |
|
138 | 161 | @save_ids |
|
139 | 162 | def apply_async(self, f, *args, **kwargs): |
@@ -144,7 +167,7 b' class View(object):' | |||
|
144 | 167 | returns msg_id |
|
145 | 168 | """ |
|
146 | 169 | d = self._defaults('block', 'bound') |
|
147 |
return self. |
|
|
170 | return self.client.apply(f,args,kwargs, block=False, bound=False, **d) | |
|
148 | 171 | |
|
149 | 172 | @spin_after |
|
150 | 173 | @save_ids |
@@ -157,7 +180,7 b' class View(object):' | |||
|
157 | 180 | returns: actual result of f(*args, **kwargs) |
|
158 | 181 | """ |
|
159 | 182 | d = self._defaults('block', 'bound') |
|
160 |
return self. |
|
|
183 | return self.client.apply(f,args,kwargs, block=True, bound=False, **d) | |
|
161 | 184 | |
|
162 | 185 | @sync_results |
|
163 | 186 | @save_ids |
@@ -173,7 +196,7 b' class View(object):' | |||
|
173 | 196 | |
|
174 | 197 | """ |
|
175 | 198 | d = self._defaults('bound') |
|
176 |
return self. |
|
|
199 | return self.client.apply(f, args, kwargs, bound=True, **d) | |
|
177 | 200 | |
|
178 | 201 | @sync_results |
|
179 | 202 | @save_ids |
@@ -187,7 +210,7 b' class View(object):' | |||
|
187 | 210 | |
|
188 | 211 | """ |
|
189 | 212 | d = self._defaults('block', 'bound') |
|
190 |
return self. |
|
|
213 | return self.client.apply(f, args, kwargs, block=False, bound=True, **d) | |
|
191 | 214 | |
|
192 | 215 | @spin_after |
|
193 | 216 | @save_ids |
@@ -200,23 +223,7 b' class View(object):' | |||
|
200 | 223 | |
|
201 | 224 | """ |
|
202 | 225 | d = self._defaults('block', 'bound') |
|
203 |
return self. |
|
|
204 | ||
|
205 | @spin_after | |
|
206 | @save_ids | |
|
207 | def map(self, f, *sequences): | |
|
208 | """Parallel version of builtin `map`, using this view's engines.""" | |
|
209 | if isinstance(self.targets, int): | |
|
210 | targets = [self.targets] | |
|
211 | else: | |
|
212 | targets = self.targets | |
|
213 | pf = ParallelFunction(self.client, f, block=self.block, | |
|
214 | bound=True, targets=targets) | |
|
215 | return pf.map(*sequences) | |
|
216 | ||
|
217 | def parallel(self, bound=True, block=True): | |
|
218 | """Decorator for making a ParallelFunction""" | |
|
219 | return parallel(self.client, bound=bound, targets=self.targets, block=block) | |
|
226 | return self.client.apply(f, args, kwargs, block=True, bound=True, **d) | |
|
220 | 227 | |
|
221 | 228 | def abort(self, msg_ids=None, block=None): |
|
222 | 229 | """Abort jobs on my engines. |
@@ -240,6 +247,17 b' class View(object):' | |||
|
240 | 247 | if targets is None or targets == 'all': |
|
241 | 248 | targets = self.targets |
|
242 | 249 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) |
|
250 | ||
|
251 | #------------------------------------------------------------------- | |
|
252 | # Decorators | |
|
253 | #------------------------------------------------------------------- | |
|
254 | def parallel(self, bound=True, block=True): | |
|
255 | """Decorator for making a ParallelFunction""" | |
|
256 | return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) | |
|
257 | ||
|
258 | def remote(self, bound=True, block=True): | |
|
259 | """Decorator for making a RemoteFunction""" | |
|
260 | return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) | |
|
243 | 261 | |
|
244 | 262 | |
|
245 | 263 | |
@@ -262,6 +280,62 b' class DirectView(View):' | |||
|
262 | 280 | |
|
263 | 281 | """ |
|
264 | 282 | |
|
283 | def __init__(self, client=None, targets=None): | |
|
284 | super(DirectView, self).__init__(client=client, targets=targets) | |
|
285 | self._balanced = False | |
|
286 | ||
|
287 | @spin_after | |
|
288 | @save_ids | |
|
289 | def map(self, f, *sequences, **kwargs): | |
|
290 | """Parallel version of builtin `map`, using this View's `targets`. | |
|
291 | ||
|
292 | There will be one task per target, so work will be chunked | |
|
293 | if the sequences are longer than `targets`. | |
|
294 | ||
|
295 | Results can be iterated as they are ready, but will become available in chunks. | |
|
296 | ||
|
297 | Parameters | |
|
298 | ---------- | |
|
299 | ||
|
300 | f : callable | |
|
301 | function to be mapped | |
|
302 | *sequences: one or more sequences of matching length | |
|
303 | the sequences to be distributed and passed to `f` | |
|
304 | block : bool | |
|
305 | whether to wait for the result or not [default self.block] | |
|
306 | bound : bool | |
|
307 | whether to wait for the result or not [default self.bound] | |
|
308 | ||
|
309 | Returns | |
|
310 | ------- | |
|
311 | ||
|
312 | if block=False: | |
|
313 | AsyncMapResult | |
|
314 | An object like AsyncResult, but which reassembles the sequence of results | |
|
315 | into a single list. AsyncMapResults can be iterated through before all | |
|
316 | results are complete. | |
|
317 | else: | |
|
318 | the result of map(f,*sequences) | |
|
319 | """ | |
|
320 | ||
|
321 | block = kwargs.get('block', self.block) | |
|
322 | bound = kwargs.get('bound', self.bound) | |
|
323 | for k in kwargs.keys(): | |
|
324 | if k not in ['block', 'bound']: | |
|
325 | raise TypeError("invalid keyword arg, %r"%k) | |
|
326 | ||
|
327 | assert len(sequences) > 0, "must have some sequences to map onto!" | |
|
328 | pf = ParallelFunction(self.client, f, block=block, | |
|
329 | bound=bound, targets=self.targets, balanced=False) | |
|
330 | return pf.map(*sequences) | |
|
331 | ||
|
332 | def map_async(self, f, *sequences, **kwargs): | |
|
333 | """Parallel version of builtin `map`, using this view's engines.""" | |
|
334 | if 'block' in kwargs: | |
|
335 | raise TypeError("map_async doesn't take a `block` keyword argument.") | |
|
336 | kwargs['block'] = True | |
|
337 | return self.map(f,*sequences,**kwargs) | |
|
338 | ||
|
265 | 339 | @sync_results |
|
266 | 340 | @save_ids |
|
267 | 341 | def execute(self, code, block=True): |
@@ -358,14 +432,13 b' class DirectView(View):' | |||
|
358 | 432 | |
|
359 | 433 | |
|
360 | 434 | class LoadBalancedView(View): |
|
361 |
"""An |
|
|
435 | """An load-balancing View that only executes via the Task scheduler. | |
|
362 | 436 | |
|
363 | Typically created via: | |
|
437 | Load-balanced views can be created with the client's `view` method: | |
|
364 | 438 | |
|
365 |
>>> v = client |
|
|
366 | <LoadBalancedView None> | |
|
439 | >>> v = client.view(balanced=True) | |
|
367 | 440 | |
|
368 | but can also be created with: | |
|
441 | or targets can be specified, to restrict the potential destinations: | |
|
369 | 442 | |
|
370 | 443 | >>> v = client.view([1,3],balanced=True) |
|
371 | 444 | |
@@ -374,10 +447,126 b' class LoadBalancedView(View):' | |||
|
374 | 447 | """ |
|
375 | 448 | |
|
376 | 449 | _apply_name = 'apply_balanced' |
|
377 |
_default_names = [ |
|
|
450 | _default_names = ['block', 'bound', 'follow', 'after', 'timeout'] | |
|
378 | 451 | |
|
379 | def __init__(self, client, targets=None): | |
|
380 | super(LoadBalancedView, self).__init__(client, targets) | |
|
452 | def __init__(self, client=None, targets=None): | |
|
453 | super(LoadBalancedView, self).__init__(client=client, targets=targets) | |
|
381 | 454 | self._ntargets = 1 |
|
382 | self._apply_name = 'apply_balanced' | |
|
455 | ||
|
456 | def _validate_dependency(self, dep): | |
|
457 | """validate a dependency. | |
|
458 | ||
|
459 | For use in `set_flags`. | |
|
460 | """ | |
|
461 | if dep is None or isinstance(dep, (str, AsyncResult, Dependency)): | |
|
462 | return True | |
|
463 | elif isinstance(dep, (list,set, tuple)): | |
|
464 | for d in dep: | |
|
465 | if not isinstance(d, str, AsyncResult): | |
|
466 | return False | |
|
467 | elif isinstance(dep, dict): | |
|
468 | if set(dep.keys()) != set(Dependency().as_dict().keys()): | |
|
469 | return False | |
|
470 | if not isinstance(dep['msg_ids'], list): | |
|
471 | return False | |
|
472 | for d in dep['msg_ids']: | |
|
473 | if not isinstance(d, str): | |
|
474 | return False | |
|
475 | else: | |
|
476 | return False | |
|
477 | ||
|
478 | def set_flags(self, **kwargs): | |
|
479 | """set my attribute flags by keyword. | |
|
480 | ||
|
481 | A View is a wrapper for the Client's apply method, but | |
|
482 | with attributes that specify keyword arguments, those attributes | |
|
483 | can be set by keyword argument with this method. | |
|
484 | ||
|
485 | Parameters | |
|
486 | ---------- | |
|
487 | ||
|
488 | block : bool | |
|
489 | whether to wait for results | |
|
490 | bound : bool | |
|
491 | whether to use the engine's namespace | |
|
492 | follow : Dependency, list, msg_id, AsyncResult | |
|
493 | the location dependencies of tasks | |
|
494 | after : Dependency, list, msg_id, AsyncResult | |
|
495 | the time dependencies of tasks | |
|
496 | timeout : int,None | |
|
497 | the timeout to be used for tasks | |
|
498 | """ | |
|
499 | ||
|
500 | super(LoadBalancedView, self).set_flags(**kwargs) | |
|
501 | for name in ('follow', 'after'): | |
|
502 | if name in kwargs: | |
|
503 | value = kwargs[name] | |
|
504 | if self._validate_dependency(value): | |
|
505 | setattr(self, name, value) | |
|
506 | else: | |
|
507 | raise ValueError("Invalid dependency: %r"%value) | |
|
508 | if 'timeout' in kwargs: | |
|
509 | t = kwargs['timeout'] | |
|
510 | if not isinstance(t, (int, long, float, None)): | |
|
511 | raise TypeError("Invalid type for timeout: %r"%type(t)) | |
|
512 | if t is not None: | |
|
513 | if t < 0: | |
|
514 | raise ValueError("Invalid timeout: %s"%t) | |
|
515 | self.timeout = t | |
|
516 | ||
|
517 | @spin_after | |
|
518 | @save_ids | |
|
519 | def map(self, f, *sequences, **kwargs): | |
|
520 | """Parallel version of builtin `map`, load-balanced by this View. | |
|
521 | ||
|
522 | Each element will be a separate task, and will be load-balanced. This | |
|
523 | lets individual elements be available for iteration as soon as they arrive. | |
|
524 | ||
|
525 | Parameters | |
|
526 | ---------- | |
|
527 | ||
|
528 | f : callable | |
|
529 | function to be mapped | |
|
530 | *sequences: one or more sequences of matching length | |
|
531 | the sequences to be distributed and passed to `f` | |
|
532 | block : bool | |
|
533 | whether to wait for the result or not [default self.block] | |
|
534 | bound : bool | |
|
535 | whether to use the engine's namespace | |
|
536 | ||
|
537 | Returns | |
|
538 | ------- | |
|
539 | ||
|
540 | if block=False: | |
|
541 | AsyncMapResult | |
|
542 | An object like AsyncResult, but which reassembles the sequence of results | |
|
543 | into a single list. AsyncMapResults can be iterated through before all | |
|
544 | results are complete. | |
|
545 | else: | |
|
546 | the result of map(f,*sequences) | |
|
547 | ||
|
548 | """ | |
|
549 | ||
|
550 | block = kwargs.get('block', self.block) | |
|
551 | bound = kwargs.get('bound', self.bound) | |
|
552 | ||
|
553 | assert len(sequences) > 0, "must have some sequences to map onto!" | |
|
554 | ||
|
555 | pf = ParallelFunction(self.client, f, block=block, bound=bound, | |
|
556 | targets=self.targets, balanced=True) | |
|
557 | return pf.map(*sequences) | |
|
558 | ||
|
559 | def map_async(self, f, *sequences, **kwargs): | |
|
560 | """Parallel version of builtin `map`, using this view's engines. | |
|
561 | ||
|
562 | This is equivalent to map(...block=False) | |
|
563 | ||
|
564 | See `map` for details. | |
|
565 | """ | |
|
566 | ||
|
567 | if 'block' in kwargs: | |
|
568 | raise TypeError("map_async doesn't take a `block` keyword argument.") | |
|
569 | kwargs['block'] = True | |
|
570 | return self.map(f,*sequences,**kwargs) | |
|
571 | ||
|
383 | 572 |
@@ -37,7 +37,7 b' module and then create a :class:`.Client` instance:' | |||
|
37 | 37 | In [2]: rc = client.Client() |
|
38 | 38 | |
|
39 | 39 | This form assumes that the default connection information (stored in |
|
40 |
:file:`ipcontroller-client.json` found in |
|
|
40 | :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is | |
|
41 | 41 | accurate. If the controller was started on a remote machine, you must copy that connection |
|
42 | 42 | file to the client machine, or enter its contents as arguments to the Client constructor: |
|
43 | 43 | |
@@ -45,17 +45,17 b' file to the client machine, or enter its contents as arguments to the Client con' | |||
|
45 | 45 | |
|
46 | 46 | # If you have copied the json connector file from the controller: |
|
47 | 47 | In [2]: rc = client.Client('/path/to/ipcontroller-client.json') |
|
48 | # for a remote controller at 10.0.1.5, visible from my.server.com: | |
|
48 | # or for a remote controller at 10.0.1.5, visible from my.server.com: | |
|
49 | 49 | In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com') |
|
50 | 50 | |
|
51 | 51 | |
|
52 | To make sure there are engines connected to the controller, use can get a list | |
|
52 | To make sure there are engines connected to the controller, users can get a list | |
|
53 | 53 | of engine ids: |
|
54 | 54 | |
|
55 | 55 | .. sourcecode:: ipython |
|
56 | 56 | |
|
57 | 57 | In [3]: rc.ids |
|
58 |
Out[3]: |
|
|
58 | Out[3]: [0, 1, 2, 3] | |
|
59 | 59 | |
|
60 | 60 | Here we see that there are four engines ready to do work for us. |
|
61 | 61 | |
@@ -73,24 +73,25 b' Parallel map' | |||
|
73 | 73 | Python's builtin :func:`map` functions allows a function to be applied to a |
|
74 | 74 | sequence element-by-element. This type of code is typically trivial to |
|
75 | 75 | parallelize. In fact, since IPython's interface is all about functions anyway, |
|
76 |
you can just use the builtin :func:`map` |
|
|
76 | you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a | |
|
77 | DirectView's :meth:`map` method: | |
|
77 | 78 | |
|
78 | 79 | .. sourcecode:: ipython |
|
79 | 80 | |
|
80 | 81 | In [62]: serial_result = map(lambda x:x**10, range(32)) |
|
81 | 82 | |
|
82 | In [66]: parallel_result = rc.map(lambda x: x**10, range(32)) | |
|
83 | In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32)) | |
|
83 | 84 | |
|
84 | In [67]: serial_result==parallel_result | |
|
85 | In [67]: serial_result==parallel_result.get() | |
|
85 | 86 | Out[67]: True |
|
86 | 87 | |
|
87 | 88 | |
|
88 | 89 | .. note:: |
|
89 | 90 | |
|
90 | The client's own version of :meth:`map` or that of :class:`.DirectView` do | |
|
91 | The :class:`DirectView`'s version of :meth:`map` does | |
|
91 | 92 | not do any load balancing. For a load balanced version, use a |
|
92 | 93 | :class:`LoadBalancedView`, or a :class:`ParallelFunction` with |
|
93 | `targets=None`. | |
|
94 | `balanced=True`. | |
|
94 | 95 | |
|
95 | 96 | .. seealso:: |
|
96 | 97 | |
@@ -105,16 +106,18 b' some decorators:' | |||
|
105 | 106 | |
|
106 | 107 | .. sourcecode:: ipython |
|
107 | 108 | |
|
108 | In [10]: @rc.remote(block=True) | |
|
109 | In [10]: @rc.remote(block=True, targets=0) | |
|
109 | 110 | ....: def f(x): |
|
110 | 111 | ....: return 10.0*x**4 |
|
111 | 112 | ....: |
|
112 | 113 | |
|
113 |
In [11]: map(f, range(32)) # this is done |
|
|
114 | In [11]: map(f, range(32)) # this is done on engine 0 | |
|
114 | 115 | Out[11]: [0.0,10.0,160.0,...] |
|
115 | 116 | |
|
116 | See the docstring for the :func:`parallel` and :func:`remote` decorators for | |
|
117 | options. | |
|
117 | .. seealso:: | |
|
118 | ||
|
119 | See the docstring for the :func:`parallel` and :func:`remote` decorators for | |
|
120 | options. | |
|
118 | 121 | |
|
119 | 122 | Calling Python functions |
|
120 | 123 | ======================== |
@@ -141,7 +144,7 b' Instead, they provide the signature::' | |||
|
141 | 144 | In order to provide the nicer interface, we have :class:`View` classes, which wrap |
|
142 | 145 | :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine |
|
143 | 146 | the extra arguments. For instance, performing index-access on a client creates a |
|
144 |
:class:`. |
|
|
147 | :class:`.DirectView`. | |
|
145 | 148 | |
|
146 | 149 | .. sourcecode:: ipython |
|
147 | 150 | |
@@ -218,7 +221,7 b' unbound, unless called by the :meth:`apply_bound` method:' | |||
|
218 | 221 | |
|
219 | 222 | .. sourcecode:: ipython |
|
220 | 223 | |
|
221 |
In [9]: |
|
|
224 | In [9]: dview['b'] = 5 # assign b to 5 everywhere | |
|
222 | 225 | |
|
223 | 226 | In [10]: v0 = rc[0] |
|
224 | 227 | |
@@ -277,14 +280,14 b' local Python/IPython session:' | |||
|
277 | 280 | ...: return time.time()-tic |
|
278 | 281 | |
|
279 | 282 | # In non-blocking mode |
|
280 |
In [7]: pr = |
|
|
283 | In [7]: pr = dview.apply_async(wait, 2) | |
|
281 | 284 | |
|
282 | 285 | # Now block for the result |
|
283 | 286 | In [8]: pr.get() |
|
284 | 287 | Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154] |
|
285 | 288 | |
|
286 | 289 | # Again in non-blocking mode |
|
287 |
In [9]: pr = |
|
|
290 | In [9]: pr = dview.apply_async(wait, 10) | |
|
288 | 291 | |
|
289 | 292 | # Poll to see if the result is ready |
|
290 | 293 | In [10]: pr.ready() |
@@ -321,7 +324,7 b' associated results are ready:' | |||
|
321 | 324 | In [72]: rc.block=False |
|
322 | 325 | |
|
323 | 326 | # A trivial list of AsyncResults objects |
|
324 |
In [73]: pr_list = [ |
|
|
327 | In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)] | |
|
325 | 328 | |
|
326 | 329 | # Wait until all of them are done |
|
327 | 330 | In [74]: rc.barrier(pr_list) |
@@ -332,63 +335,38 b' associated results are ready:' | |||
|
332 | 335 | |
|
333 | 336 | |
|
334 | 337 | |
|
335 |
The ``block`` |
|
|
336 |
--------------------------------------------- |
|
|
337 | ||
|
338 | .. warning:: | |
|
339 | ||
|
340 | This is different now, I haven't updated this section. | |
|
341 | -MinRK | |
|
338 | The ``block`` keyword argument and attributes | |
|
339 | --------------------------------------------- | |
|
342 | 340 | |
|
343 | 341 | Most methods(like :meth:`apply`) accept |
|
344 |
``block`` a |
|
|
345 |
keyword arguments control the blocking mode |
|
|
346 | applied to. The :class:`Client` class also has :attr:`block` and | |
|
347 | :attr:`targets` attributes that control the default behavior when the keyword | |
|
348 | arguments are not provided. Thus the following logic is used for :attr:`block` | |
|
349 | and :attr:`targets`: | |
|
342 | ``block`` as a keyword argument. As we have seen above, these | |
|
343 | keyword arguments control the blocking mode . The :class:`Client` class also has | |
|
344 | a :attr:`block` attribute that controls the default behavior when the keyword | |
|
345 | argument is not provided. Thus the following logic is used for :attr:`block`: | |
|
350 | 346 | |
|
351 | 347 | * If no keyword argument is provided, the instance attributes are used. |
|
352 |
* Keyword argument, if provided override the instance attributes |
|
|
348 | * Keyword argument, if provided override the instance attributes for | |
|
349 | the duration of a single call. | |
|
353 | 350 | |
|
354 | 351 | The following examples demonstrate how to use the instance attributes: |
|
355 | 352 | |
|
356 | 353 | .. sourcecode:: ipython |
|
357 | 354 | |
|
358 | In [16]: rc.targets = [0,2] | |
|
359 | ||
|
360 | 355 | In [17]: rc.block = False |
|
361 | 356 | |
|
362 | In [18]: pr = rc.execute('a=5') | |
|
363 | ||
|
364 | In [19]: pr.r | |
|
365 | Out[19]: | |
|
366 | <Results List> | |
|
367 | [0] In [6]: a=5 | |
|
368 | [2] In [6]: a=5 | |
|
357 | In [18]: ar = rc.apply(lambda : 10, targets=[0,2]) | |
|
369 | 358 | |
|
370 | # Note targets='all' means all engines | |
|
371 | In [20]: rc.targets = 'all' | |
|
359 | In [19]: ar.get() | |
|
360 | Out[19]: [10,10] | |
|
372 | 361 | |
|
373 | 362 | In [21]: rc.block = True |
|
374 | 363 | |
|
375 | In [22]: rc.execute('b=10; print b') | |
|
376 | Out[22]: | |
|
377 | <Results List> | |
|
378 | [0] In [7]: b=10; print b | |
|
379 | [0] Out[7]: 10 | |
|
380 | ||
|
381 | [1] In [6]: b=10; print b | |
|
382 | [1] Out[6]: 10 | |
|
383 | ||
|
384 | [2] In [7]: b=10; print b | |
|
385 | [2] Out[7]: 10 | |
|
386 | ||
|
387 | [3] In [6]: b=10; print b | |
|
388 | [3] Out[6]: 10 | |
|
364 | # Note targets='all' means all engines | |
|
365 | In [22]: rc.apply(lambda : 42, targets='all') | |
|
366 | Out[22]: [42, 42, 42, 42] | |
|
389 | 367 | |
|
390 |
The :attr:`block` and :attr:`targets` instance attributes |
|
|
391 | behavior of the parallel magic commands. | |
|
368 | The :attr:`block` and :attr:`targets` instance attributes of the | |
|
369 | :class:`.DirectView` also determine the behavior of the parallel magic commands. | |
|
392 | 370 | |
|
393 | 371 | |
|
394 | 372 | Parallel magic commands |
@@ -567,9 +545,9 b' appear as a local dictionary. Underneath, this uses :meth:`push` and' | |||
|
567 | 545 | |
|
568 | 546 | In [50]: rc.block=True |
|
569 | 547 | |
|
570 |
In [51]: |
|
|
548 | In [51]: dview['a']=['foo','bar'] | |
|
571 | 549 | |
|
572 |
In [52]: |
|
|
550 | In [52]: dview['a'] | |
|
573 | 551 | Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ] |
|
574 | 552 | |
|
575 | 553 | Scatter and gather |
@@ -585,13 +563,13 b' between engines, MPI should be used:' | |||
|
585 | 563 | |
|
586 | 564 | .. sourcecode:: ipython |
|
587 | 565 | |
|
588 |
In [58]: |
|
|
566 | In [58]: dview.scatter('a',range(16)) | |
|
589 | 567 | Out[58]: [None,None,None,None] |
|
590 | 568 | |
|
591 |
In [59]: |
|
|
569 | In [59]: dview['a'] | |
|
592 | 570 | Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ] |
|
593 | 571 | |
|
594 |
In [60]: |
|
|
572 | In [60]: dview.gather('a') | |
|
595 | 573 | Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] |
|
596 | 574 | |
|
597 | 575 | Other things to look at |
@@ -606,14 +584,14 b' basic effect using :meth:`scatter` and :meth:`gather`:' | |||
|
606 | 584 | |
|
607 | 585 | .. sourcecode:: ipython |
|
608 | 586 | |
|
609 |
In [66]: |
|
|
587 | In [66]: dview.scatter('x',range(64)) | |
|
610 | 588 | Out[66]: [None,None,None,None] |
|
611 | 589 | |
|
612 | 590 | In [67]: px y = [i**10 for i in x] |
|
613 | 591 | Parallel execution on engines: [0, 1, 2, 3] |
|
614 | 592 | Out[67]: |
|
615 | 593 | |
|
616 |
In [68]: y = |
|
|
594 | In [68]: y = dview.gather('y') | |
|
617 | 595 | |
|
618 | 596 | In [69]: print y |
|
619 | 597 | [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] |
@@ -41,8 +41,8 b' module and then create a :class:`.Client` instance:' | |||
|
41 | 41 | |
|
42 | 42 | In [2]: rc = client.Client() |
|
43 | 43 | |
|
44 |
In [3]: lview = rc |
|
|
45 |
Out[3]: <LoadBalancedView |
|
|
44 | In [3]: lview = rc.view(balanced=True) | |
|
45 | Out[3]: <LoadBalancedView None> | |
|
46 | 46 | |
|
47 | 47 | |
|
48 | 48 | This form assumes that the controller was started on localhost with default |
@@ -66,7 +66,7 b' objects, but *in parallel*. Like the multiengine interface, these can be' | |||
|
66 | 66 | implemented via the task interface. The exact same tools can perform these |
|
67 | 67 | actions in load-balanced ways as well as multiplexed ways: a parallel version |
|
68 | 68 | of :func:`map` and :func:`@parallel` function decorator. If one specifies the |
|
69 |
argument ` |
|
|
69 | argument `balanced=True`, then they are dynamically load balanced. Thus, if the | |
|
70 | 70 | execution time per item varies significantly, you should use the versions in |
|
71 | 71 | the task interface. |
|
72 | 72 | |
@@ -80,7 +80,7 b' for the ``None`` element:' | |||
|
80 | 80 | |
|
81 | 81 | In [63]: serial_result = map(lambda x:x**10, range(32)) |
|
82 | 82 | |
|
83 |
In [64]: parallel_result = |
|
|
83 | In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True) | |
|
84 | 84 | |
|
85 | 85 | In [65]: serial_result==parallel_result |
|
86 | 86 | Out[65]: True |
@@ -258,13 +258,13 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje' | |||
|
258 | 258 | |
|
259 | 259 | In [14]: client.block=False |
|
260 | 260 | |
|
261 |
In [15]: ar = client.apply(f, args, kwargs, |
|
|
261 | In [15]: ar = client.apply(f, args, kwargs, balanced=True) | |
|
262 | 262 | |
|
263 |
In [16]: ar2 = client.apply(f2, |
|
|
263 | In [16]: ar2 = client.apply(f2, balanced=True) | |
|
264 | 264 | |
|
265 | In [17]: ar3 = client.apply(f3, after=[ar,ar2]) | |
|
265 | In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True) | |
|
266 | 266 | |
|
267 | In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5) | |
|
267 | In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True) | |
|
268 | 268 | |
|
269 | 269 | |
|
270 | 270 | .. seealso:: |
@@ -383,7 +383,7 b' The following is an overview of how to use these classes together:' | |||
|
383 | 383 | 1. Create a :class:`Client`. |
|
384 | 384 | 2. Define some functions to be run as tasks |
|
385 | 385 | 3. Submit your tasks to using the :meth:`apply` method of your |
|
386 |
:class:`Client` instance, specifying ` |
|
|
386 | :class:`Client` instance, specifying `balanced=True`. This signals | |
|
387 | 387 | the :class:`Client` to entrust the Scheduler with assigning tasks to engines. |
|
388 | 388 | 4. Use :meth:`Client.get_results` to get the results of the |
|
389 | 389 | tasks, or use the :meth:`AsyncResult.get` method of the results to wait |
General Comments 0
You need to be logged in to leave comments.
Login now