##// END OF EJS Templates
whitespace
MinRK -
Show More
@@ -1,200 +1,200 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Views"""
2 """Views"""
3
3
4 from IPython.external.decorator import decorator
4 from IPython.external.decorator import decorator
5
5
6
6
7 @decorator
7 @decorator
8 def myblock(f, self, *args, **kwargs):
8 def myblock(f, self, *args, **kwargs):
9 block = self.client.block
9 block = self.client.block
10 self.client.block = self.block
10 self.client.block = self.block
11 ret = f(self, *args, **kwargs)
11 ret = f(self, *args, **kwargs)
12 self.client.block = block
12 self.client.block = block
13 return ret
13 return ret
14
14
15 @decorator
15 @decorator
16 def save_ids(f, self, *args, **kwargs):
16 def save_ids(f, self, *args, **kwargs):
17 ret = f(self, *args, **kwargs)
17 ret = f(self, *args, **kwargs)
18 msg_ids = self.client.history[-self._ntargets:]
18 msg_ids = self.client.history[-self._ntargets:]
19 self.history.extend(msg_ids)
19 self.history.extend(msg_ids)
20 map(self.outstanding.add, msg_ids)
20 map(self.outstanding.add, msg_ids)
21 return ret
21 return ret
22
22
23 @decorator
23 @decorator
24 def sync_results(f, self, *args, **kwargs):
24 def sync_results(f, self, *args, **kwargs):
25 ret = f(self, *args, **kwargs)
25 ret = f(self, *args, **kwargs)
26 delta = self.outstanding.difference(self.client.outstanding)
26 delta = self.outstanding.difference(self.client.outstanding)
27 completed = self.outstanding.intersection(delta)
27 completed = self.outstanding.intersection(delta)
28 self.outstanding = self.outstanding.difference(completed)
28 self.outstanding = self.outstanding.difference(completed)
29 for msg_id in completed:
29 for msg_id in completed:
30 self.results[msg_id] = self.client.results[msg_id]
30 self.results[msg_id] = self.client.results[msg_id]
31 return ret
31 return ret
32
32
33 @decorator
33 @decorator
34 def spin_after(f, self, *args, **kwargs):
34 def spin_after(f, self, *args, **kwargs):
35 ret = f(self, *args, **kwargs)
35 ret = f(self, *args, **kwargs)
36 self.spin()
36 self.spin()
37 return ret
37 return ret
38
38
39
39
40 class View(object):
40 class View(object):
41 """Base View class"""
41 """Base View class"""
42 _targets = None
42 _targets = None
43 _ntargets = None
43 _ntargets = None
44 block=None
44 block=None
45 history=None
45 history=None
46
46
47 def __init__(self, client, targets):
47 def __init__(self, client, targets):
48 self.client = client
48 self.client = client
49 self._targets = targets
49 self._targets = targets
50 self._ntargets = 1 if isinstance(targets, int) else len(targets)
50 self._ntargets = 1 if isinstance(targets, int) else len(targets)
51 self.block = client.block
51 self.block = client.block
52 self.history = []
52 self.history = []
53 self.outstanding = set()
53 self.outstanding = set()
54 self.results = {}
54 self.results = {}
55
55
56 def __repr__(self):
56 def __repr__(self):
57 strtargets = str(self._targets)
57 strtargets = str(self._targets)
58 if len(strtargets) > 16:
58 if len(strtargets) > 16:
59 strtargets = strtargets[:12]+'...]'
59 strtargets = strtargets[:12]+'...]'
60 return "<%s %s>"%(self.__class__.__name__, strtargets)
60 return "<%s %s>"%(self.__class__.__name__, strtargets)
61
61
62 @property
62 @property
63 def targets(self):
63 def targets(self):
64 return self._targets
64 return self._targets
65
65
66 @targets.setter
66 @targets.setter
67 def targets(self, value):
67 def targets(self, value):
68 raise TypeError("Cannot set my targets argument after construction!")
68 raise TypeError("Cannot set my targets argument after construction!")
69
69
70 @sync_results
70 @sync_results
71 def spin(self):
71 def spin(self):
72 """spin the client, and sync"""
72 """spin the client, and sync"""
73 self.client.spin()
73 self.client.spin()
74
74
75 @sync_results
75 @sync_results
76 @save_ids
76 @save_ids
77 def apply(self, f, *args, **kwargs):
77 def apply(self, f, *args, **kwargs):
78 """calls f(*args, **kwargs) on remote engines, returning the result.
78 """calls f(*args, **kwargs) on remote engines, returning the result.
79
79
80 This method does not involve the engine's namespace.
80 This method does not involve the engine's namespace.
81
81
82 if self.block is False:
82 if self.block is False:
83 returns msg_id
83 returns msg_id
84 else:
84 else:
85 returns actual result of f(*args, **kwargs)
85 returns actual result of f(*args, **kwargs)
86 """
86 """
87 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
87 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
88
88
89 @save_ids
89 @save_ids
90 def apply_async(self, f, *args, **kwargs):
90 def apply_async(self, f, *args, **kwargs):
91 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
91 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
92
92
93 This method does not involve the engine's namespace.
93 This method does not involve the engine's namespace.
94
94
95 returns msg_id
95 returns msg_id
96 """
96 """
97 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
97 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
98
98
99 @spin_after
99 @spin_after
100 @save_ids
100 @save_ids
101 def apply_sync(self, f, *args, **kwargs):
101 def apply_sync(self, f, *args, **kwargs):
102 """calls f(*args, **kwargs) on remote engines in a blocking manner,
102 """calls f(*args, **kwargs) on remote engines in a blocking manner,
103 returning the result.
103 returning the result.
104
104
105 This method does not involve the engine's namespace.
105 This method does not involve the engine's namespace.
106
106
107 returns: actual result of f(*args, **kwargs)
107 returns: actual result of f(*args, **kwargs)
108 """
108 """
109 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
109 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
110
110
111 @sync_results
111 @sync_results
112 @save_ids
112 @save_ids
113 def apply_bound(self, f, *args, **kwargs):
113 def apply_bound(self, f, *args, **kwargs):
114 """calls f(*args, **kwargs) bound to engine namespace(s).
114 """calls f(*args, **kwargs) bound to engine namespace(s).
115
115
116 if self.block is False:
116 if self.block is False:
117 returns msg_id
117 returns msg_id
118 else:
118 else:
119 returns actual result of f(*args, **kwargs)
119 returns actual result of f(*args, **kwargs)
120
120
121 This method has access to the targets' globals
121 This method has access to the targets' globals
122
122
123 """
123 """
124 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
124 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
125
125
126 @sync_results
126 @sync_results
127 @save_ids
127 @save_ids
128 def apply_async_bound(self, f, *args, **kwargs):
128 def apply_async_bound(self, f, *args, **kwargs):
129 """calls f(*args, **kwargs) bound to engine namespace(s)
129 """calls f(*args, **kwargs) bound to engine namespace(s)
130 in a nonblocking manner.
130 in a nonblocking manner.
131
131
132 returns: msg_id
132 returns: msg_id
133
133
134 This method has access to the targets' globals
134 This method has access to the targets' globals
135
135
136 """
136 """
137 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
137 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
138
138
139 @spin_after
139 @spin_after
140 @save_ids
140 @save_ids
141 def apply_sync_bound(self, f, *args, **kwargs):
141 def apply_sync_bound(self, f, *args, **kwargs):
142 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
142 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
143
143
144 returns: actual result of f(*args, **kwargs)
144 returns: actual result of f(*args, **kwargs)
145
145
146 This method has access to the targets' globals
146 This method has access to the targets' globals
147
147
148 """
148 """
149 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
149 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
150
150
151
151
152 class DirectView(View):
152 class DirectView(View):
153 """Direct Multiplexer View"""
153 """Direct Multiplexer View"""
154
154
155 def update(self, ns):
155 def update(self, ns):
156 """update remote namespace with dict `ns`"""
156 """update remote namespace with dict `ns`"""
157 return self.client.push(ns, targets=self.targets, block=self.block)
157 return self.client.push(ns, targets=self.targets, block=self.block)
158
158
159 def get(self, key_s):
159 def get(self, key_s):
160 """get object(s) by `key_s` from remote namespace
160 """get object(s) by `key_s` from remote namespace
161 will return one object if it is a key.
161 will return one object if it is a key.
162 It also takes a list of keys, and will return a list of objects."""
162 It also takes a list of keys, and will return a list of objects."""
163 # block = block if block is not None else self.block
163 # block = block if block is not None else self.block
164 return self.client.pull(key_s, block=self.block, targets=self.targets)
164 return self.client.pull(key_s, block=self.block, targets=self.targets)
165
165
166 push = update
166 push = update
167 pull = get
167 pull = get
168
168
169 def __getitem__(self, key):
169 def __getitem__(self, key):
170 return self.get(key)
170 return self.get(key)
171
171
172 def __setitem__(self,key,value):
172 def __setitem__(self,key,value):
173 self.update({key:value})
173 self.update({key:value})
174
174
175 def clear(self, block=False):
175 def clear(self, block=False):
176 """Clear the remote namespaces on my engines."""
176 """Clear the remote namespaces on my engines."""
177 block = block if block is not None else self.block
177 block = block if block is not None else self.block
178 return self.client.clear(targets=self.targets,block=block)
178 return self.client.clear(targets=self.targets,block=block)
179
179
180 def kill(self, block=True):
180 def kill(self, block=True):
181 """Kill my engines."""
181 """Kill my engines."""
182 block = block if block is not None else self.block
182 block = block if block is not None else self.block
183 return self.client.kill(targets=self.targets,block=block)
183 return self.client.kill(targets=self.targets,block=block)
184
184
185 def abort(self, msg_ids=None, block=None):
185 def abort(self, msg_ids=None, block=None):
186 """Abort jobs on my engines.
186 """Abort jobs on my engines.
187
187
188 Parameters
188 Parameters
189 ----------
189 ----------
190
190
191 msg_ids : None, str, list of strs, optional
191 msg_ids : None, str, list of strs, optional
192 if None: abort all jobs.
192 if None: abort all jobs.
193 else: abort specific msg_id(s).
193 else: abort specific msg_id(s).
194 """
194 """
195 block = block if block is not None else self.block
195 block = block if block is not None else self.block
196 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
196 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
197
197
198 class LoadBalancedView(View):
198 class LoadBalancedView(View):
199 _targets=None
199 _targets=None
200 No newline at end of file
200
General Comments 0
You need to be logged in to leave comments. Login now