Show More
@@ -147,9 +147,9 b' class UnSerializeIt(UnSerialized):' | |||||
147 | if globals().has_key('numpy') and typeDescriptor == 'ndarray': |
|
147 | if globals().has_key('numpy') and typeDescriptor == 'ndarray': | |
148 | result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype']) |
|
148 | result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype']) | |
149 | result.shape = self.serialized.metadata['shape'] |
|
149 | result.shape = self.serialized.metadata['shape'] | |
150 |
# |
|
150 | # numpy arrays with frombuffer are read-only. We are working with | |
151 | # the numpy folks to address this issue. |
|
151 | # the numpy folks to address this issue. | |
152 | result = result.copy() |
|
152 | # result = result.copy() | |
153 | elif typeDescriptor == 'pickle': |
|
153 | elif typeDescriptor == 'pickle': | |
154 | result = pickle.loads(self.serialized.getData()) |
|
154 | result = pickle.loads(self.serialized.getData()) | |
155 | elif typeDescriptor in ('bytes', 'buffer'): |
|
155 | elif typeDescriptor in ('bytes', 'buffer'): |
@@ -21,7 +21,7 b' from pprint import pprint' | |||||
21 | pjoin = os.path.join |
|
21 | pjoin = os.path.join | |
22 |
|
22 | |||
23 | import zmq |
|
23 | import zmq | |
24 | from zmq.eventloop import ioloop, zmqstream |
|
24 | # from zmq.eventloop import ioloop, zmqstream | |
25 |
|
25 | |||
26 | from IPython.utils.path import get_ipython_dir |
|
26 | from IPython.utils.path import get_ipython_dir | |
27 | from IPython.external.decorator import decorator |
|
27 | from IPython.external.decorator import decorator | |
@@ -203,6 +203,7 b' class Client(object):' | |||||
203 |
|
203 | |||
204 | Attributes |
|
204 | Attributes | |
205 | ---------- |
|
205 | ---------- | |
|
206 | ||||
206 | ids : set of int engine IDs |
|
207 | ids : set of int engine IDs | |
207 | requesting the ids attribute always synchronizes |
|
208 | requesting the ids attribute always synchronizes | |
208 | the registration state. To request ids without synchronization, |
|
209 | the registration state. To request ids without synchronization, | |
@@ -225,17 +226,23 b' class Client(object):' | |||||
225 |
|
226 | |||
226 | Methods |
|
227 | Methods | |
227 | ------- |
|
228 | ------- | |
228 | spin : flushes incoming results and registration state changes |
|
|||
229 | control methods spin, and requesting `ids` also ensures up to date |
|
|||
230 |
|
||||
231 | barrier : wait on one or more msg_ids |
|
|||
232 |
|
229 | |||
233 | execution methods: apply/apply_bound/apply_to/apply_bound |
|
230 | spin | |
|
231 | flushes incoming results and registration state changes | |||
|
232 | control methods spin, and requesting `ids` also ensures up to date | |||
|
233 | ||||
|
234 | barrier | |||
|
235 | wait on one or more msg_ids | |||
|
236 | ||||
|
237 | execution methods | |||
|
238 | apply | |||
234 | legacy: execute, run |
|
239 | legacy: execute, run | |
235 |
|
240 | |||
236 | query methods: queue_status, get_result, purge |
|
241 | query methods | |
|
242 | queue_status, get_result, purge | |||
237 |
|
243 | |||
238 |
control methods |
|
244 | control methods | |
|
245 | abort, shutdown | |||
239 |
|
246 | |||
240 | """ |
|
247 | """ | |
241 |
|
248 | |||
@@ -265,7 +272,6 b' class Client(object):' | |||||
265 | if context is None: |
|
272 | if context is None: | |
266 | context = zmq.Context() |
|
273 | context = zmq.Context() | |
267 | self.context = context |
|
274 | self.context = context | |
268 | self.targets = 'all' |
|
|||
269 |
|
275 | |||
270 | self._setup_cluster_dir(profile, cluster_dir, ipython_dir) |
|
276 | self._setup_cluster_dir(profile, cluster_dir, ipython_dir) | |
271 | if self._cd is not None: |
|
277 | if self._cd is not None: | |
@@ -634,42 +640,18 b' class Client(object):' | |||||
634 | #-------------------------------------------------------------------------- |
|
640 | #-------------------------------------------------------------------------- | |
635 |
|
641 | |||
636 | def __getitem__(self, key): |
|
642 | def __getitem__(self, key): | |
637 |
""" |
|
643 | """index access returns DirectView multiplexer objects | |
638 | if key is None, a LoadBalancedView.""" |
|
|||
639 | if key is None: |
|
|||
640 | return LoadBalancedView(self) |
|
|||
641 | if isinstance(key, int): |
|
|||
642 | if key not in self.ids: |
|
|||
643 | raise IndexError("No such engine: %i"%key) |
|
|||
644 | return DirectView(self, key) |
|
|||
645 |
|
||||
646 | if isinstance(key, slice): |
|
|||
647 | indices = range(len(self.ids))[key] |
|
|||
648 | ids = sorted(self._ids) |
|
|||
649 | key = [ ids[i] for i in indices ] |
|
|||
650 | # newkeys = sorted(self._ids)[thekeys[k]] |
|
|||
651 |
|
|
644 | ||
652 | if isinstance(key, (tuple, list, xrange)): |
|
645 | Must be int, slice, or list/tuple/xrange of ints""" | |
653 | _,targets = self._build_targets(list(key)) |
|
646 | if not isinstance(key, (int, slice, tuple, list, xrange)): | |
654 | return DirectView(self, targets) |
|
647 | raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key))) | |
655 | else: |
|
648 | else: | |
656 | raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) |
|
649 | return self.view(key, balanced=False) | |
657 |
|
650 | |||
658 | #-------------------------------------------------------------------------- |
|
651 | #-------------------------------------------------------------------------- | |
659 | # Begin public methods |
|
652 | # Begin public methods | |
660 | #-------------------------------------------------------------------------- |
|
653 | #-------------------------------------------------------------------------- | |
661 |
|
654 | |||
662 | @property |
|
|||
663 | def remote(self): |
|
|||
664 | """property for convenient RemoteFunction generation. |
|
|||
665 |
|
||||
666 | >>> @client.remote |
|
|||
667 | ... def getpid(): |
|
|||
668 | import os |
|
|||
669 | return os.getpid() |
|
|||
670 | """ |
|
|||
671 | return remote(self, block=self.block) |
|
|||
672 |
|
||||
673 | def spin(self): |
|
655 | def spin(self): | |
674 | """Flush any registration notifications and execution results |
|
656 | """Flush any registration notifications and execution results | |
675 | waiting in the ZMQ queue. |
|
657 | waiting in the ZMQ queue. | |
@@ -690,6 +672,7 b' class Client(object):' | |||||
690 |
|
672 | |||
691 | Parameters |
|
673 | Parameters | |
692 | ---------- |
|
674 | ---------- | |
|
675 | ||||
693 | msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects |
|
676 | msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects | |
694 | ints are indices to self.history |
|
677 | ints are indices to self.history | |
695 | strs are msg_ids |
|
678 | strs are msg_ids | |
@@ -700,6 +683,7 b' class Client(object):' | |||||
700 |
|
683 | |||
701 | Returns |
|
684 | Returns | |
702 | ------- |
|
685 | ------- | |
|
686 | ||||
703 | True : when all msg_ids are done |
|
687 | True : when all msg_ids are done | |
704 | False : timeout reached, some msg_ids still outstanding |
|
688 | False : timeout reached, some msg_ids still outstanding | |
705 | """ |
|
689 | """ | |
@@ -815,6 +799,7 b' class Client(object):' | |||||
815 |
|
799 | |||
816 | Parameters |
|
800 | Parameters | |
817 | ---------- |
|
801 | ---------- | |
|
802 | ||||
818 | code : str |
|
803 | code : str | |
819 | the code string to be executed |
|
804 | the code string to be executed | |
820 | targets : int/str/list of ints/strs |
|
805 | targets : int/str/list of ints/strs | |
@@ -824,7 +809,7 b' class Client(object):' | |||||
824 | whether or not to wait until done to return |
|
809 | whether or not to wait until done to return | |
825 | default: self.block |
|
810 | default: self.block | |
826 | """ |
|
811 | """ | |
827 | result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True) |
|
812 | result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False) | |
828 | return result |
|
813 | return result | |
829 |
|
814 | |||
830 | def run(self, filename, targets='all', block=None): |
|
815 | def run(self, filename, targets='all', block=None): | |
@@ -834,6 +819,7 b' class Client(object):' | |||||
834 |
|
819 | |||
835 | Parameters |
|
820 | Parameters | |
836 | ---------- |
|
821 | ---------- | |
|
822 | ||||
837 | filename : str |
|
823 | filename : str | |
838 | The path to the file |
|
824 | The path to the file | |
839 | targets : int/str/list of ints/strs |
|
825 | targets : int/str/list of ints/strs | |
@@ -868,7 +854,8 b' class Client(object):' | |||||
868 | return list(Dependency(dep)) |
|
854 | return list(Dependency(dep)) | |
869 |
|
855 | |||
870 | @defaultblock |
|
856 | @defaultblock | |
871 |
def apply(self, f, args=None, kwargs=None, bound=True, block=None, |
|
857 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, | |
|
858 | targets=None, balanced=None, | |||
872 | after=None, follow=None, timeout=None): |
|
859 | after=None, follow=None, timeout=None): | |
873 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. |
|
860 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. | |
874 |
|
861 | |||
@@ -905,11 +892,34 b' class Client(object):' | |||||
905 | if int: |
|
892 | if int: | |
906 | Run on single engine |
|
893 | Run on single engine | |
907 |
|
894 | |||
908 | after,follow,timeout only used in `apply_balanced`. See that docstring |
|
895 | balanced : bool, default None | |
909 | for details. |
|
896 | whether to load-balance. This will default to True | |
|
897 | if targets is unspecified, or False if targets is specified. | |||
|
898 | ||||
|
899 | The following arguments are only used when balanced is True: | |||
|
900 | after : Dependency or collection of msg_ids | |||
|
901 | Only for load-balanced execution (targets=None) | |||
|
902 | Specify a list of msg_ids as a time-based dependency. | |||
|
903 | This job will only be run *after* the dependencies | |||
|
904 | have been met. | |||
|
905 | ||||
|
906 | follow : Dependency or collection of msg_ids | |||
|
907 | Only for load-balanced execution (targets=None) | |||
|
908 | Specify a list of msg_ids as a location-based dependency. | |||
|
909 | This job will only be run on an engine where this dependency | |||
|
910 | is met. | |||
|
911 | ||||
|
912 | timeout : float/int or None | |||
|
913 | Only for load-balanced execution (targets=None) | |||
|
914 | Specify an amount of time (in seconds) for the scheduler to | |||
|
915 | wait for dependencies to be met before failing with a | |||
|
916 | DependencyTimeout. | |||
|
917 | ||||
|
918 | after,follow,timeout only used if `balanced=True`. | |||
910 |
|
919 | |||
911 | Returns |
|
920 | Returns | |
912 | ------- |
|
921 | ------- | |
|
922 | ||||
913 | if block is False: |
|
923 | if block is False: | |
914 | return AsyncResult wrapping msg_ids |
|
924 | return AsyncResult wrapping msg_ids | |
915 | output of AsyncResult.get() is identical to that of `apply(...block=True)` |
|
925 | output of AsyncResult.get() is identical to that of `apply(...block=True)` | |
@@ -921,10 +931,21 b' class Client(object):' | |||||
921 | """ |
|
931 | """ | |
922 |
|
932 | |||
923 | # defaults: |
|
933 | # defaults: | |
924 | block = block if block is not None else self.block |
|
|||
925 | args = args if args is not None else [] |
|
934 | args = args if args is not None else [] | |
926 | kwargs = kwargs if kwargs is not None else {} |
|
935 | kwargs = kwargs if kwargs is not None else {} | |
927 |
|
936 | |||
|
937 | if balanced is None: | |||
|
938 | if targets is None: | |||
|
939 | # default to balanced if targets unspecified | |||
|
940 | balanced = True | |||
|
941 | else: | |||
|
942 | # otherwise default to multiplexing | |||
|
943 | balanced = False | |||
|
944 | ||||
|
945 | if targets is None and balanced is False: | |||
|
946 | # default to all if *not* balanced, and targets is unspecified | |||
|
947 | targets = 'all' | |||
|
948 | ||||
928 | # enforce types of f,args,kwrags |
|
949 | # enforce types of f,args,kwrags | |
929 | if not callable(f): |
|
950 | if not callable(f): | |
930 | raise TypeError("f must be callable, not %s"%type(f)) |
|
951 | raise TypeError("f must be callable, not %s"%type(f)) | |
@@ -935,80 +956,27 b' class Client(object):' | |||||
935 |
|
956 | |||
936 | options = dict(bound=bound, block=block, targets=targets) |
|
957 | options = dict(bound=bound, block=block, targets=targets) | |
937 |
|
958 | |||
938 | if targets is None: |
|
959 | if balanced: | |
939 | return self.apply_balanced(f, args, kwargs, timeout=timeout, |
|
960 | return self._apply_balanced(f, args, kwargs, timeout=timeout, | |
940 | after=after, follow=follow, **options) |
|
961 | after=after, follow=follow, **options) | |
941 | else: |
|
962 | elif follow or after or timeout: | |
942 | if follow or after or timeout: |
|
963 | msg = "follow, after, and timeout args are only used for" | |
943 | msg = "follow, after, and timeout args are only used for load-balanced" |
|
964 | msg += " load-balanced execution." | |
944 | msg += "execution." |
|
|||
945 | raise ValueError(msg) |
|
965 | raise ValueError(msg) | |
|
966 | else: | |||
946 | return self._apply_direct(f, args, kwargs, **options) |
|
967 | return self._apply_direct(f, args, kwargs, **options) | |
947 |
|
968 | |||
948 | @defaultblock |
|
969 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None, | |
949 | def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None, |
|
|||
950 | after=None, follow=None, timeout=None): |
|
970 | after=None, follow=None, timeout=None): | |
951 | """call f(*args, **kwargs) remotely in a load-balanced manner. |
|
971 | """call f(*args, **kwargs) remotely in a load-balanced manner. | |
952 |
|
972 | |||
953 | Parameters |
|
973 | This is a private method, see `apply` for details. | |
954 | ---------- |
|
974 | Not to be called directly! | |
955 |
|
||||
956 | f : function |
|
|||
957 | The fuction to be called remotely |
|
|||
958 | args : tuple/list |
|
|||
959 | The positional arguments passed to `f` |
|
|||
960 | kwargs : dict |
|
|||
961 | The keyword arguments passed to `f` |
|
|||
962 | bound : bool (default: True) |
|
|||
963 | Whether to execute in the Engine(s) namespace, or in a clean |
|
|||
964 | namespace not affecting the engine. |
|
|||
965 | block : bool (default: self.block) |
|
|||
966 | Whether to wait for the result, or return immediately. |
|
|||
967 | False: |
|
|||
968 | returns AsyncResult |
|
|||
969 | True: |
|
|||
970 | returns actual result(s) of f(*args, **kwargs) |
|
|||
971 | if multiple targets: |
|
|||
972 | list of results, matching `targets` |
|
|||
973 | targets : int,list of ints, 'all', None |
|
|||
974 | Specify the destination of the job. |
|
|||
975 | if None: |
|
|||
976 | Submit via Task queue for load-balancing. |
|
|||
977 | if 'all': |
|
|||
978 | Run on all active engines |
|
|||
979 | if list: |
|
|||
980 | Run on each specified engine |
|
|||
981 | if int: |
|
|||
982 | Run on single engine |
|
|||
983 |
|
||||
984 | after : Dependency or collection of msg_ids |
|
|||
985 | Only for load-balanced execution (targets=None) |
|
|||
986 | Specify a list of msg_ids as a time-based dependency. |
|
|||
987 | This job will only be run *after* the dependencies |
|
|||
988 | have been met. |
|
|||
989 |
|
||||
990 | follow : Dependency or collection of msg_ids |
|
|||
991 | Only for load-balanced execution (targets=None) |
|
|||
992 | Specify a list of msg_ids as a location-based dependency. |
|
|||
993 | This job will only be run on an engine where this dependency |
|
|||
994 | is met. |
|
|||
995 |
|
||||
996 | timeout : float/int or None |
|
|||
997 | Only for load-balanced execution (targets=None) |
|
|||
998 | Specify an amount of time (in seconds) for the scheduler to |
|
|||
999 | wait for dependencies to be met before failing with a |
|
|||
1000 | DependencyTimeout. |
|
|||
1001 |
|
||||
1002 | Returns |
|
|||
1003 | ------- |
|
|||
1004 | if block is False: |
|
|||
1005 | return AsyncResult wrapping msg_id |
|
|||
1006 | output of AsyncResult.get() is identical to that of `apply(...block=True)` |
|
|||
1007 | else: |
|
|||
1008 | wait for, and return actual result of `f(*args, **kwargs)` |
|
|||
1009 |
|
||||
1010 | """ |
|
975 | """ | |
1011 |
|
976 | |||
|
977 | for kwarg in (bound, block, targets): | |||
|
978 | assert kwarg is not None, "kwarg %r must be specified!"%kwarg | |||
|
979 | ||||
1012 | if self._task_socket is None: |
|
980 | if self._task_socket is None: | |
1013 | msg = "Task farming is disabled" |
|
981 | msg = "Task farming is disabled" | |
1014 | if self._task_scheme == 'pure': |
|
982 | if self._task_scheme == 'pure': | |
@@ -1025,7 +993,6 b' class Client(object):' | |||||
1025 | if isinstance(f, dependent): |
|
993 | if isinstance(f, dependent): | |
1026 | # soft warn on functional dependencies |
|
994 | # soft warn on functional dependencies | |
1027 | warnings.warn(msg, RuntimeWarning) |
|
995 | warnings.warn(msg, RuntimeWarning) | |
1028 |
|
||||
1029 |
|
996 | |||
1030 | # defaults: |
|
997 | # defaults: | |
1031 | args = args if args is not None else [] |
|
998 | args = args if args is not None else [] | |
@@ -1036,14 +1003,6 b' class Client(object):' | |||||
1036 | else: |
|
1003 | else: | |
1037 | idents = [] |
|
1004 | idents = [] | |
1038 |
|
1005 | |||
1039 | # enforce types of f,args,kwrags |
|
|||
1040 | if not callable(f): |
|
|||
1041 | raise TypeError("f must be callable, not %s"%type(f)) |
|
|||
1042 | if not isinstance(args, (tuple, list)): |
|
|||
1043 | raise TypeError("args must be tuple or list, not %s"%type(args)) |
|
|||
1044 | if not isinstance(kwargs, dict): |
|
|||
1045 | raise TypeError("kwargs must be dict, not %s"%type(kwargs)) |
|
|||
1046 |
|
||||
1047 | after = self._build_dependency(after) |
|
1006 | after = self._build_dependency(after) | |
1048 | follow = self._build_dependency(follow) |
|
1007 | follow = self._build_dependency(follow) | |
1049 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) |
|
1008 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) | |
@@ -1064,13 +1023,17 b' class Client(object):' | |||||
1064 | else: |
|
1023 | else: | |
1065 | return ar |
|
1024 | return ar | |
1066 |
|
1025 | |||
1067 |
def _apply_direct(self, f, args, kwargs, bound= |
|
1026 | def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None): | |
1068 | """Then underlying method for applying functions to specific engines |
|
1027 | """Then underlying method for applying functions to specific engines | |
1069 | via the MUX queue. |
|
1028 | via the MUX queue. | |
1070 |
|
1029 | |||
|
1030 | This is a private method, see `apply` for details. | |||
1071 | Not to be called directly! |
|
1031 | Not to be called directly! | |
1072 | """ |
|
1032 | """ | |
1073 |
|
1033 | |||
|
1034 | for kwarg in (bound, block, targets): | |||
|
1035 | assert kwarg is not None, "kwarg %r must be specified!"%kwarg | |||
|
1036 | ||||
1074 | idents,targets = self._build_targets(targets) |
|
1037 | idents,targets = self._build_targets(targets) | |
1075 |
|
1038 | |||
1076 | subheader = {} |
|
1039 | subheader = {} | |
@@ -1095,103 +1058,46 b' class Client(object):' | |||||
1095 | return ar |
|
1058 | return ar | |
1096 |
|
1059 | |||
1097 | #-------------------------------------------------------------------------- |
|
1060 | #-------------------------------------------------------------------------- | |
1098 |
# |
|
1061 | # decorators | |
1099 | #-------------------------------------------------------------------------- |
|
1062 | #-------------------------------------------------------------------------- | |
1100 |
|
1063 | |||
1101 | def map(self, f, *sequences, **kwargs): |
|
1064 | @defaultblock | |
1102 | """Parallel version of builtin `map`, using all our engines. |
|
1065 | def parallel(self, bound=True, targets='all', block=None): | |
1103 |
|
||||
1104 | `block` and `targets` can be passed as keyword arguments only. |
|
|||
1105 |
|
||||
1106 | There will be one task per target, so work will be chunked |
|
|||
1107 | if the sequences are longer than `targets`. |
|
|||
1108 |
|
||||
1109 | Results can be iterated as they are ready, but will become available in chunks. |
|
|||
1110 |
|
||||
1111 | Parameters |
|
|||
1112 | ---------- |
|
|||
1113 |
|
||||
1114 | f : callable |
|
|||
1115 | function to be mapped |
|
|||
1116 | *sequences: one or more sequences of matching length |
|
|||
1117 | the sequences to be distributed and passed to `f` |
|
|||
1118 | block : bool |
|
|||
1119 | whether to wait for the result or not [default self.block] |
|
|||
1120 | targets : valid targets |
|
|||
1121 | targets to be used [default self.targets] |
|
|||
1122 |
|
||||
1123 | Returns |
|
|||
1124 | ------- |
|
|||
1125 |
|
||||
1126 | if block=False: |
|
|||
1127 | AsyncMapResult |
|
|||
1128 | An object like AsyncResult, but which reassembles the sequence of results |
|
|||
1129 | into a single list. AsyncMapResults can be iterated through before all |
|
|||
1130 | results are complete. |
|
|||
1131 | else: |
|
|||
1132 | the result of map(f,*sequences) |
|
|||
1133 |
|
||||
1134 | """ |
|
|||
1135 | block = kwargs.get('block', self.block) |
|
|||
1136 | targets = kwargs.get('targets', self.targets) |
|
|||
1137 | assert len(sequences) > 0, "must have some sequences to map onto!" |
|
|||
1138 | pf = ParallelFunction(self, f, block=block, |
|
|||
1139 | bound=True, targets=targets) |
|
|||
1140 | return pf.map(*sequences) |
|
|||
1141 |
|
||||
1142 | def imap(self, f, *sequences, **kwargs): |
|
|||
1143 | """Parallel version of builtin `itertools.imap`, load-balanced across all engines. |
|
|||
1144 |
|
||||
1145 | Each element will be a separate task, and will be load-balanced. This |
|
|||
1146 | lets individual elements be ready for iteration as soon as they come. |
|
|||
1147 |
|
||||
1148 | Parameters |
|
|||
1149 | ---------- |
|
|||
1150 |
|
||||
1151 | f : callable |
|
|||
1152 | function to be mapped |
|
|||
1153 | *sequences: one or more sequences of matching length |
|
|||
1154 | the sequences to be distributed and passed to `f` |
|
|||
1155 | block : bool |
|
|||
1156 | whether to wait for the result or not [default self.block] |
|
|||
1157 |
|
||||
1158 | Returns |
|
|||
1159 | ------- |
|
|||
1160 |
|
||||
1161 | if block=False: |
|
|||
1162 | AsyncMapResult |
|
|||
1163 | An object like AsyncResult, but which reassembles the sequence of results |
|
|||
1164 | into a single list. AsyncMapResults can be iterated through before all |
|
|||
1165 | results are complete. |
|
|||
1166 | else: |
|
|||
1167 | the result of map(f,*sequences) |
|
|||
1168 |
|
||||
1169 | """ |
|
|||
1170 |
|
||||
1171 | block = kwargs.get('block', self.block) |
|
|||
1172 |
|
||||
1173 | assert len(sequences) > 0, "must have some sequences to map onto!" |
|
|||
1174 |
|
||||
1175 | pf = ParallelFunction(self, f, block=self.block, |
|
|||
1176 | bound=True, targets=None) |
|
|||
1177 | return pf.map(*sequences) |
|
|||
1178 |
|
||||
1179 | def parallel(self, bound=True, targets='all', block=True): |
|
|||
1180 | """Decorator for making a ParallelFunction.""" |
|
1066 | """Decorator for making a ParallelFunction.""" | |
1181 | return parallel(self, bound=bound, targets=targets, block=block) |
|
1067 | return parallel(self, bound=bound, targets=targets, block=block) | |
1182 |
|
1068 | |||
1183 | def remote(self, bound=True, targets='all', block=True): |
|
1069 | @defaultblock | |
|
1070 | def remote(self, bound=True, targets='all', block=None): | |||
1184 | """Decorator for making a RemoteFunction.""" |
|
1071 | """Decorator for making a RemoteFunction.""" | |
1185 | return remote(self, bound=bound, targets=targets, block=block) |
|
1072 | return remote(self, bound=bound, targets=targets, block=block) | |
1186 |
|
1073 | |||
1187 | def view(self, targets=None, balanced=False): |
|
1074 | def view(self, targets=None, balanced=False): | |
1188 | """Method for constructing View objects""" |
|
1075 | """Method for constructing View objects""" | |
1189 | if not balanced: |
|
1076 | if targets is None: | |
1190 |
if |
|
1077 | if balanced: | |
|
1078 | return LoadBalancedView(client=self) | |||
|
1079 | else: | |||
1191 | targets = slice(None) |
|
1080 | targets = slice(None) | |
1192 | return self[targets] |
|
1081 | ||
|
1082 | if balanced: | |||
|
1083 | view_class = LoadBalancedView | |||
|
1084 | else: | |||
|
1085 | view_class = DirectView | |||
|
1086 | if isinstance(targets, int): | |||
|
1087 | if targets not in self.ids: | |||
|
1088 | raise IndexError("No such engine: %i"%targets) | |||
|
1089 | return view_class(client=self, targets=targets) | |||
|
1090 | ||||
|
1091 | if isinstance(targets, slice): | |||
|
1092 | indices = range(len(self.ids))[targets] | |||
|
1093 | ids = sorted(self._ids) | |||
|
1094 | targets = [ ids[i] for i in indices ] | |||
|
1095 | ||||
|
1096 | if isinstance(targets, (tuple, list, xrange)): | |||
|
1097 | _,targets = self._build_targets(list(targets)) | |||
|
1098 | return view_class(client=self, targets=targets) | |||
1193 | else: |
|
1099 | else: | |
1194 | return LoadBalancedView(self, targets) |
|
1100 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) | |
1195 |
|
1101 | |||
1196 | #-------------------------------------------------------------------------- |
|
1102 | #-------------------------------------------------------------------------- | |
1197 | # Data movement |
|
1103 | # Data movement | |
@@ -1202,7 +1108,7 b' class Client(object):' | |||||
1202 | """Push the contents of `ns` into the namespace on `target`""" |
|
1108 | """Push the contents of `ns` into the namespace on `target`""" | |
1203 | if not isinstance(ns, dict): |
|
1109 | if not isinstance(ns, dict): | |
1204 | raise TypeError("Must be a dict, not %s"%type(ns)) |
|
1110 | raise TypeError("Must be a dict, not %s"%type(ns)) | |
1205 | result = self.apply(_push, (ns,), targets=targets, block=block, bound=True) |
|
1111 | result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False) | |
1206 | return result |
|
1112 | return result | |
1207 |
|
1113 | |||
1208 | @defaultblock |
|
1114 | @defaultblock | |
@@ -1214,14 +1120,14 b' class Client(object):' | |||||
1214 | for key in keys: |
|
1120 | for key in keys: | |
1215 | if not isinstance(key, str): |
|
1121 | if not isinstance(key, str): | |
1216 | raise TypeError |
|
1122 | raise TypeError | |
1217 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) |
|
1123 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False) | |
1218 | return result |
|
1124 | return result | |
1219 |
|
1125 | |||
|
1126 | @defaultblock | |||
1220 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): |
|
1127 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): | |
1221 | """ |
|
1128 | """ | |
1222 | Partition a Python sequence and send the partitions to a set of engines. |
|
1129 | Partition a Python sequence and send the partitions to a set of engines. | |
1223 | """ |
|
1130 | """ | |
1224 | block = block if block is not None else self.block |
|
|||
1225 | targets = self._build_targets(targets)[-1] |
|
1131 | targets = self._build_targets(targets)[-1] | |
1226 | mapObject = Map.dists[dist]() |
|
1132 | mapObject = Map.dists[dist]() | |
1227 | nparts = len(targets) |
|
1133 | nparts = len(targets) | |
@@ -1239,11 +1145,11 b' class Client(object):' | |||||
1239 | else: |
|
1145 | else: | |
1240 | return r |
|
1146 | return r | |
1241 |
|
1147 | |||
|
1148 | @defaultblock | |||
1242 | def gather(self, key, dist='b', targets='all', block=None): |
|
1149 | def gather(self, key, dist='b', targets='all', block=None): | |
1243 | """ |
|
1150 | """ | |
1244 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
1151 | Gather a partitioned sequence on a set of engines as a single local seq. | |
1245 | """ |
|
1152 | """ | |
1246 | block = block if block is not None else self.block |
|
|||
1247 |
|
1153 | |||
1248 | targets = self._build_targets(targets)[-1] |
|
1154 | targets = self._build_targets(targets)[-1] | |
1249 | mapObject = Map.dists[dist]() |
|
1155 | mapObject = Map.dists[dist]() | |
@@ -1267,6 +1173,7 b' class Client(object):' | |||||
1267 |
|
1173 | |||
1268 | Parameters |
|
1174 | Parameters | |
1269 | ---------- |
|
1175 | ---------- | |
|
1176 | ||||
1270 | msg_ids : list of ints or msg_ids |
|
1177 | msg_ids : list of ints or msg_ids | |
1271 | if int: |
|
1178 | if int: | |
1272 | Passed as index to self.history for convenience. |
|
1179 | Passed as index to self.history for convenience. | |
@@ -1351,13 +1258,14 b' class Client(object):' | |||||
1351 | return content |
|
1258 | return content | |
1352 |
|
1259 | |||
1353 | @spinfirst |
|
1260 | @spinfirst | |
1354 |
def queue_status(self, targets= |
|
1261 | def queue_status(self, targets='all', verbose=False): | |
1355 | """Fetch the status of engine queues. |
|
1262 | """Fetch the status of engine queues. | |
1356 |
|
1263 | |||
1357 | Parameters |
|
1264 | Parameters | |
1358 | ---------- |
|
1265 | ---------- | |
|
1266 | ||||
1359 | targets : int/str/list of ints/strs |
|
1267 | targets : int/str/list of ints/strs | |
1360 |
the engines |
|
1268 | the engines whose states are to be queried. | |
1361 | default : all |
|
1269 | default : all | |
1362 | verbose : bool |
|
1270 | verbose : bool | |
1363 | Whether to return lengths only, or lists of ids for each element |
|
1271 | Whether to return lengths only, or lists of ids for each element | |
@@ -1383,6 +1291,7 b' class Client(object):' | |||||
1383 |
|
1291 | |||
1384 | Parameters |
|
1292 | Parameters | |
1385 | ---------- |
|
1293 | ---------- | |
|
1294 | ||||
1386 | msg_ids : str or list of strs |
|
1295 | msg_ids : str or list of strs | |
1387 | the msg_ids whose results should be forgotten. |
|
1296 | the msg_ids whose results should be forgotten. | |
1388 | targets : int/str/list of ints/strs |
|
1297 | targets : int/str/list of ints/strs | |
@@ -1404,59 +1313,6 b' class Client(object):' | |||||
1404 | if content['status'] != 'ok': |
|
1313 | if content['status'] != 'ok': | |
1405 | raise ss.unwrap_exception(content) |
|
1314 | raise ss.unwrap_exception(content) | |
1406 |
|
1315 | |||
1407 | #---------------------------------------- |
|
|||
1408 | # activate for %px,%autopx magics |
|
|||
1409 | #---------------------------------------- |
|
|||
1410 | def activate(self): |
|
|||
1411 | """Make this `View` active for parallel magic commands. |
|
|||
1412 |
|
||||
1413 | IPython has a magic command syntax to work with `MultiEngineClient` objects. |
|
|||
1414 | In a given IPython session there is a single active one. While |
|
|||
1415 | there can be many `Views` created and used by the user, |
|
|||
1416 | there is only one active one. The active `View` is used whenever |
|
|||
1417 | the magic commands %px and %autopx are used. |
|
|||
1418 |
|
||||
1419 | The activate() method is called on a given `View` to make it |
|
|||
1420 | active. Once this has been done, the magic commands can be used. |
|
|||
1421 | """ |
|
|||
1422 |
|
||||
1423 | try: |
|
|||
1424 | # This is injected into __builtins__. |
|
|||
1425 | ip = get_ipython() |
|
|||
1426 | except NameError: |
|
|||
1427 | print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." |
|
|||
1428 | else: |
|
|||
1429 | pmagic = ip.plugin_manager.get_plugin('parallelmagic') |
|
|||
1430 | if pmagic is not None: |
|
|||
1431 | pmagic.active_multiengine_client = self |
|
|||
1432 | else: |
|
|||
1433 | print "You must first load the parallelmagic extension " \ |
|
|||
1434 | "by doing '%load_ext parallelmagic'" |
|
|||
1435 |
|
||||
1436 | class AsynClient(Client): |
|
|||
1437 | """An Asynchronous client, using the Tornado Event Loop. |
|
|||
1438 | !!!unfinished!!!""" |
|
|||
1439 | io_loop = None |
|
|||
1440 | _queue_stream = None |
|
|||
1441 | _notifier_stream = None |
|
|||
1442 | _task_stream = None |
|
|||
1443 | _control_stream = None |
|
|||
1444 |
|
||||
1445 | def __init__(self, addr, context=None, username=None, debug=False, io_loop=None): |
|
|||
1446 | Client.__init__(self, addr, context, username, debug) |
|
|||
1447 | if io_loop is None: |
|
|||
1448 | io_loop = ioloop.IOLoop.instance() |
|
|||
1449 | self.io_loop = io_loop |
|
|||
1450 |
|
||||
1451 | self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop) |
|
|||
1452 | self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop) |
|
|||
1453 | self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop) |
|
|||
1454 | self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop) |
|
|||
1455 |
|
||||
1456 | def spin(self): |
|
|||
1457 | for stream in (self.queue_stream, self.notifier_stream, |
|
|||
1458 | self.task_stream, self.control_stream): |
|
|||
1459 | stream.flush() |
|
|||
1460 |
|
1316 | |||
1461 | __all__ = [ 'Client', |
|
1317 | __all__ = [ 'Client', | |
1462 | 'depend', |
|
1318 | 'depend', |
@@ -17,7 +17,7 b' from asyncresult import AsyncMapResult' | |||||
17 | # Decorators |
|
17 | # Decorators | |
18 | #----------------------------------------------------------------------------- |
|
18 | #----------------------------------------------------------------------------- | |
19 |
|
19 | |||
20 | def remote(client, bound=False, block=None, targets=None): |
|
20 | def remote(client, bound=False, block=None, targets=None, balanced=None): | |
21 | """Turn a function into a remote function. |
|
21 | """Turn a function into a remote function. | |
22 |
|
22 | |||
23 | This method can be used for map: |
|
23 | This method can be used for map: | |
@@ -26,10 +26,10 b' def remote(client, bound=False, block=None, targets=None):' | |||||
26 | def func(a) |
|
26 | def func(a) | |
27 | """ |
|
27 | """ | |
28 | def remote_function(f): |
|
28 | def remote_function(f): | |
29 | return RemoteFunction(client, f, bound, block, targets) |
|
29 | return RemoteFunction(client, f, bound, block, targets, balanced) | |
30 | return remote_function |
|
30 | return remote_function | |
31 |
|
31 | |||
32 | def parallel(client, dist='b', bound=False, block=None, targets='all'): |
|
32 | def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None): | |
33 | """Turn a function into a parallel remote function. |
|
33 | """Turn a function into a parallel remote function. | |
34 |
|
34 | |||
35 | This method can be used for map: |
|
35 | This method can be used for map: | |
@@ -38,7 +38,7 b" def parallel(client, dist='b', bound=False, block=None, targets='all'):" | |||||
38 | def func(a) |
|
38 | def func(a) | |
39 | """ |
|
39 | """ | |
40 | def parallel_function(f): |
|
40 | def parallel_function(f): | |
41 | return ParallelFunction(client, f, dist, bound, block, targets) |
|
41 | return ParallelFunction(client, f, dist, bound, block, targets, balanced) | |
42 | return parallel_function |
|
42 | return parallel_function | |
43 |
|
43 | |||
44 | #-------------------------------------------------------------------------- |
|
44 | #-------------------------------------------------------------------------- | |
@@ -62,6 +62,8 b' class RemoteFunction(object):' | |||||
62 | to use the current `block` attribute of `client` |
|
62 | to use the current `block` attribute of `client` | |
63 | targets : valid target list [default: all] |
|
63 | targets : valid target list [default: all] | |
64 | The targets on which to execute. |
|
64 | The targets on which to execute. | |
|
65 | balanced : bool | |||
|
66 | Whether to load-balance with the Task scheduler or not | |||
65 | """ |
|
67 | """ | |
66 |
|
68 | |||
67 | client = None # the remote connection |
|
69 | client = None # the remote connection | |
@@ -69,23 +71,30 b' class RemoteFunction(object):' | |||||
69 | block = None # whether to block |
|
71 | block = None # whether to block | |
70 | bound = None # whether to affect the namespace |
|
72 | bound = None # whether to affect the namespace | |
71 | targets = None # where to execute |
|
73 | targets = None # where to execute | |
|
74 | balanced = None # whether to load-balance | |||
72 |
|
75 | |||
73 | def __init__(self, client, f, bound=False, block=None, targets=None): |
|
76 | def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None): | |
74 | self.client = client |
|
77 | self.client = client | |
75 | self.func = f |
|
78 | self.func = f | |
76 | self.block=block |
|
79 | self.block=block | |
77 | self.bound=bound |
|
80 | self.bound=bound | |
78 | self.targets=targets |
|
81 | self.targets=targets | |
|
82 | if balanced is None: | |||
|
83 | if targets is None: | |||
|
84 | balanced = True | |||
|
85 | else: | |||
|
86 | balanced = False | |||
|
87 | self.balanced = balanced | |||
79 |
|
88 | |||
80 | def __call__(self, *args, **kwargs): |
|
89 | def __call__(self, *args, **kwargs): | |
81 | return self.client.apply(self.func, args=args, kwargs=kwargs, |
|
90 | return self.client.apply(self.func, args=args, kwargs=kwargs, | |
82 | block=self.block, targets=self.targets, bound=self.bound) |
|
91 | block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced) | |
83 |
|
92 | |||
84 |
|
93 | |||
85 | class ParallelFunction(RemoteFunction): |
|
94 | class ParallelFunction(RemoteFunction): | |
86 | """Class for mapping a function to sequences.""" |
|
95 | """Class for mapping a function to sequences.""" | |
87 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): |
|
96 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None): | |
88 | super(ParallelFunction, self).__init__(client,f,bound,block,targets) |
|
97 | super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) | |
89 | mapClass = Map.dists[dist] |
|
98 | mapClass = Map.dists[dist] | |
90 | self.mapObject = mapClass() |
|
99 | self.mapObject = mapClass() | |
91 |
|
100 | |||
@@ -93,21 +102,19 b' class ParallelFunction(RemoteFunction):' | |||||
93 | len_0 = len(sequences[0]) |
|
102 | len_0 = len(sequences[0]) | |
94 | for s in sequences: |
|
103 | for s in sequences: | |
95 | if len(s)!=len_0: |
|
104 | if len(s)!=len_0: | |
96 |
|
|
105 | msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) | |
|
106 | raise ValueError(msg) | |||
97 |
|
107 | |||
98 |
if self. |
|
108 | if self.balanced: | |
99 | # load-balanced: |
|
109 | targets = [self.targets]*len_0 | |
100 | engines = [None]*len_0 |
|
|||
101 | elif isinstance(self.targets, int): |
|
|||
102 | engines = [None]*self.targets |
|
|||
103 | else: |
|
110 | else: | |
104 | # multiplexed: |
|
111 | # multiplexed: | |
105 |
|
|
112 | targets = self.client._build_targets(self.targets)[-1] | |
106 |
|
113 | |||
107 |
nparts = len( |
|
114 | nparts = len(targets) | |
108 | msg_ids = [] |
|
115 | msg_ids = [] | |
109 | # my_f = lambda *a: map(self.func, *a) |
|
116 | # my_f = lambda *a: map(self.func, *a) | |
110 |
for index, |
|
117 | for index, t in enumerate(targets): | |
111 | args = [] |
|
118 | args = [] | |
112 | for seq in sequences: |
|
119 | for seq in sequences: | |
113 | part = self.mapObject.getPartition(seq, index, nparts) |
|
120 | part = self.mapObject.getPartition(seq, index, nparts) | |
@@ -124,22 +131,26 b' class ParallelFunction(RemoteFunction):' | |||||
124 | args = [self.func]+args |
|
131 | args = [self.func]+args | |
125 | else: |
|
132 | else: | |
126 | f=self.func |
|
133 | f=self.func | |
127 |
|
|
134 | ar = self.client.apply(f, args=args, block=False, bound=self.bound, | |
128 |
|
|
135 | targets=targets, balanced=self.balanced) | |
129 | targets=engineid).msg_ids[0] |
|
136 | ||
130 | msg_ids.append(mid) |
|
137 | msg_ids.append(ar.msg_ids[0]) | |
131 |
|
138 | |||
132 | r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) |
|
139 | r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) | |
133 | if self.block: |
|
140 | if self.block: | |
134 |
|
|
141 | try: | |
135 |
return r. |
|
142 | return r.get() | |
|
143 | except KeyboardInterrupt: | |||
|
144 | return r | |||
136 | else: |
|
145 | else: | |
137 | return r |
|
146 | return r | |
138 |
|
147 | |||
139 | def map(self, *sequences): |
|
148 | def map(self, *sequences): | |
140 | """call a function on each element of a sequence remotely.""" |
|
149 | """call a function on each element of a sequence remotely.""" | |
141 | self._map = True |
|
150 | self._map = True | |
142 | ret = self.__call__(*sequences) |
|
151 | try: | |
143 | del self._map |
|
152 | ret = self.__call__(*sequences) | |
|
153 | finally: | |||
|
154 | del self._map | |||
144 | return ret |
|
155 | return ret | |
145 |
|
156 |
@@ -59,7 +59,7 b' def validate_url(url):' | |||||
59 | except ValueError: |
|
59 | except ValueError: | |
60 | raise AssertionError("Invalid port %r in url: %r"%(port, url)) |
|
60 | raise AssertionError("Invalid port %r in url: %r"%(port, url)) | |
61 |
|
61 | |||
62 | assert pat.match(addr) is not None, 'Invalid url: %r'%url |
|
62 | assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url | |
63 |
|
63 | |||
64 | else: |
|
64 | else: | |
65 | # only validate tcp urls currently |
|
65 | # only validate tcp urls currently |
@@ -10,7 +10,11 b'' | |||||
10 | # Imports |
|
10 | # Imports | |
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
|
13 | from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance | |||
|
14 | ||||
13 | from IPython.external.decorator import decorator |
|
15 | from IPython.external.decorator import decorator | |
|
16 | from IPython.zmq.parallel.asyncresult import AsyncResult | |||
|
17 | from IPython.zmq.parallel.dependency import Dependency | |||
14 | from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel |
|
18 | from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel | |
15 |
|
19 | |||
16 | #----------------------------------------------------------------------------- |
|
20 | #----------------------------------------------------------------------------- | |
@@ -61,30 +65,29 b' def spin_after(f, self, *args, **kwargs):' | |||||
61 | # Classes |
|
65 | # Classes | |
62 | #----------------------------------------------------------------------------- |
|
66 | #----------------------------------------------------------------------------- | |
63 |
|
67 | |||
64 |
class View( |
|
68 | class View(HasTraits): | |
65 | """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. |
|
69 | """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. | |
66 |
|
70 | |||
67 | Don't use this class, use subclasses. |
|
71 | Don't use this class, use subclasses. | |
68 | """ |
|
72 | """ | |
69 |
block= |
|
73 | block=Bool(False) | |
70 |
bound= |
|
74 | bound=Bool(False) | |
71 |
history= |
|
75 | history=List() | |
72 |
outstanding = |
|
76 | outstanding = Set() | |
73 |
results = |
|
77 | results = Dict() | |
74 |
|
78 | client = Instance('IPython.zmq.parallel.client.Client') | ||
|
79 | ||||
|
80 | _ntargets = Int(1) | |||
|
81 | _balanced = Bool(False) | |||
|
82 | _default_names = List(['block', 'bound']) | |||
75 | _targets = None |
|
83 | _targets = None | |
76 | _apply_name = 'apply' |
|
|||
77 | _default_names = ['targets', 'block'] |
|
|||
78 |
|
84 | |||
79 | def __init__(self, client, targets=None): |
|
85 | def __init__(self, client=None, targets=None): | |
80 |
self.client |
|
86 | super(View, self).__init__(client=client) | |
81 | self._targets = targets |
|
87 | self._targets = targets | |
82 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) |
|
88 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) | |
83 | self.block = client.block |
|
89 | self.block = client.block | |
84 | self.bound=False |
|
90 | ||
85 | self.history = [] |
|
|||
86 | self.outstanding = set() |
|
|||
87 | self.results = {} |
|
|||
88 | for name in self._default_names: |
|
91 | for name in self._default_names: | |
89 | setattr(self, name, getattr(self, name, None)) |
|
92 | setattr(self, name, getattr(self, name, None)) | |
90 |
|
93 | |||
@@ -101,26 +104,46 b' class View(object):' | |||||
101 |
|
104 | |||
102 | @targets.setter |
|
105 | @targets.setter | |
103 | def targets(self, value): |
|
106 | def targets(self, value): | |
104 | self._targets = value |
|
107 | raise AttributeError("Cannot set View `targets` after construction!") | |
105 | # raise AttributeError("Cannot set my targets argument after construction!") |
|
|||
106 |
|
108 | |||
107 | def _defaults(self, *excludes): |
|
109 | def _defaults(self, *excludes): | |
108 | """return dict of our default attributes, excluding names given.""" |
|
110 | """return dict of our default attributes, excluding names given.""" | |
109 | d = {} |
|
111 | d = dict(balanced=self._balanced, targets=self.targets) | |
110 | for name in self._default_names: |
|
112 | for name in self._default_names: | |
111 | if name not in excludes: |
|
113 | if name not in excludes: | |
112 | d[name] = getattr(self, name) |
|
114 | d[name] = getattr(self, name) | |
113 | return d |
|
115 | return d | |
114 |
|
116 | |||
|
117 | def set_flags(self, **kwargs): | |||
|
118 | """set my attribute flags by keyword. | |||
|
119 | ||||
|
120 | A View is a wrapper for the Client's apply method, but | |||
|
121 | with attributes that specify keyword arguments, those attributes | |||
|
122 | can be set by keyword argument with this method. | |||
|
123 | ||||
|
124 | Parameters | |||
|
125 | ---------- | |||
|
126 | ||||
|
127 | block : bool | |||
|
128 | whether to wait for results | |||
|
129 | bound : bool | |||
|
130 | whether to use the client's namespace | |||
|
131 | """ | |||
|
132 | for key in kwargs: | |||
|
133 | if key not in self._default_names: | |||
|
134 | raise KeyError("Invalid name: %r"%key) | |||
|
135 | for name in ('block', 'bound'): | |||
|
136 | if name in kwargs: | |||
|
137 | setattr(self, name, kwargs) | |||
|
138 | ||||
|
139 | #---------------------------------------------------------------- | |||
|
140 | # wrappers for client methods: | |||
|
141 | #---------------------------------------------------------------- | |||
115 | @sync_results |
|
142 | @sync_results | |
116 | def spin(self): |
|
143 | def spin(self): | |
117 | """spin the client, and sync""" |
|
144 | """spin the client, and sync""" | |
118 | self.client.spin() |
|
145 | self.client.spin() | |
119 |
|
146 | |||
120 | @property |
|
|||
121 | def _apply(self): |
|
|||
122 | return getattr(self.client, self._apply_name) |
|
|||
123 |
|
||||
124 | @sync_results |
|
147 | @sync_results | |
125 | @save_ids |
|
148 | @save_ids | |
126 | def apply(self, f, *args, **kwargs): |
|
149 | def apply(self, f, *args, **kwargs): | |
@@ -133,7 +156,7 b' class View(object):' | |||||
133 | else: |
|
156 | else: | |
134 | returns actual result of f(*args, **kwargs) |
|
157 | returns actual result of f(*args, **kwargs) | |
135 | """ |
|
158 | """ | |
136 |
return self. |
|
159 | return self.client.apply(f, args, kwargs, **self._defaults()) | |
137 |
|
160 | |||
138 | @save_ids |
|
161 | @save_ids | |
139 | def apply_async(self, f, *args, **kwargs): |
|
162 | def apply_async(self, f, *args, **kwargs): | |
@@ -144,7 +167,7 b' class View(object):' | |||||
144 | returns msg_id |
|
167 | returns msg_id | |
145 | """ |
|
168 | """ | |
146 | d = self._defaults('block', 'bound') |
|
169 | d = self._defaults('block', 'bound') | |
147 |
return self. |
|
170 | return self.client.apply(f,args,kwargs, block=False, bound=False, **d) | |
148 |
|
171 | |||
149 | @spin_after |
|
172 | @spin_after | |
150 | @save_ids |
|
173 | @save_ids | |
@@ -157,7 +180,7 b' class View(object):' | |||||
157 | returns: actual result of f(*args, **kwargs) |
|
180 | returns: actual result of f(*args, **kwargs) | |
158 | """ |
|
181 | """ | |
159 | d = self._defaults('block', 'bound') |
|
182 | d = self._defaults('block', 'bound') | |
160 |
return self. |
|
183 | return self.client.apply(f,args,kwargs, block=True, bound=False, **d) | |
161 |
|
184 | |||
162 | @sync_results |
|
185 | @sync_results | |
163 | @save_ids |
|
186 | @save_ids | |
@@ -173,7 +196,7 b' class View(object):' | |||||
173 |
|
196 | |||
174 | """ |
|
197 | """ | |
175 | d = self._defaults('bound') |
|
198 | d = self._defaults('bound') | |
176 |
return self. |
|
199 | return self.client.apply(f, args, kwargs, bound=True, **d) | |
177 |
|
200 | |||
178 | @sync_results |
|
201 | @sync_results | |
179 | @save_ids |
|
202 | @save_ids | |
@@ -187,7 +210,7 b' class View(object):' | |||||
187 |
|
210 | |||
188 | """ |
|
211 | """ | |
189 | d = self._defaults('block', 'bound') |
|
212 | d = self._defaults('block', 'bound') | |
190 |
return self. |
|
213 | return self.client.apply(f, args, kwargs, block=False, bound=True, **d) | |
191 |
|
214 | |||
192 | @spin_after |
|
215 | @spin_after | |
193 | @save_ids |
|
216 | @save_ids | |
@@ -200,23 +223,7 b' class View(object):' | |||||
200 |
|
223 | |||
201 | """ |
|
224 | """ | |
202 | d = self._defaults('block', 'bound') |
|
225 | d = self._defaults('block', 'bound') | |
203 |
return self. |
|
226 | return self.client.apply(f, args, kwargs, block=True, bound=True, **d) | |
204 |
|
||||
205 | @spin_after |
|
|||
206 | @save_ids |
|
|||
207 | def map(self, f, *sequences): |
|
|||
208 | """Parallel version of builtin `map`, using this view's engines.""" |
|
|||
209 | if isinstance(self.targets, int): |
|
|||
210 | targets = [self.targets] |
|
|||
211 | else: |
|
|||
212 | targets = self.targets |
|
|||
213 | pf = ParallelFunction(self.client, f, block=self.block, |
|
|||
214 | bound=True, targets=targets) |
|
|||
215 | return pf.map(*sequences) |
|
|||
216 |
|
||||
217 | def parallel(self, bound=True, block=True): |
|
|||
218 | """Decorator for making a ParallelFunction""" |
|
|||
219 | return parallel(self.client, bound=bound, targets=self.targets, block=block) |
|
|||
220 |
|
227 | |||
221 | def abort(self, msg_ids=None, block=None): |
|
228 | def abort(self, msg_ids=None, block=None): | |
222 | """Abort jobs on my engines. |
|
229 | """Abort jobs on my engines. | |
@@ -240,6 +247,17 b' class View(object):' | |||||
240 | if targets is None or targets == 'all': |
|
247 | if targets is None or targets == 'all': | |
241 | targets = self.targets |
|
248 | targets = self.targets | |
242 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) |
|
249 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) | |
|
250 | ||||
|
251 | #------------------------------------------------------------------- | |||
|
252 | # Decorators | |||
|
253 | #------------------------------------------------------------------- | |||
|
254 | def parallel(self, bound=True, block=True): | |||
|
255 | """Decorator for making a ParallelFunction""" | |||
|
256 | return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) | |||
|
257 | ||||
|
258 | def remote(self, bound=True, block=True): | |||
|
259 | """Decorator for making a RemoteFunction""" | |||
|
260 | return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) | |||
243 |
|
261 | |||
244 |
|
262 | |||
245 |
|
263 | |||
@@ -262,6 +280,62 b' class DirectView(View):' | |||||
262 |
|
280 | |||
263 | """ |
|
281 | """ | |
264 |
|
282 | |||
|
283 | def __init__(self, client=None, targets=None): | |||
|
284 | super(DirectView, self).__init__(client=client, targets=targets) | |||
|
285 | self._balanced = False | |||
|
286 | ||||
|
287 | @spin_after | |||
|
288 | @save_ids | |||
|
289 | def map(self, f, *sequences, **kwargs): | |||
|
290 | """Parallel version of builtin `map`, using this View's `targets`. | |||
|
291 | ||||
|
292 | There will be one task per target, so work will be chunked | |||
|
293 | if the sequences are longer than `targets`. | |||
|
294 | ||||
|
295 | Results can be iterated as they are ready, but will become available in chunks. | |||
|
296 | ||||
|
297 | Parameters | |||
|
298 | ---------- | |||
|
299 | ||||
|
300 | f : callable | |||
|
301 | function to be mapped | |||
|
302 | *sequences: one or more sequences of matching length | |||
|
303 | the sequences to be distributed and passed to `f` | |||
|
304 | block : bool | |||
|
305 | whether to wait for the result or not [default self.block] | |||
|
306 | bound : bool | |||
|
307 | whether to wait for the result or not [default self.bound] | |||
|
308 | ||||
|
309 | Returns | |||
|
310 | ------- | |||
|
311 | ||||
|
312 | if block=False: | |||
|
313 | AsyncMapResult | |||
|
314 | An object like AsyncResult, but which reassembles the sequence of results | |||
|
315 | into a single list. AsyncMapResults can be iterated through before all | |||
|
316 | results are complete. | |||
|
317 | else: | |||
|
318 | the result of map(f,*sequences) | |||
|
319 | """ | |||
|
320 | ||||
|
321 | block = kwargs.get('block', self.block) | |||
|
322 | bound = kwargs.get('bound', self.bound) | |||
|
323 | for k in kwargs.keys(): | |||
|
324 | if k not in ['block', 'bound']: | |||
|
325 | raise TypeError("invalid keyword arg, %r"%k) | |||
|
326 | ||||
|
327 | assert len(sequences) > 0, "must have some sequences to map onto!" | |||
|
328 | pf = ParallelFunction(self.client, f, block=block, | |||
|
329 | bound=bound, targets=self.targets, balanced=False) | |||
|
330 | return pf.map(*sequences) | |||
|
331 | ||||
|
332 | def map_async(self, f, *sequences, **kwargs): | |||
|
333 | """Parallel version of builtin `map`, using this view's engines.""" | |||
|
334 | if 'block' in kwargs: | |||
|
335 | raise TypeError("map_async doesn't take a `block` keyword argument.") | |||
|
336 | kwargs['block'] = True | |||
|
337 | return self.map(f,*sequences,**kwargs) | |||
|
338 | ||||
265 | @sync_results |
|
339 | @sync_results | |
266 | @save_ids |
|
340 | @save_ids | |
267 | def execute(self, code, block=True): |
|
341 | def execute(self, code, block=True): | |
@@ -358,14 +432,13 b' class DirectView(View):' | |||||
358 |
|
432 | |||
359 |
|
433 | |||
360 | class LoadBalancedView(View): |
|
434 | class LoadBalancedView(View): | |
361 |
"""An |
|
435 | """An load-balancing View that only executes via the Task scheduler. | |
362 |
|
436 | |||
363 | Typically created via: |
|
437 | Load-balanced views can be created with the client's `view` method: | |
364 |
|
438 | |||
365 |
>>> v = client |
|
439 | >>> v = client.view(balanced=True) | |
366 | <LoadBalancedView None> |
|
|||
367 |
|
440 | |||
368 | but can also be created with: |
|
441 | or targets can be specified, to restrict the potential destinations: | |
369 |
|
442 | |||
370 | >>> v = client.view([1,3],balanced=True) |
|
443 | >>> v = client.view([1,3],balanced=True) | |
371 |
|
444 | |||
@@ -374,10 +447,126 b' class LoadBalancedView(View):' | |||||
374 | """ |
|
447 | """ | |
375 |
|
448 | |||
376 | _apply_name = 'apply_balanced' |
|
449 | _apply_name = 'apply_balanced' | |
377 |
_default_names = [ |
|
450 | _default_names = ['block', 'bound', 'follow', 'after', 'timeout'] | |
378 |
|
451 | |||
379 | def __init__(self, client, targets=None): |
|
452 | def __init__(self, client=None, targets=None): | |
380 | super(LoadBalancedView, self).__init__(client, targets) |
|
453 | super(LoadBalancedView, self).__init__(client=client, targets=targets) | |
381 | self._ntargets = 1 |
|
454 | self._ntargets = 1 | |
382 | self._apply_name = 'apply_balanced' |
|
455 | ||
|
456 | def _validate_dependency(self, dep): | |||
|
457 | """validate a dependency. | |||
|
458 | ||||
|
459 | For use in `set_flags`. | |||
|
460 | """ | |||
|
461 | if dep is None or isinstance(dep, (str, AsyncResult, Dependency)): | |||
|
462 | return True | |||
|
463 | elif isinstance(dep, (list,set, tuple)): | |||
|
464 | for d in dep: | |||
|
465 | if not isinstance(d, str, AsyncResult): | |||
|
466 | return False | |||
|
467 | elif isinstance(dep, dict): | |||
|
468 | if set(dep.keys()) != set(Dependency().as_dict().keys()): | |||
|
469 | return False | |||
|
470 | if not isinstance(dep['msg_ids'], list): | |||
|
471 | return False | |||
|
472 | for d in dep['msg_ids']: | |||
|
473 | if not isinstance(d, str): | |||
|
474 | return False | |||
|
475 | else: | |||
|
476 | return False | |||
|
477 | ||||
|
478 | def set_flags(self, **kwargs): | |||
|
479 | """set my attribute flags by keyword. | |||
|
480 | ||||
|
481 | A View is a wrapper for the Client's apply method, but | |||
|
482 | with attributes that specify keyword arguments, those attributes | |||
|
483 | can be set by keyword argument with this method. | |||
|
484 | ||||
|
485 | Parameters | |||
|
486 | ---------- | |||
|
487 | ||||
|
488 | block : bool | |||
|
489 | whether to wait for results | |||
|
490 | bound : bool | |||
|
491 | whether to use the engine's namespace | |||
|
492 | follow : Dependency, list, msg_id, AsyncResult | |||
|
493 | the location dependencies of tasks | |||
|
494 | after : Dependency, list, msg_id, AsyncResult | |||
|
495 | the time dependencies of tasks | |||
|
496 | timeout : int,None | |||
|
497 | the timeout to be used for tasks | |||
|
498 | """ | |||
|
499 | ||||
|
500 | super(LoadBalancedView, self).set_flags(**kwargs) | |||
|
501 | for name in ('follow', 'after'): | |||
|
502 | if name in kwargs: | |||
|
503 | value = kwargs[name] | |||
|
504 | if self._validate_dependency(value): | |||
|
505 | setattr(self, name, value) | |||
|
506 | else: | |||
|
507 | raise ValueError("Invalid dependency: %r"%value) | |||
|
508 | if 'timeout' in kwargs: | |||
|
509 | t = kwargs['timeout'] | |||
|
510 | if not isinstance(t, (int, long, float, None)): | |||
|
511 | raise TypeError("Invalid type for timeout: %r"%type(t)) | |||
|
512 | if t is not None: | |||
|
513 | if t < 0: | |||
|
514 | raise ValueError("Invalid timeout: %s"%t) | |||
|
515 | self.timeout = t | |||
|
516 | ||||
|
517 | @spin_after | |||
|
518 | @save_ids | |||
|
519 | def map(self, f, *sequences, **kwargs): | |||
|
520 | """Parallel version of builtin `map`, load-balanced by this View. | |||
|
521 | ||||
|
522 | Each element will be a separate task, and will be load-balanced. This | |||
|
523 | lets individual elements be available for iteration as soon as they arrive. | |||
|
524 | ||||
|
525 | Parameters | |||
|
526 | ---------- | |||
|
527 | ||||
|
528 | f : callable | |||
|
529 | function to be mapped | |||
|
530 | *sequences: one or more sequences of matching length | |||
|
531 | the sequences to be distributed and passed to `f` | |||
|
532 | block : bool | |||
|
533 | whether to wait for the result or not [default self.block] | |||
|
534 | bound : bool | |||
|
535 | whether to use the engine's namespace | |||
|
536 | ||||
|
537 | Returns | |||
|
538 | ------- | |||
|
539 | ||||
|
540 | if block=False: | |||
|
541 | AsyncMapResult | |||
|
542 | An object like AsyncResult, but which reassembles the sequence of results | |||
|
543 | into a single list. AsyncMapResults can be iterated through before all | |||
|
544 | results are complete. | |||
|
545 | else: | |||
|
546 | the result of map(f,*sequences) | |||
|
547 | ||||
|
548 | """ | |||
|
549 | ||||
|
550 | block = kwargs.get('block', self.block) | |||
|
551 | bound = kwargs.get('bound', self.bound) | |||
|
552 | ||||
|
553 | assert len(sequences) > 0, "must have some sequences to map onto!" | |||
|
554 | ||||
|
555 | pf = ParallelFunction(self.client, f, block=block, bound=bound, | |||
|
556 | targets=self.targets, balanced=True) | |||
|
557 | return pf.map(*sequences) | |||
|
558 | ||||
|
559 | def map_async(self, f, *sequences, **kwargs): | |||
|
560 | """Parallel version of builtin `map`, using this view's engines. | |||
|
561 | ||||
|
562 | This is equivalent to map(...block=False) | |||
|
563 | ||||
|
564 | See `map` for details. | |||
|
565 | """ | |||
|
566 | ||||
|
567 | if 'block' in kwargs: | |||
|
568 | raise TypeError("map_async doesn't take a `block` keyword argument.") | |||
|
569 | kwargs['block'] = True | |||
|
570 | return self.map(f,*sequences,**kwargs) | |||
|
571 | ||||
383 |
|
572 |
@@ -37,7 +37,7 b' module and then create a :class:`.Client` instance:' | |||||
37 | In [2]: rc = client.Client() |
|
37 | In [2]: rc = client.Client() | |
38 |
|
38 | |||
39 | This form assumes that the default connection information (stored in |
|
39 | This form assumes that the default connection information (stored in | |
40 |
:file:`ipcontroller-client.json` found in |
|
40 | :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is | |
41 | accurate. If the controller was started on a remote machine, you must copy that connection |
|
41 | accurate. If the controller was started on a remote machine, you must copy that connection | |
42 | file to the client machine, or enter its contents as arguments to the Client constructor: |
|
42 | file to the client machine, or enter its contents as arguments to the Client constructor: | |
43 |
|
43 | |||
@@ -45,17 +45,17 b' file to the client machine, or enter its contents as arguments to the Client con' | |||||
45 |
|
45 | |||
46 | # If you have copied the json connector file from the controller: |
|
46 | # If you have copied the json connector file from the controller: | |
47 | In [2]: rc = client.Client('/path/to/ipcontroller-client.json') |
|
47 | In [2]: rc = client.Client('/path/to/ipcontroller-client.json') | |
48 | # for a remote controller at 10.0.1.5, visible from my.server.com: |
|
48 | # or for a remote controller at 10.0.1.5, visible from my.server.com: | |
49 | In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com') |
|
49 | In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com') | |
50 |
|
50 | |||
51 |
|
51 | |||
52 | To make sure there are engines connected to the controller, use can get a list |
|
52 | To make sure there are engines connected to the controller, users can get a list | |
53 | of engine ids: |
|
53 | of engine ids: | |
54 |
|
54 | |||
55 | .. sourcecode:: ipython |
|
55 | .. sourcecode:: ipython | |
56 |
|
56 | |||
57 | In [3]: rc.ids |
|
57 | In [3]: rc.ids | |
58 |
Out[3]: |
|
58 | Out[3]: [0, 1, 2, 3] | |
59 |
|
59 | |||
60 | Here we see that there are four engines ready to do work for us. |
|
60 | Here we see that there are four engines ready to do work for us. | |
61 |
|
61 | |||
@@ -73,24 +73,25 b' Parallel map' | |||||
73 | Python's builtin :func:`map` functions allows a function to be applied to a |
|
73 | Python's builtin :func:`map` functions allows a function to be applied to a | |
74 | sequence element-by-element. This type of code is typically trivial to |
|
74 | sequence element-by-element. This type of code is typically trivial to | |
75 | parallelize. In fact, since IPython's interface is all about functions anyway, |
|
75 | parallelize. In fact, since IPython's interface is all about functions anyway, | |
76 |
you can just use the builtin :func:`map` |
|
76 | you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a | |
|
77 | DirectView's :meth:`map` method: | |||
77 |
|
78 | |||
78 | .. sourcecode:: ipython |
|
79 | .. sourcecode:: ipython | |
79 |
|
80 | |||
80 | In [62]: serial_result = map(lambda x:x**10, range(32)) |
|
81 | In [62]: serial_result = map(lambda x:x**10, range(32)) | |
81 |
|
82 | |||
82 | In [66]: parallel_result = rc.map(lambda x: x**10, range(32)) |
|
83 | In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32)) | |
83 |
|
84 | |||
84 | In [67]: serial_result==parallel_result |
|
85 | In [67]: serial_result==parallel_result.get() | |
85 | Out[67]: True |
|
86 | Out[67]: True | |
86 |
|
87 | |||
87 |
|
88 | |||
88 | .. note:: |
|
89 | .. note:: | |
89 |
|
90 | |||
90 | The client's own version of :meth:`map` or that of :class:`.DirectView` do |
|
91 | The :class:`DirectView`'s version of :meth:`map` does | |
91 | not do any load balancing. For a load balanced version, use a |
|
92 | not do any load balancing. For a load balanced version, use a | |
92 | :class:`LoadBalancedView`, or a :class:`ParallelFunction` with |
|
93 | :class:`LoadBalancedView`, or a :class:`ParallelFunction` with | |
93 | `targets=None`. |
|
94 | `balanced=True`. | |
94 |
|
95 | |||
95 | .. seealso:: |
|
96 | .. seealso:: | |
96 |
|
97 | |||
@@ -105,16 +106,18 b' some decorators:' | |||||
105 |
|
106 | |||
106 | .. sourcecode:: ipython |
|
107 | .. sourcecode:: ipython | |
107 |
|
108 | |||
108 | In [10]: @rc.remote(block=True) |
|
109 | In [10]: @rc.remote(block=True, targets=0) | |
109 | ....: def f(x): |
|
110 | ....: def f(x): | |
110 | ....: return 10.0*x**4 |
|
111 | ....: return 10.0*x**4 | |
111 | ....: |
|
112 | ....: | |
112 |
|
113 | |||
113 |
In [11]: map(f, range(32)) # this is done |
|
114 | In [11]: map(f, range(32)) # this is done on engine 0 | |
114 | Out[11]: [0.0,10.0,160.0,...] |
|
115 | Out[11]: [0.0,10.0,160.0,...] | |
115 |
|
116 | |||
116 | See the docstring for the :func:`parallel` and :func:`remote` decorators for |
|
117 | .. seealso:: | |
117 | options. |
|
118 | ||
|
119 | See the docstring for the :func:`parallel` and :func:`remote` decorators for | |||
|
120 | options. | |||
118 |
|
121 | |||
119 | Calling Python functions |
|
122 | Calling Python functions | |
120 | ======================== |
|
123 | ======================== | |
@@ -141,7 +144,7 b' Instead, they provide the signature::' | |||||
141 | In order to provide the nicer interface, we have :class:`View` classes, which wrap |
|
144 | In order to provide the nicer interface, we have :class:`View` classes, which wrap | |
142 | :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine |
|
145 | :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine | |
143 | the extra arguments. For instance, performing index-access on a client creates a |
|
146 | the extra arguments. For instance, performing index-access on a client creates a | |
144 |
:class:`. |
|
147 | :class:`.DirectView`. | |
145 |
|
148 | |||
146 | .. sourcecode:: ipython |
|
149 | .. sourcecode:: ipython | |
147 |
|
150 | |||
@@ -218,7 +221,7 b' unbound, unless called by the :meth:`apply_bound` method:' | |||||
218 |
|
221 | |||
219 | .. sourcecode:: ipython |
|
222 | .. sourcecode:: ipython | |
220 |
|
223 | |||
221 |
In [9]: |
|
224 | In [9]: dview['b'] = 5 # assign b to 5 everywhere | |
222 |
|
225 | |||
223 | In [10]: v0 = rc[0] |
|
226 | In [10]: v0 = rc[0] | |
224 |
|
227 | |||
@@ -277,14 +280,14 b' local Python/IPython session:' | |||||
277 | ...: return time.time()-tic |
|
280 | ...: return time.time()-tic | |
278 |
|
281 | |||
279 | # In non-blocking mode |
|
282 | # In non-blocking mode | |
280 |
In [7]: pr = |
|
283 | In [7]: pr = dview.apply_async(wait, 2) | |
281 |
|
284 | |||
282 | # Now block for the result |
|
285 | # Now block for the result | |
283 | In [8]: pr.get() |
|
286 | In [8]: pr.get() | |
284 | Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154] |
|
287 | Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154] | |
285 |
|
288 | |||
286 | # Again in non-blocking mode |
|
289 | # Again in non-blocking mode | |
287 |
In [9]: pr = |
|
290 | In [9]: pr = dview.apply_async(wait, 10) | |
288 |
|
291 | |||
289 | # Poll to see if the result is ready |
|
292 | # Poll to see if the result is ready | |
290 | In [10]: pr.ready() |
|
293 | In [10]: pr.ready() | |
@@ -321,7 +324,7 b' associated results are ready:' | |||||
321 | In [72]: rc.block=False |
|
324 | In [72]: rc.block=False | |
322 |
|
325 | |||
323 | # A trivial list of AsyncResults objects |
|
326 | # A trivial list of AsyncResults objects | |
324 |
In [73]: pr_list = [ |
|
327 | In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)] | |
325 |
|
328 | |||
326 | # Wait until all of them are done |
|
329 | # Wait until all of them are done | |
327 | In [74]: rc.barrier(pr_list) |
|
330 | In [74]: rc.barrier(pr_list) | |
@@ -332,63 +335,38 b' associated results are ready:' | |||||
332 |
|
335 | |||
333 |
|
336 | |||
334 |
|
337 | |||
335 |
The ``block`` |
|
338 | The ``block`` keyword argument and attributes | |
336 |
--------------------------------------------- |
|
339 | --------------------------------------------- | |
337 |
|
||||
338 | .. warning:: |
|
|||
339 |
|
||||
340 | This is different now, I haven't updated this section. |
|
|||
341 | -MinRK |
|
|||
342 |
|
340 | |||
343 | Most methods(like :meth:`apply`) accept |
|
341 | Most methods(like :meth:`apply`) accept | |
344 |
``block`` a |
|
342 | ``block`` as a keyword argument. As we have seen above, these | |
345 |
keyword arguments control the blocking mode |
|
343 | keyword arguments control the blocking mode . The :class:`Client` class also has | |
346 | applied to. The :class:`Client` class also has :attr:`block` and |
|
344 | a :attr:`block` attribute that controls the default behavior when the keyword | |
347 | :attr:`targets` attributes that control the default behavior when the keyword |
|
345 | argument is not provided. Thus the following logic is used for :attr:`block`: | |
348 | arguments are not provided. Thus the following logic is used for :attr:`block` |
|
|||
349 | and :attr:`targets`: |
|
|||
350 |
|
346 | |||
351 | * If no keyword argument is provided, the instance attributes are used. |
|
347 | * If no keyword argument is provided, the instance attributes are used. | |
352 |
* Keyword argument, if provided override the instance attributes |
|
348 | * Keyword argument, if provided override the instance attributes for | |
|
349 | the duration of a single call. | |||
353 |
|
350 | |||
354 | The following examples demonstrate how to use the instance attributes: |
|
351 | The following examples demonstrate how to use the instance attributes: | |
355 |
|
352 | |||
356 | .. sourcecode:: ipython |
|
353 | .. sourcecode:: ipython | |
357 |
|
354 | |||
358 | In [16]: rc.targets = [0,2] |
|
|||
359 |
|
||||
360 | In [17]: rc.block = False |
|
355 | In [17]: rc.block = False | |
361 |
|
356 | |||
362 | In [18]: pr = rc.execute('a=5') |
|
357 | In [18]: ar = rc.apply(lambda : 10, targets=[0,2]) | |
363 |
|
||||
364 | In [19]: pr.r |
|
|||
365 | Out[19]: |
|
|||
366 | <Results List> |
|
|||
367 | [0] In [6]: a=5 |
|
|||
368 | [2] In [6]: a=5 |
|
|||
369 |
|
358 | |||
370 | # Note targets='all' means all engines |
|
359 | In [19]: ar.get() | |
371 | In [20]: rc.targets = 'all' |
|
360 | Out[19]: [10,10] | |
372 |
|
361 | |||
373 | In [21]: rc.block = True |
|
362 | In [21]: rc.block = True | |
374 |
|
363 | |||
375 | In [22]: rc.execute('b=10; print b') |
|
364 | # Note targets='all' means all engines | |
376 | Out[22]: |
|
365 | In [22]: rc.apply(lambda : 42, targets='all') | |
377 | <Results List> |
|
366 | Out[22]: [42, 42, 42, 42] | |
378 | [0] In [7]: b=10; print b |
|
|||
379 | [0] Out[7]: 10 |
|
|||
380 |
|
||||
381 | [1] In [6]: b=10; print b |
|
|||
382 | [1] Out[6]: 10 |
|
|||
383 |
|
||||
384 | [2] In [7]: b=10; print b |
|
|||
385 | [2] Out[7]: 10 |
|
|||
386 |
|
||||
387 | [3] In [6]: b=10; print b |
|
|||
388 | [3] Out[6]: 10 |
|
|||
389 |
|
367 | |||
390 |
The :attr:`block` and :attr:`targets` instance attributes |
|
368 | The :attr:`block` and :attr:`targets` instance attributes of the | |
391 | behavior of the parallel magic commands. |
|
369 | :class:`.DirectView` also determine the behavior of the parallel magic commands. | |
392 |
|
370 | |||
393 |
|
371 | |||
394 | Parallel magic commands |
|
372 | Parallel magic commands | |
@@ -567,9 +545,9 b' appear as a local dictionary. Underneath, this uses :meth:`push` and' | |||||
567 |
|
545 | |||
568 | In [50]: rc.block=True |
|
546 | In [50]: rc.block=True | |
569 |
|
547 | |||
570 |
In [51]: |
|
548 | In [51]: dview['a']=['foo','bar'] | |
571 |
|
549 | |||
572 |
In [52]: |
|
550 | In [52]: dview['a'] | |
573 | Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ] |
|
551 | Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ] | |
574 |
|
552 | |||
575 | Scatter and gather |
|
553 | Scatter and gather | |
@@ -585,13 +563,13 b' between engines, MPI should be used:' | |||||
585 |
|
563 | |||
586 | .. sourcecode:: ipython |
|
564 | .. sourcecode:: ipython | |
587 |
|
565 | |||
588 |
In [58]: |
|
566 | In [58]: dview.scatter('a',range(16)) | |
589 | Out[58]: [None,None,None,None] |
|
567 | Out[58]: [None,None,None,None] | |
590 |
|
568 | |||
591 |
In [59]: |
|
569 | In [59]: dview['a'] | |
592 | Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ] |
|
570 | Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ] | |
593 |
|
571 | |||
594 |
In [60]: |
|
572 | In [60]: dview.gather('a') | |
595 | Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] |
|
573 | Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] | |
596 |
|
574 | |||
597 | Other things to look at |
|
575 | Other things to look at | |
@@ -606,14 +584,14 b' basic effect using :meth:`scatter` and :meth:`gather`:' | |||||
606 |
|
584 | |||
607 | .. sourcecode:: ipython |
|
585 | .. sourcecode:: ipython | |
608 |
|
586 | |||
609 |
In [66]: |
|
587 | In [66]: dview.scatter('x',range(64)) | |
610 | Out[66]: [None,None,None,None] |
|
588 | Out[66]: [None,None,None,None] | |
611 |
|
589 | |||
612 | In [67]: px y = [i**10 for i in x] |
|
590 | In [67]: px y = [i**10 for i in x] | |
613 | Parallel execution on engines: [0, 1, 2, 3] |
|
591 | Parallel execution on engines: [0, 1, 2, 3] | |
614 | Out[67]: |
|
592 | Out[67]: | |
615 |
|
593 | |||
616 |
In [68]: y = |
|
594 | In [68]: y = dview.gather('y') | |
617 |
|
595 | |||
618 | In [69]: print y |
|
596 | In [69]: print y | |
619 | [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] |
|
597 | [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] |
@@ -41,8 +41,8 b' module and then create a :class:`.Client` instance:' | |||||
41 |
|
41 | |||
42 | In [2]: rc = client.Client() |
|
42 | In [2]: rc = client.Client() | |
43 |
|
43 | |||
44 |
In [3]: lview = rc |
|
44 | In [3]: lview = rc.view(balanced=True) | |
45 |
Out[3]: <LoadBalancedView |
|
45 | Out[3]: <LoadBalancedView None> | |
46 |
|
46 | |||
47 |
|
47 | |||
48 | This form assumes that the controller was started on localhost with default |
|
48 | This form assumes that the controller was started on localhost with default | |
@@ -66,7 +66,7 b' objects, but *in parallel*. Like the multiengine interface, these can be' | |||||
66 | implemented via the task interface. The exact same tools can perform these |
|
66 | implemented via the task interface. The exact same tools can perform these | |
67 | actions in load-balanced ways as well as multiplexed ways: a parallel version |
|
67 | actions in load-balanced ways as well as multiplexed ways: a parallel version | |
68 | of :func:`map` and :func:`@parallel` function decorator. If one specifies the |
|
68 | of :func:`map` and :func:`@parallel` function decorator. If one specifies the | |
69 |
argument ` |
|
69 | argument `balanced=True`, then they are dynamically load balanced. Thus, if the | |
70 | execution time per item varies significantly, you should use the versions in |
|
70 | execution time per item varies significantly, you should use the versions in | |
71 | the task interface. |
|
71 | the task interface. | |
72 |
|
72 | |||
@@ -80,7 +80,7 b' for the ``None`` element:' | |||||
80 |
|
80 | |||
81 | In [63]: serial_result = map(lambda x:x**10, range(32)) |
|
81 | In [63]: serial_result = map(lambda x:x**10, range(32)) | |
82 |
|
82 | |||
83 |
In [64]: parallel_result = |
|
83 | In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True) | |
84 |
|
84 | |||
85 | In [65]: serial_result==parallel_result |
|
85 | In [65]: serial_result==parallel_result | |
86 | Out[65]: True |
|
86 | Out[65]: True | |
@@ -258,13 +258,13 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje' | |||||
258 |
|
258 | |||
259 | In [14]: client.block=False |
|
259 | In [14]: client.block=False | |
260 |
|
260 | |||
261 |
In [15]: ar = client.apply(f, args, kwargs, |
|
261 | In [15]: ar = client.apply(f, args, kwargs, balanced=True) | |
262 |
|
262 | |||
263 |
In [16]: ar2 = client.apply(f2, |
|
263 | In [16]: ar2 = client.apply(f2, balanced=True) | |
264 |
|
264 | |||
265 | In [17]: ar3 = client.apply(f3, after=[ar,ar2]) |
|
265 | In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True) | |
266 |
|
266 | |||
267 | In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5) |
|
267 | In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True) | |
268 |
|
268 | |||
269 |
|
269 | |||
270 | .. seealso:: |
|
270 | .. seealso:: | |
@@ -383,7 +383,7 b' The following is an overview of how to use these classes together:' | |||||
383 | 1. Create a :class:`Client`. |
|
383 | 1. Create a :class:`Client`. | |
384 | 2. Define some functions to be run as tasks |
|
384 | 2. Define some functions to be run as tasks | |
385 | 3. Submit your tasks to using the :meth:`apply` method of your |
|
385 | 3. Submit your tasks to using the :meth:`apply` method of your | |
386 |
:class:`Client` instance, specifying ` |
|
386 | :class:`Client` instance, specifying `balanced=True`. This signals | |
387 | the :class:`Client` to entrust the Scheduler with assigning tasks to engines. |
|
387 | the :class:`Client` to entrust the Scheduler with assigning tasks to engines. | |
388 | 4. Use :meth:`Client.get_results` to get the results of the |
|
388 | 4. Use :meth:`Client.get_results` to get the results of the | |
389 | tasks, or use the :meth:`AsyncResult.get` method of the results to wait |
|
389 | tasks, or use the :meth:`AsyncResult.get` method of the results to wait |
General Comments 0
You need to be logged in to leave comments.
Login now