Show More
@@ -21,6 +21,7 b' from __future__ import division' | |||||
21 | import sys |
|
21 | import sys | |
22 | import warnings |
|
22 | import warnings | |
23 |
|
23 | |||
|
24 | from IPython.external.decorator import decorator | |||
24 | from IPython.testing.skipdoctest import skip_doctest |
|
25 | from IPython.testing.skipdoctest import skip_doctest | |
25 |
|
26 | |||
26 | from . import map as Map |
|
27 | from . import map as Map | |
@@ -79,6 +80,24 b' def getname(f):' | |||||
79 |
|
80 | |||
80 | return str(f) |
|
81 | return str(f) | |
81 |
|
82 | |||
|
83 | @decorator | |||
|
84 | def sync_view_results(f, self, *args, **kwargs): | |||
|
85 | """sync relevant results from self.client to our results attribute. | |||
|
86 | ||||
|
87 | This is a clone of view.sync_results, but for remote functions | |||
|
88 | """ | |||
|
89 | view = self.view | |||
|
90 | if view._in_sync_results: | |||
|
91 | return f(self, *args, **kwargs) | |||
|
92 | print 'in sync results', f | |||
|
93 | view._in_sync_results = True | |||
|
94 | try: | |||
|
95 | ret = f(self, *args, **kwargs) | |||
|
96 | finally: | |||
|
97 | view._in_sync_results = False | |||
|
98 | view._sync_results() | |||
|
99 | return ret | |||
|
100 | ||||
82 | #-------------------------------------------------------------------------- |
|
101 | #-------------------------------------------------------------------------- | |
83 | # Classes |
|
102 | # Classes | |
84 | #-------------------------------------------------------------------------- |
|
103 | #-------------------------------------------------------------------------- | |
@@ -158,6 +177,7 b' class ParallelFunction(RemoteFunction):' | |||||
158 | mapClass = Map.dists[dist] |
|
177 | mapClass = Map.dists[dist] | |
159 | self.mapObject = mapClass() |
|
178 | self.mapObject = mapClass() | |
160 |
|
179 | |||
|
180 | @sync_view_results | |||
161 | def __call__(self, *sequences): |
|
181 | def __call__(self, *sequences): | |
162 | client = self.view.client |
|
182 | client = self.view.client | |
163 |
|
183 |
@@ -56,10 +56,14 b' def save_ids(f, self, *args, **kwargs):' | |||||
56 | @decorator |
|
56 | @decorator | |
57 | def sync_results(f, self, *args, **kwargs): |
|
57 | def sync_results(f, self, *args, **kwargs): | |
58 | """sync relevant results from self.client to our results attribute.""" |
|
58 | """sync relevant results from self.client to our results attribute.""" | |
59 | ret = f(self, *args, **kwargs) |
|
59 | if self._in_sync_results: | |
60 | delta = self.outstanding.difference(self.client.outstanding) |
|
60 | return f(self, *args, **kwargs) | |
61 | completed = self.outstanding.intersection(delta) |
|
61 | self._in_sync_results = True | |
62 | self.outstanding = self.outstanding.difference(completed) |
|
62 | try: | |
|
63 | ret = f(self, *args, **kwargs) | |||
|
64 | finally: | |||
|
65 | self._in_sync_results = False | |||
|
66 | self._sync_results() | |||
63 | return ret |
|
67 | return ret | |
64 |
|
68 | |||
65 | @decorator |
|
69 | @decorator | |
@@ -115,6 +119,7 b' class View(HasTraits):' | |||||
115 |
|
119 | |||
116 | _socket = Instance('zmq.Socket') |
|
120 | _socket = Instance('zmq.Socket') | |
117 | _flag_names = List(['targets', 'block', 'track']) |
|
121 | _flag_names = List(['targets', 'block', 'track']) | |
|
122 | _in_sync_results = Bool(False) | |||
118 | _targets = Any() |
|
123 | _targets = Any() | |
119 | _idents = Any() |
|
124 | _idents = Any() | |
120 |
|
125 | |||
@@ -198,6 +203,15 b' class View(HasTraits):' | |||||
198 | # apply |
|
203 | # apply | |
199 | #---------------------------------------------------------------- |
|
204 | #---------------------------------------------------------------- | |
200 |
|
205 | |||
|
206 | def _sync_results(self): | |||
|
207 | """to be called by @sync_results decorator | |||
|
208 | ||||
|
209 | after submitting any tasks. | |||
|
210 | """ | |||
|
211 | delta = self.outstanding.difference(self.client.outstanding) | |||
|
212 | completed = self.outstanding.intersection(delta) | |||
|
213 | self.outstanding = self.outstanding.difference(completed) | |||
|
214 | ||||
201 | @sync_results |
|
215 | @sync_results | |
202 | @save_ids |
|
216 | @save_ids | |
203 | def _really_apply(self, f, args, kwargs, block=None, **options): |
|
217 | def _really_apply(self, f, args, kwargs, block=None, **options): | |
@@ -323,6 +337,7 b' class View(HasTraits):' | |||||
323 | # Map |
|
337 | # Map | |
324 | #------------------------------------------------------------------- |
|
338 | #------------------------------------------------------------------- | |
325 |
|
339 | |||
|
340 | @sync_results | |||
326 | def map(self, f, *sequences, **kwargs): |
|
341 | def map(self, f, *sequences, **kwargs): | |
327 | """override in subclasses""" |
|
342 | """override in subclasses""" | |
328 | raise NotImplementedError |
|
343 | raise NotImplementedError | |
@@ -554,7 +569,7 b' class DirectView(View):' | |||||
554 | return ar |
|
569 | return ar | |
555 |
|
570 | |||
556 |
|
571 | |||
557 | @spin_after |
|
572 | @sync_results | |
558 | def map(self, f, *sequences, **kwargs): |
|
573 | def map(self, f, *sequences, **kwargs): | |
559 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult |
|
574 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult | |
560 |
|
575 | |||
@@ -1031,7 +1046,7 b' class LoadBalancedView(View):' | |||||
1031 | pass |
|
1046 | pass | |
1032 | return ar |
|
1047 | return ar | |
1033 |
|
1048 | |||
1034 | @spin_after |
|
1049 | @sync_results | |
1035 | @save_ids |
|
1050 | @save_ids | |
1036 | def map(self, f, *sequences, **kwargs): |
|
1051 | def map(self, f, *sequences, **kwargs): | |
1037 | """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult |
|
1052 | """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult |
General Comments 0
You need to be logged in to leave comments.
Login now