Show More
@@ -1032,8 +1032,14 b' class Client(HasTraits):' | |||||
1032 | def direct_view(self, targets='all'): |
|
1032 | def direct_view(self, targets='all'): | |
1033 | """construct a DirectView object. |
|
1033 | """construct a DirectView object. | |
1034 |
|
1034 | |||
1035 | If no targets are specified, create a DirectView |
|
1035 | If no targets are specified, create a DirectView using all engines. | |
1036 | using all engines. |
|
1036 | ||
|
1037 | rc.direct_view('all') is distinguished from rc[:] in that 'all' will | |||
|
1038 | evaluate the target engines at each execution, whereas rc[:] will connect to | |||
|
1039 | all *current* engines, and that list will not change. | |||
|
1040 | ||||
|
1041 | That is, 'all' will always use all engines, whereas rc[:] will not use | |||
|
1042 | engines added after the DirectView is constructed. | |||
1037 |
|
1043 | |||
1038 | Parameters |
|
1044 | Parameters | |
1039 | ---------- |
|
1045 | ---------- |
@@ -140,6 +140,8 b' class ParallelFunction(RemoteFunction):' | |||||
140 | self.mapObject = mapClass() |
|
140 | self.mapObject = mapClass() | |
141 |
|
141 | |||
142 | def __call__(self, *sequences): |
|
142 | def __call__(self, *sequences): | |
|
143 | client = self.view.client | |||
|
144 | ||||
143 | # check that the length of sequences match |
|
145 | # check that the length of sequences match | |
144 | len_0 = len(sequences[0]) |
|
146 | len_0 = len(sequences[0]) | |
145 | for s in sequences: |
|
147 | for s in sequences: | |
@@ -158,11 +160,12 b' class ParallelFunction(RemoteFunction):' | |||||
158 | warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) |
|
160 | warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) | |
159 | # multiplexed: |
|
161 | # multiplexed: | |
160 | targets = self.view.targets |
|
162 | targets = self.view.targets | |
|
163 | # 'all' is lazily evaluated at execution time, which is now: | |||
|
164 | if targets == 'all': | |||
|
165 | targets = client._build_targets(targets)[1] | |||
161 | nparts = len(targets) |
|
166 | nparts = len(targets) | |
162 |
|
167 | |||
163 | msg_ids = [] |
|
168 | msg_ids = [] | |
164 | # my_f = lambda *a: map(self.func, *a) |
|
|||
165 | client = self.view.client |
|
|||
166 | for index, t in enumerate(targets): |
|
169 | for index, t in enumerate(targets): | |
167 | args = [] |
|
170 | args = [] | |
168 | for seq in sequences: |
|
171 | for seq in sequences: |
@@ -79,7 +79,7 b' class TestClient(ClusterTestCase):' | |||||
79 | self.assertEquals(v.targets, None) |
|
79 | self.assertEquals(v.targets, None) | |
80 |
|
80 | |||
81 | def test_dview_targets(self): |
|
81 | def test_dview_targets(self): | |
82 |
"""test |
|
82 | """test direct_view targets""" | |
83 | v = self.client.direct_view() |
|
83 | v = self.client.direct_view() | |
84 | self.assertEquals(v.targets, 'all') |
|
84 | self.assertEquals(v.targets, 'all') | |
85 | v = self.client.direct_view('all') |
|
85 | v = self.client.direct_view('all') | |
@@ -87,6 +87,41 b' class TestClient(ClusterTestCase):' | |||||
87 | v = self.client.direct_view(-1) |
|
87 | v = self.client.direct_view(-1) | |
88 | self.assertEquals(v.targets, self.client.ids[-1]) |
|
88 | self.assertEquals(v.targets, self.client.ids[-1]) | |
89 |
|
89 | |||
|
90 | def test_lazy_all_targets(self): | |||
|
91 | """test lazy evaluation of rc.direct_view('all')""" | |||
|
92 | v = self.client.direct_view() | |||
|
93 | self.assertEquals(v.targets, 'all') | |||
|
94 | ||||
|
95 | def double(x): | |||
|
96 | return x*2 | |||
|
97 | seq = range(100) | |||
|
98 | ref = [ double(x) for x in seq ] | |||
|
99 | ||||
|
100 | # add some engines, which should be used | |||
|
101 | self.add_engines(2) | |||
|
102 | n1 = len(self.client.ids) | |||
|
103 | ||||
|
104 | # simple apply | |||
|
105 | r = v.apply_sync(lambda : 1) | |||
|
106 | self.assertEquals(r, [1] * n1) | |||
|
107 | ||||
|
108 | # map goes through remotefunction | |||
|
109 | r = v.map_sync(double, seq) | |||
|
110 | self.assertEquals(r, ref) | |||
|
111 | ||||
|
112 | # add a couple more engines, and try again | |||
|
113 | self.add_engines(2) | |||
|
114 | n2 = len(self.client.ids) | |||
|
115 | self.assertNotEquals(n2, n1) | |||
|
116 | ||||
|
117 | # apply | |||
|
118 | r = v.apply_sync(lambda : 1) | |||
|
119 | self.assertEquals(r, [1] * n2) | |||
|
120 | ||||
|
121 | # map | |||
|
122 | r = v.map_sync(double, seq) | |||
|
123 | self.assertEquals(r, ref) | |||
|
124 | ||||
90 | def test_targets(self): |
|
125 | def test_targets(self): | |
91 | """test various valid targets arguments""" |
|
126 | """test various valid targets arguments""" | |
92 | build = self.client._build_targets |
|
127 | build = self.client._build_targets |
General Comments 0
You need to be logged in to leave comments.
Login now