Show More
@@ -1,200 +1,200 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """Views""" |
|
2 | """Views""" | |
3 |
|
3 | |||
4 | from IPython.external.decorator import decorator |
|
4 | from IPython.external.decorator import decorator | |
5 |
|
5 | |||
6 |
|
6 | |||
7 | @decorator |
|
7 | @decorator | |
8 | def myblock(f, self, *args, **kwargs): |
|
8 | def myblock(f, self, *args, **kwargs): | |
9 | block = self.client.block |
|
9 | block = self.client.block | |
10 | self.client.block = self.block |
|
10 | self.client.block = self.block | |
11 | ret = f(self, *args, **kwargs) |
|
11 | ret = f(self, *args, **kwargs) | |
12 | self.client.block = block |
|
12 | self.client.block = block | |
13 | return ret |
|
13 | return ret | |
14 |
|
14 | |||
15 | @decorator |
|
15 | @decorator | |
16 | def save_ids(f, self, *args, **kwargs): |
|
16 | def save_ids(f, self, *args, **kwargs): | |
17 | ret = f(self, *args, **kwargs) |
|
17 | ret = f(self, *args, **kwargs) | |
18 | msg_ids = self.client.history[-self._ntargets:] |
|
18 | msg_ids = self.client.history[-self._ntargets:] | |
19 | self.history.extend(msg_ids) |
|
19 | self.history.extend(msg_ids) | |
20 | map(self.outstanding.add, msg_ids) |
|
20 | map(self.outstanding.add, msg_ids) | |
21 | return ret |
|
21 | return ret | |
22 |
|
22 | |||
23 | @decorator |
|
23 | @decorator | |
24 | def sync_results(f, self, *args, **kwargs): |
|
24 | def sync_results(f, self, *args, **kwargs): | |
25 | ret = f(self, *args, **kwargs) |
|
25 | ret = f(self, *args, **kwargs) | |
26 | delta = self.outstanding.difference(self.client.outstanding) |
|
26 | delta = self.outstanding.difference(self.client.outstanding) | |
27 | completed = self.outstanding.intersection(delta) |
|
27 | completed = self.outstanding.intersection(delta) | |
28 | self.outstanding = self.outstanding.difference(completed) |
|
28 | self.outstanding = self.outstanding.difference(completed) | |
29 | for msg_id in completed: |
|
29 | for msg_id in completed: | |
30 | self.results[msg_id] = self.client.results[msg_id] |
|
30 | self.results[msg_id] = self.client.results[msg_id] | |
31 | return ret |
|
31 | return ret | |
32 |
|
32 | |||
33 | @decorator |
|
33 | @decorator | |
34 | def spin_after(f, self, *args, **kwargs): |
|
34 | def spin_after(f, self, *args, **kwargs): | |
35 | ret = f(self, *args, **kwargs) |
|
35 | ret = f(self, *args, **kwargs) | |
36 | self.spin() |
|
36 | self.spin() | |
37 | return ret |
|
37 | return ret | |
38 |
|
38 | |||
39 |
|
39 | |||
40 | class View(object): |
|
40 | class View(object): | |
41 | """Base View class""" |
|
41 | """Base View class""" | |
42 | _targets = None |
|
42 | _targets = None | |
43 | _ntargets = None |
|
43 | _ntargets = None | |
44 | block=None |
|
44 | block=None | |
45 | history=None |
|
45 | history=None | |
46 |
|
46 | |||
47 | def __init__(self, client, targets): |
|
47 | def __init__(self, client, targets): | |
48 | self.client = client |
|
48 | self.client = client | |
49 | self._targets = targets |
|
49 | self._targets = targets | |
50 | self._ntargets = 1 if isinstance(targets, int) else len(targets) |
|
50 | self._ntargets = 1 if isinstance(targets, int) else len(targets) | |
51 | self.block = client.block |
|
51 | self.block = client.block | |
52 | self.history = [] |
|
52 | self.history = [] | |
53 | self.outstanding = set() |
|
53 | self.outstanding = set() | |
54 | self.results = {} |
|
54 | self.results = {} | |
55 |
|
55 | |||
56 | def __repr__(self): |
|
56 | def __repr__(self): | |
57 | strtargets = str(self._targets) |
|
57 | strtargets = str(self._targets) | |
58 | if len(strtargets) > 16: |
|
58 | if len(strtargets) > 16: | |
59 | strtargets = strtargets[:12]+'...]' |
|
59 | strtargets = strtargets[:12]+'...]' | |
60 | return "<%s %s>"%(self.__class__.__name__, strtargets) |
|
60 | return "<%s %s>"%(self.__class__.__name__, strtargets) | |
61 |
|
61 | |||
62 | @property |
|
62 | @property | |
63 | def targets(self): |
|
63 | def targets(self): | |
64 | return self._targets |
|
64 | return self._targets | |
65 |
|
65 | |||
66 | @targets.setter |
|
66 | @targets.setter | |
67 | def targets(self, value): |
|
67 | def targets(self, value): | |
68 | raise TypeError("Cannot set my targets argument after construction!") |
|
68 | raise TypeError("Cannot set my targets argument after construction!") | |
69 |
|
69 | |||
70 | @sync_results |
|
70 | @sync_results | |
71 | def spin(self): |
|
71 | def spin(self): | |
72 | """spin the client, and sync""" |
|
72 | """spin the client, and sync""" | |
73 | self.client.spin() |
|
73 | self.client.spin() | |
74 |
|
74 | |||
75 | @sync_results |
|
75 | @sync_results | |
76 | @save_ids |
|
76 | @save_ids | |
77 | def apply(self, f, *args, **kwargs): |
|
77 | def apply(self, f, *args, **kwargs): | |
78 | """calls f(*args, **kwargs) on remote engines, returning the result. |
|
78 | """calls f(*args, **kwargs) on remote engines, returning the result. | |
79 |
|
79 | |||
80 | This method does not involve the engine's namespace. |
|
80 | This method does not involve the engine's namespace. | |
81 |
|
81 | |||
82 | if self.block is False: |
|
82 | if self.block is False: | |
83 | returns msg_id |
|
83 | returns msg_id | |
84 | else: |
|
84 | else: | |
85 | returns actual result of f(*args, **kwargs) |
|
85 | returns actual result of f(*args, **kwargs) | |
86 | """ |
|
86 | """ | |
87 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False) |
|
87 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False) | |
88 |
|
88 | |||
89 | @save_ids |
|
89 | @save_ids | |
90 | def apply_async(self, f, *args, **kwargs): |
|
90 | def apply_async(self, f, *args, **kwargs): | |
91 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. |
|
91 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. | |
92 |
|
92 | |||
93 | This method does not involve the engine's namespace. |
|
93 | This method does not involve the engine's namespace. | |
94 |
|
94 | |||
95 | returns msg_id |
|
95 | returns msg_id | |
96 | """ |
|
96 | """ | |
97 | return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False) |
|
97 | return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False) | |
98 |
|
98 | |||
99 | @spin_after |
|
99 | @spin_after | |
100 | @save_ids |
|
100 | @save_ids | |
101 | def apply_sync(self, f, *args, **kwargs): |
|
101 | def apply_sync(self, f, *args, **kwargs): | |
102 | """calls f(*args, **kwargs) on remote engines in a blocking manner, |
|
102 | """calls f(*args, **kwargs) on remote engines in a blocking manner, | |
103 | returning the result. |
|
103 | returning the result. | |
104 |
|
104 | |||
105 | This method does not involve the engine's namespace. |
|
105 | This method does not involve the engine's namespace. | |
106 |
|
106 | |||
107 | returns: actual result of f(*args, **kwargs) |
|
107 | returns: actual result of f(*args, **kwargs) | |
108 | """ |
|
108 | """ | |
109 | return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False) |
|
109 | return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False) | |
110 |
|
110 | |||
111 | @sync_results |
|
111 | @sync_results | |
112 | @save_ids |
|
112 | @save_ids | |
113 | def apply_bound(self, f, *args, **kwargs): |
|
113 | def apply_bound(self, f, *args, **kwargs): | |
114 | """calls f(*args, **kwargs) bound to engine namespace(s). |
|
114 | """calls f(*args, **kwargs) bound to engine namespace(s). | |
115 |
|
115 | |||
116 | if self.block is False: |
|
116 | if self.block is False: | |
117 | returns msg_id |
|
117 | returns msg_id | |
118 | else: |
|
118 | else: | |
119 | returns actual result of f(*args, **kwargs) |
|
119 | returns actual result of f(*args, **kwargs) | |
120 |
|
120 | |||
121 | This method has access to the targets' globals |
|
121 | This method has access to the targets' globals | |
122 |
|
122 | |||
123 | """ |
|
123 | """ | |
124 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True) |
|
124 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True) | |
125 |
|
125 | |||
126 | @sync_results |
|
126 | @sync_results | |
127 | @save_ids |
|
127 | @save_ids | |
128 | def apply_async_bound(self, f, *args, **kwargs): |
|
128 | def apply_async_bound(self, f, *args, **kwargs): | |
129 | """calls f(*args, **kwargs) bound to engine namespace(s) |
|
129 | """calls f(*args, **kwargs) bound to engine namespace(s) | |
130 | in a nonblocking manner. |
|
130 | in a nonblocking manner. | |
131 |
|
131 | |||
132 | returns: msg_id |
|
132 | returns: msg_id | |
133 |
|
133 | |||
134 | This method has access to the targets' globals |
|
134 | This method has access to the targets' globals | |
135 |
|
135 | |||
136 | """ |
|
136 | """ | |
137 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) |
|
137 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) | |
138 |
|
138 | |||
139 |
|
|
139 | @spin_after | |
140 | @save_ids |
|
140 | @save_ids | |
141 | def apply_sync_bound(self, f, *args, **kwargs): |
|
141 | def apply_sync_bound(self, f, *args, **kwargs): | |
142 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. |
|
142 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. | |
143 |
|
143 | |||
144 | returns: actual result of f(*args, **kwargs) |
|
144 | returns: actual result of f(*args, **kwargs) | |
145 |
|
145 | |||
146 | This method has access to the targets' globals |
|
146 | This method has access to the targets' globals | |
147 |
|
147 | |||
148 | """ |
|
148 | """ | |
149 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) |
|
149 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) | |
150 |
|
150 | |||
151 |
|
151 | |||
152 | class DirectView(View): |
|
152 | class DirectView(View): | |
153 | """Direct Multiplexer View""" |
|
153 | """Direct Multiplexer View""" | |
154 |
|
154 | |||
155 | def update(self, ns): |
|
155 | def update(self, ns): | |
156 | """update remote namespace with dict `ns`""" |
|
156 | """update remote namespace with dict `ns`""" | |
157 | return self.client.push(ns, targets=self.targets, block=self.block) |
|
157 | return self.client.push(ns, targets=self.targets, block=self.block) | |
158 |
|
158 | |||
159 | def get(self, key_s): |
|
159 | def get(self, key_s): | |
160 | """get object(s) by `key_s` from remote namespace |
|
160 | """get object(s) by `key_s` from remote namespace | |
161 | will return one object if it is a key. |
|
161 | will return one object if it is a key. | |
162 | It also takes a list of keys, and will return a list of objects.""" |
|
162 | It also takes a list of keys, and will return a list of objects.""" | |
163 | # block = block if block is not None else self.block |
|
163 | # block = block if block is not None else self.block | |
164 | return self.client.pull(key_s, block=self.block, targets=self.targets) |
|
164 | return self.client.pull(key_s, block=self.block, targets=self.targets) | |
165 |
|
165 | |||
166 | push = update |
|
166 | push = update | |
167 | pull = get |
|
167 | pull = get | |
168 |
|
168 | |||
169 | def __getitem__(self, key): |
|
169 | def __getitem__(self, key): | |
170 | return self.get(key) |
|
170 | return self.get(key) | |
171 |
|
171 | |||
172 | def __setitem__(self,key,value): |
|
172 | def __setitem__(self,key,value): | |
173 | self.update({key:value}) |
|
173 | self.update({key:value}) | |
174 |
|
174 | |||
175 | def clear(self, block=False): |
|
175 | def clear(self, block=False): | |
176 | """Clear the remote namespaces on my engines.""" |
|
176 | """Clear the remote namespaces on my engines.""" | |
177 | block = block if block is not None else self.block |
|
177 | block = block if block is not None else self.block | |
178 | return self.client.clear(targets=self.targets,block=block) |
|
178 | return self.client.clear(targets=self.targets,block=block) | |
179 |
|
179 | |||
180 | def kill(self, block=True): |
|
180 | def kill(self, block=True): | |
181 | """Kill my engines.""" |
|
181 | """Kill my engines.""" | |
182 | block = block if block is not None else self.block |
|
182 | block = block if block is not None else self.block | |
183 | return self.client.kill(targets=self.targets,block=block) |
|
183 | return self.client.kill(targets=self.targets,block=block) | |
184 |
|
184 | |||
185 | def abort(self, msg_ids=None, block=None): |
|
185 | def abort(self, msg_ids=None, block=None): | |
186 | """Abort jobs on my engines. |
|
186 | """Abort jobs on my engines. | |
187 |
|
187 | |||
188 | Parameters |
|
188 | Parameters | |
189 | ---------- |
|
189 | ---------- | |
190 |
|
190 | |||
191 | msg_ids : None, str, list of strs, optional |
|
191 | msg_ids : None, str, list of strs, optional | |
192 | if None: abort all jobs. |
|
192 | if None: abort all jobs. | |
193 | else: abort specific msg_id(s). |
|
193 | else: abort specific msg_id(s). | |
194 | """ |
|
194 | """ | |
195 | block = block if block is not None else self.block |
|
195 | block = block if block is not None else self.block | |
196 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) |
|
196 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) | |
197 |
|
197 | |||
198 | class LoadBalancedView(View): |
|
198 | class LoadBalancedView(View): | |
199 | _targets=None |
|
199 | _targets=None | |
200 | No newline at end of file |
|
200 |
General Comments 0
You need to be logged in to leave comments.
Login now