##// END OF EJS Templates
view decorators for syncing history/results
MinRK -
Show More
@@ -12,34 +12,68 b' def myblock(f, self, *args, **kwargs):'
12 self.client.block = block
12 self.client.block = block
13 return ret
13 return ret
14
14
15 @decorator
16 def save_ids(f, self, *args, **kwargs):
17 ret = f(self, *args, **kwargs)
18 msg_ids = self.client.history[-self._ntargets:]
19 self.history.extend(msg_ids)
20 map(self.outstanding.add, msg_ids)
21 return ret
22
23 @decorator
24 def sync_results(f, self, *args, **kwargs):
25 ret = f(self, *args, **kwargs)
26 delta = self.outstanding.difference(self.client.outstanding)
27 completed = self.outstanding.intersection(delta)
28 self.outstanding = self.outstanding.difference(completed)
29 for msg_id in completed:
30 self.results[msg_id] = self.client.results[msg_id]
31 return ret
32
33 @decorator
34 def spin_after(f, self, *args, **kwargs):
35 ret = f(self, *args, **kwargs)
36 self.spin()
37 return ret
38
39
15 class View(object):
40 class View(object):
16 """Base View class"""
41 """Base View class"""
17 _targets = None
42 _targets = None
43 _ntargets = None
18 block=None
44 block=None
45 history=None
19
46
20 def __init__(self, client, targets):
47 def __init__(self, client, targets):
21 self.client = client
48 self.client = client
22 self._targets = targets
49 self._targets = targets
50 self._ntargets = 1 if isinstance(targets, int) else len(targets)
23 self.block = client.block
51 self.block = client.block
24
52 self.history = []
53 self.outstanding = set()
54 self.results = {}
55
25 def __repr__(self):
56 def __repr__(self):
26 strtargets = str(self._targets)
57 strtargets = str(self._targets)
27 if len(strtargets) > 16:
58 if len(strtargets) > 16:
28 strtargets = strtargets[:12]+'...]'
59 strtargets = strtargets[:12]+'...]'
29 return "<%s %s>"%(self.__class__.__name__, strtargets)
60 return "<%s %s>"%(self.__class__.__name__, strtargets)
30
61
31 @property
32 def results(self):
33 return self.client.results
34
35 @property
62 @property
36 def targets(self):
63 def targets(self):
37 return self._targets
64 return self._targets
38
65
39 @targets.setter
66 @targets.setter
40 def targets(self, value):
67 def targets(self, value):
41 raise TypeError("Cannot set my targets argument after construction!")
68 raise TypeError("Cannot set my targets argument after construction!")
42
69
70 @sync_results
71 def spin(self):
72 """spin the client, and sync"""
73 self.client.spin()
74
75 @sync_results
76 @save_ids
43 def apply(self, f, *args, **kwargs):
77 def apply(self, f, *args, **kwargs):
44 """calls f(*args, **kwargs) on remote engines, returning the result.
78 """calls f(*args, **kwargs) on remote engines, returning the result.
45
79
@@ -51,7 +85,8 b' class View(object):'
51 returns actual result of f(*args, **kwargs)
85 returns actual result of f(*args, **kwargs)
52 """
86 """
53 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
87 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
54
88
89 @save_ids
55 def apply_async(self, f, *args, **kwargs):
90 def apply_async(self, f, *args, **kwargs):
56 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
91 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
57
92
@@ -60,7 +95,9 b' class View(object):'
60 returns msg_id
95 returns msg_id
61 """
96 """
62 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
97 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
63
98
99 @spin_after
100 @save_ids
64 def apply_sync(self, f, *args, **kwargs):
101 def apply_sync(self, f, *args, **kwargs):
65 """calls f(*args, **kwargs) on remote engines in a blocking manner,
102 """calls f(*args, **kwargs) on remote engines in a blocking manner,
66 returning the result.
103 returning the result.
@@ -70,7 +107,9 b' class View(object):'
70 returns: actual result of f(*args, **kwargs)
107 returns: actual result of f(*args, **kwargs)
71 """
108 """
72 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
109 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
73
110
111 @sync_results
112 @save_ids
74 def apply_bound(self, f, *args, **kwargs):
113 def apply_bound(self, f, *args, **kwargs):
75 """calls f(*args, **kwargs) bound to engine namespace(s).
114 """calls f(*args, **kwargs) bound to engine namespace(s).
76
115
@@ -83,7 +122,9 b' class View(object):'
83
122
84 """
123 """
85 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
124 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
86
125
126 @sync_results
127 @save_ids
87 def apply_async_bound(self, f, *args, **kwargs):
128 def apply_async_bound(self, f, *args, **kwargs):
88 """calls f(*args, **kwargs) bound to engine namespace(s)
129 """calls f(*args, **kwargs) bound to engine namespace(s)
89 in a nonblocking manner.
130 in a nonblocking manner.
@@ -94,7 +135,9 b' class View(object):'
94
135
95 """
136 """
96 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
137 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
97
138
139 @spin_after
140 @save_ids
98 def apply_sync_bound(self, f, *args, **kwargs):
141 def apply_sync_bound(self, f, *args, **kwargs):
99 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
142 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
100
143
@@ -104,7 +147,7 b' class View(object):'
104
147
105 """
148 """
106 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
149 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
107
150
108
151
109 class DirectView(View):
152 class DirectView(View):
110 """Direct Multiplexer View"""
153 """Direct Multiplexer View"""
General Comments 0
You need to be logged in to leave comments. Login now