Show More
@@ -1032,8 +1032,14 b' class Client(HasTraits):' | |||
|
1032 | 1032 | def direct_view(self, targets='all'): |
|
1033 | 1033 | """construct a DirectView object. |
|
1034 | 1034 | |
|
1035 | If no targets are specified, create a DirectView | |
|
1036 | using all engines. | |
|
1035 | If no targets are specified, create a DirectView using all engines. | |
|
1036 | ||
|
1037 | rc.direct_view('all') is distinguished from rc[:] in that 'all' will | |
|
1038 | evaluate the target engines at each execution, whereas rc[:] will connect to | |
|
1039 | all *current* engines, and that list will not change. | |
|
1040 | ||
|
1041 | That is, 'all' will always use all engines, whereas rc[:] will not use | |
|
1042 | engines added after the DirectView is constructed. | |
|
1037 | 1043 | |
|
1038 | 1044 | Parameters |
|
1039 | 1045 | ---------- |
@@ -140,6 +140,8 b' class ParallelFunction(RemoteFunction):' | |||
|
140 | 140 | self.mapObject = mapClass() |
|
141 | 141 | |
|
142 | 142 | def __call__(self, *sequences): |
|
143 | client = self.view.client | |
|
144 | ||
|
143 | 145 | # check that the length of sequences match |
|
144 | 146 | len_0 = len(sequences[0]) |
|
145 | 147 | for s in sequences: |
@@ -158,11 +160,12 b' class ParallelFunction(RemoteFunction):' | |||
|
158 | 160 | warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) |
|
159 | 161 | # multiplexed: |
|
160 | 162 | targets = self.view.targets |
|
163 | # 'all' is lazily evaluated at execution time, which is now: | |
|
164 | if targets == 'all': | |
|
165 | targets = client._build_targets(targets)[1] | |
|
161 | 166 | nparts = len(targets) |
|
162 | 167 | |
|
163 | 168 | msg_ids = [] |
|
164 | # my_f = lambda *a: map(self.func, *a) | |
|
165 | client = self.view.client | |
|
166 | 169 | for index, t in enumerate(targets): |
|
167 | 170 | args = [] |
|
168 | 171 | for seq in sequences: |
@@ -79,7 +79,7 b' class TestClient(ClusterTestCase):' | |||
|
79 | 79 | self.assertEquals(v.targets, None) |
|
80 | 80 | |
|
81 | 81 | def test_dview_targets(self): |
|
82 |
"""test |
|
|
82 | """test direct_view targets""" | |
|
83 | 83 | v = self.client.direct_view() |
|
84 | 84 | self.assertEquals(v.targets, 'all') |
|
85 | 85 | v = self.client.direct_view('all') |
@@ -87,6 +87,41 b' class TestClient(ClusterTestCase):' | |||
|
87 | 87 | v = self.client.direct_view(-1) |
|
88 | 88 | self.assertEquals(v.targets, self.client.ids[-1]) |
|
89 | 89 | |
|
90 | def test_lazy_all_targets(self): | |
|
91 | """test lazy evaluation of rc.direct_view('all')""" | |
|
92 | v = self.client.direct_view() | |
|
93 | self.assertEquals(v.targets, 'all') | |
|
94 | ||
|
95 | def double(x): | |
|
96 | return x*2 | |
|
97 | seq = range(100) | |
|
98 | ref = [ double(x) for x in seq ] | |
|
99 | ||
|
100 | # add some engines, which should be used | |
|
101 | self.add_engines(2) | |
|
102 | n1 = len(self.client.ids) | |
|
103 | ||
|
104 | # simple apply | |
|
105 | r = v.apply_sync(lambda : 1) | |
|
106 | self.assertEquals(r, [1] * n1) | |
|
107 | ||
|
108 | # map goes through remotefunction | |
|
109 | r = v.map_sync(double, seq) | |
|
110 | self.assertEquals(r, ref) | |
|
111 | ||
|
112 | # add a couple more engines, and try again | |
|
113 | self.add_engines(2) | |
|
114 | n2 = len(self.client.ids) | |
|
115 | self.assertNotEquals(n2, n1) | |
|
116 | ||
|
117 | # apply | |
|
118 | r = v.apply_sync(lambda : 1) | |
|
119 | self.assertEquals(r, [1] * n2) | |
|
120 | ||
|
121 | # map | |
|
122 | r = v.map_sync(double, seq) | |
|
123 | self.assertEquals(r, ref) | |
|
124 | ||
|
90 | 125 | def test_targets(self): |
|
91 | 126 | """test various valid targets arguments""" |
|
92 | 127 | build = self.client._build_targets |
General Comments 0
You need to be logged in to leave comments.
Login now