##// END OF EJS Templates
notice nesting of `sync_results` decorator...
MinRK -
Show More
@@ -21,6 +21,7 b' from __future__ import division'
21 import sys
21 import sys
22 import warnings
22 import warnings
23
23
24 from IPython.external.decorator import decorator
24 from IPython.testing.skipdoctest import skip_doctest
25 from IPython.testing.skipdoctest import skip_doctest
25
26
26 from . import map as Map
27 from . import map as Map
@@ -79,6 +80,24 b' def getname(f):'
79
80
80 return str(f)
81 return str(f)
81
82
83 @decorator
84 def sync_view_results(f, self, *args, **kwargs):
85 """sync relevant results from self.client to our results attribute.
86
87 This is a clone of view.sync_results, but for remote functions
88 """
89 view = self.view
90 if view._in_sync_results:
91 return f(self, *args, **kwargs)
92 print 'in sync results', f
93 view._in_sync_results = True
94 try:
95 ret = f(self, *args, **kwargs)
96 finally:
97 view._in_sync_results = False
98 view._sync_results()
99 return ret
100
82 #--------------------------------------------------------------------------
101 #--------------------------------------------------------------------------
83 # Classes
102 # Classes
84 #--------------------------------------------------------------------------
103 #--------------------------------------------------------------------------
@@ -158,6 +177,7 b' class ParallelFunction(RemoteFunction):'
158 mapClass = Map.dists[dist]
177 mapClass = Map.dists[dist]
159 self.mapObject = mapClass()
178 self.mapObject = mapClass()
160
179
180 @sync_view_results
161 def __call__(self, *sequences):
181 def __call__(self, *sequences):
162 client = self.view.client
182 client = self.view.client
163
183
@@ -56,10 +56,14 b' def save_ids(f, self, *args, **kwargs):'
56 @decorator
56 @decorator
57 def sync_results(f, self, *args, **kwargs):
57 def sync_results(f, self, *args, **kwargs):
58 """sync relevant results from self.client to our results attribute."""
58 """sync relevant results from self.client to our results attribute."""
59 ret = f(self, *args, **kwargs)
59 if self._in_sync_results:
60 delta = self.outstanding.difference(self.client.outstanding)
60 return f(self, *args, **kwargs)
61 completed = self.outstanding.intersection(delta)
61 self._in_sync_results = True
62 self.outstanding = self.outstanding.difference(completed)
62 try:
63 ret = f(self, *args, **kwargs)
64 finally:
65 self._in_sync_results = False
66 self._sync_results()
63 return ret
67 return ret
64
68
65 @decorator
69 @decorator
@@ -115,6 +119,7 b' class View(HasTraits):'
115
119
116 _socket = Instance('zmq.Socket')
120 _socket = Instance('zmq.Socket')
117 _flag_names = List(['targets', 'block', 'track'])
121 _flag_names = List(['targets', 'block', 'track'])
122 _in_sync_results = Bool(False)
118 _targets = Any()
123 _targets = Any()
119 _idents = Any()
124 _idents = Any()
120
125
@@ -198,6 +203,15 b' class View(HasTraits):'
198 # apply
203 # apply
199 #----------------------------------------------------------------
204 #----------------------------------------------------------------
200
205
206 def _sync_results(self):
207 """to be called by @sync_results decorator
208
209 after submitting any tasks.
210 """
211 delta = self.outstanding.difference(self.client.outstanding)
212 completed = self.outstanding.intersection(delta)
213 self.outstanding = self.outstanding.difference(completed)
214
201 @sync_results
215 @sync_results
202 @save_ids
216 @save_ids
203 def _really_apply(self, f, args, kwargs, block=None, **options):
217 def _really_apply(self, f, args, kwargs, block=None, **options):
@@ -323,6 +337,7 b' class View(HasTraits):'
323 # Map
337 # Map
324 #-------------------------------------------------------------------
338 #-------------------------------------------------------------------
325
339
340 @sync_results
326 def map(self, f, *sequences, **kwargs):
341 def map(self, f, *sequences, **kwargs):
327 """override in subclasses"""
342 """override in subclasses"""
328 raise NotImplementedError
343 raise NotImplementedError
@@ -554,7 +569,7 b' class DirectView(View):'
554 return ar
569 return ar
555
570
556
571
557 @spin_after
572 @sync_results
558 def map(self, f, *sequences, **kwargs):
573 def map(self, f, *sequences, **kwargs):
559 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
574 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
560
575
@@ -1031,7 +1046,7 b' class LoadBalancedView(View):'
1031 pass
1046 pass
1032 return ar
1047 return ar
1033
1048
1034 @spin_after
1049 @sync_results
1035 @save_ids
1050 @save_ids
1036 def map(self, f, *sequences, **kwargs):
1051 def map(self, f, *sequences, **kwargs):
1037 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
1052 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
General Comments 0
You need to be logged in to leave comments. Login now