Show More
@@ -21,8 +21,8 b' from zmq.eventloop import ioloop, zmqstream' | |||
|
21 | 21 | from IPython.external.decorator import decorator |
|
22 | 22 | |
|
23 | 23 | import streamsession as ss |
|
24 | from remotenamespace import RemoteNamespace | |
|
25 | from view import DirectView | |
|
24 | # from remotenamespace import RemoteNamespace | |
|
25 | from view import DirectView, LoadBalancedView | |
|
26 | 26 | from dependency import Dependency, depend, require |
|
27 | 27 | |
|
28 | 28 | def _push(ns): |
@@ -31,8 +31,13 b' def _push(ns):' | |||
|
31 | 31 | def _pull(keys): |
|
32 | 32 | g = globals() |
|
33 | 33 | if isinstance(keys, (list,tuple, set)): |
|
34 | for key in keys: | |
|
35 | if not g.has_key(key): | |
|
36 | raise NameError("name '%s' is not defined"%key) | |
|
34 | 37 | return map(g.get, keys) |
|
35 | 38 | else: |
|
39 | if not g.has_key(keys): | |
|
40 | raise NameError("name '%s' is not defined"%keys) | |
|
36 | 41 | return g.get(keys) |
|
37 | 42 | |
|
38 | 43 | def _clear(): |
@@ -62,10 +67,35 b' def defaultblock(f, self, *args, **kwargs):' | |||
|
62 | 67 | self.block = saveblock |
|
63 | 68 | return ret |
|
64 | 69 | |
|
70 | def remote(client, block=None, targets=None): | |
|
71 | """Turn a function into a remote function. | |
|
72 | ||
|
73 | This method can be used for map: | |
|
74 | ||
|
75 | >>> @remote(client,block=True) | |
|
76 | def func(a) | |
|
77 | """ | |
|
78 | def remote_function(f): | |
|
79 | return RemoteFunction(client, f, block, targets) | |
|
80 | return remote_function | |
|
81 | ||
|
65 | 82 | #-------------------------------------------------------------------------- |
|
66 | 83 | # Classes |
|
67 | 84 | #-------------------------------------------------------------------------- |
|
68 | 85 | |
|
86 | class RemoteFunction(object): | |
|
87 | """Turn an existing function into a remote function""" | |
|
88 | ||
|
89 | def __init__(self, client, f, block=None, targets=None): | |
|
90 | self.client = client | |
|
91 | self.func = f | |
|
92 | self.block=block | |
|
93 | self.targets=targets | |
|
94 | ||
|
95 | def __call__(self, *args, **kwargs): | |
|
96 | return self.client.apply(self.func, args=args, kwargs=kwargs, | |
|
97 | block=self.block, targets=self.targets) | |
|
98 | ||
|
69 | 99 | |
|
70 | 100 | class AbortedTask(object): |
|
71 | 101 | """A basic wrapper object describing an aborted task.""" |
@@ -84,7 +114,7 b' class Client(object):' | |||
|
84 | 114 | Parameters |
|
85 | 115 | ---------- |
|
86 | 116 | |
|
87 | addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101 | |
|
117 | addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101' | |
|
88 | 118 | The address of the controller's registration socket. |
|
89 | 119 | |
|
90 | 120 | |
@@ -281,7 +311,8 b' class Client(object):' | |||
|
281 | 311 | elif content['status'] == 'aborted': |
|
282 | 312 | self.results[msg_id] = AbortedTask(msg_id) |
|
283 | 313 | elif content['status'] == 'resubmitted': |
|
284 |
|
|
|
314 | # TODO: handle resubmission | |
|
315 | pass | |
|
285 | 316 | else: |
|
286 | 317 | self.results[msg_id] = ss.unwrap_exception(content) |
|
287 | 318 | |
@@ -318,7 +349,9 b' class Client(object):' | |||
|
318 | 349 | |
|
319 | 350 | def _flush_control(self, sock): |
|
320 | 351 | """Flush replies from the control channel waiting |
|
321 |
in the ZMQ queue. |
|
|
352 | in the ZMQ queue. | |
|
353 | ||
|
354 | Currently: ignore them.""" | |
|
322 | 355 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
323 | 356 | while msg is not None: |
|
324 | 357 | if self.debug: |
@@ -330,7 +363,10 b' class Client(object):' | |||
|
330 | 363 | #-------------------------------------------------------------------------- |
|
331 | 364 | |
|
332 | 365 | def __getitem__(self, key): |
|
333 |
"""Dict access returns DirectView multiplexer objects |
|
|
366 | """Dict access returns DirectView multiplexer objects or, | |
|
367 | if key is None, a LoadBalancedView.""" | |
|
368 | if key is None: | |
|
369 | return LoadBalancedView(self) | |
|
334 | 370 | if isinstance(key, int): |
|
335 | 371 | if key not in self.ids: |
|
336 | 372 | raise IndexError("No such engine: %i"%key) |
@@ -412,7 +448,7 b' class Client(object):' | |||
|
412 | 448 | """Clear the namespace in target(s).""" |
|
413 | 449 | targets = self._build_targets(targets)[0] |
|
414 | 450 | for t in targets: |
|
415 | self.session.send(self._control_socket, 'clear_request', content={},ident=t) | |
|
451 | self.session.send(self._control_socket, 'clear_request', content={}, ident=t) | |
|
416 | 452 | error = False |
|
417 | 453 | if self.block: |
|
418 | 454 | for i in range(len(targets)): |
@@ -420,7 +456,7 b' class Client(object):' | |||
|
420 | 456 | if self.debug: |
|
421 | 457 | pprint(msg) |
|
422 | 458 | if msg['content']['status'] != 'ok': |
|
423 | error = msg['content'] | |
|
459 | error = ss.unwrap_exception(msg['content']) | |
|
424 | 460 | if error: |
|
425 | 461 | return error |
|
426 | 462 | |
@@ -443,7 +479,7 b' class Client(object):' | |||
|
443 | 479 | if self.debug: |
|
444 | 480 | pprint(msg) |
|
445 | 481 | if msg['content']['status'] != 'ok': |
|
446 | error = msg['content'] | |
|
482 | error = ss.unwrap_exception(msg['content']) | |
|
447 | 483 | if error: |
|
448 | 484 | return error |
|
449 | 485 | |
@@ -461,7 +497,7 b' class Client(object):' | |||
|
461 | 497 | if self.debug: |
|
462 | 498 | pprint(msg) |
|
463 | 499 | if msg['content']['status'] != 'ok': |
|
464 | error = msg['content'] | |
|
500 | error = ss.unwrap_exception(msg['content']) | |
|
465 | 501 | if error: |
|
466 | 502 | return error |
|
467 | 503 | |
@@ -719,7 +755,7 b' class Client(object):' | |||
|
719 | 755 | local_results[msg_id] = self.results[msg_id] |
|
720 | 756 | theids.remove(msg_id) |
|
721 | 757 | |
|
722 |
if |
|
|
758 | if theids: # some not locally cached | |
|
723 | 759 | content = dict(msg_ids=theids, status_only=status_only) |
|
724 | 760 | msg = self.session.send(self._query_socket, "result_request", content=content) |
|
725 | 761 | zmq.select([self._query_socket], [], []) |
@@ -42,13 +42,15 b' class View(object):' | |||
|
42 | 42 | _targets = None |
|
43 | 43 | _ntargets = None |
|
44 | 44 | block=None |
|
45 | bound=None | |
|
45 | 46 | history=None |
|
46 | 47 | |
|
47 | def __init__(self, client, targets): | |
|
48 | def __init__(self, client, targets=None): | |
|
48 | 49 | self.client = client |
|
49 | 50 | self._targets = targets |
|
50 | self._ntargets = 1 if isinstance(targets, int) else len(targets) | |
|
51 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) | |
|
51 | 52 | self.block = client.block |
|
53 | self.bound=True | |
|
52 | 54 | self.history = [] |
|
53 | 55 | self.outstanding = set() |
|
54 | 56 | self.results = {} |
@@ -84,7 +86,7 b' class View(object):' | |||
|
84 | 86 | else: |
|
85 | 87 | returns actual result of f(*args, **kwargs) |
|
86 | 88 | """ |
|
87 |
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound= |
|
|
89 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound) | |
|
88 | 90 | |
|
89 | 91 | @save_ids |
|
90 | 92 | def apply_async(self, f, *args, **kwargs): |
@@ -147,6 +149,29 b' class View(object):' | |||
|
147 | 149 | |
|
148 | 150 | """ |
|
149 | 151 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) |
|
152 | ||
|
153 | def abort(self, msg_ids=None, block=None): | |
|
154 | """Abort jobs on my engines. | |
|
155 | ||
|
156 | Parameters | |
|
157 | ---------- | |
|
158 | ||
|
159 | msg_ids : None, str, list of strs, optional | |
|
160 | if None: abort all jobs. | |
|
161 | else: abort specific msg_id(s). | |
|
162 | """ | |
|
163 | block = block if block is not None else self.block | |
|
164 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) | |
|
165 | ||
|
166 | def queue_status(self, verbose=False): | |
|
167 | """Fetch the Queue status of my engines""" | |
|
168 | return self.client.queue_status(targets=self.targets, verbose=verbose) | |
|
169 | ||
|
170 | def purge_results(self, msg_ids=[],targets=[]): | |
|
171 | """Instruct the controller to forget specific results.""" | |
|
172 | if targets is None or targets == 'all': | |
|
173 | targets = self.targets | |
|
174 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) | |
|
150 | 175 | |
|
151 | 176 | |
|
152 | 177 | class DirectView(View): |
@@ -156,45 +181,40 b' class DirectView(View):' | |||
|
156 | 181 | """update remote namespace with dict `ns`""" |
|
157 | 182 | return self.client.push(ns, targets=self.targets, block=self.block) |
|
158 | 183 | |
|
184 | push = update | |
|
185 | ||
|
159 | 186 | def get(self, key_s): |
|
160 | 187 | """get object(s) by `key_s` from remote namespace |
|
161 | 188 | will return one object if it is a key. |
|
162 | 189 | It also takes a list of keys, and will return a list of objects.""" |
|
163 | 190 | # block = block if block is not None else self.block |
|
164 |
return self.client.pull(key_s, block= |
|
|
191 | return self.client.pull(key_s, block=True, targets=self.targets) | |
|
165 | 192 | |
|
166 | push = update | |
|
167 | pull = get | |
|
193 | def pull(self, key_s, block=True): | |
|
194 | """get object(s) by `key_s` from remote namespace | |
|
195 | will return one object if it is a key. | |
|
196 | It also takes a list of keys, and will return a list of objects.""" | |
|
197 | block = block if block is not None else self.block | |
|
198 | return self.client.pull(key_s, block=block, targets=self.targets) | |
|
168 | 199 | |
|
169 | 200 | def __getitem__(self, key): |
|
170 | 201 | return self.get(key) |
|
171 | 202 | |
|
172 | def __setitem__(self,key,value): | |
|
203 | def __setitem__(self,key, value): | |
|
173 | 204 | self.update({key:value}) |
|
174 | 205 | |
|
175 | 206 | def clear(self, block=False): |
|
176 | 207 | """Clear the remote namespaces on my engines.""" |
|
177 | 208 | block = block if block is not None else self.block |
|
178 | return self.client.clear(targets=self.targets,block=block) | |
|
209 | return self.client.clear(targets=self.targets, block=block) | |
|
179 | 210 | |
|
180 | 211 | def kill(self, block=True): |
|
181 | 212 | """Kill my engines.""" |
|
182 | 213 | block = block if block is not None else self.block |
|
183 | return self.client.kill(targets=self.targets,block=block) | |
|
214 | return self.client.kill(targets=self.targets, block=block) | |
|
184 | 215 | |
|
185 | def abort(self, msg_ids=None, block=None): | |
|
186 | """Abort jobs on my engines. | |
|
187 | ||
|
188 | Parameters | |
|
189 | ---------- | |
|
190 | ||
|
191 | msg_ids : None, str, list of strs, optional | |
|
192 | if None: abort all jobs. | |
|
193 | else: abort specific msg_id(s). | |
|
194 | """ | |
|
195 | block = block if block is not None else self.block | |
|
196 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) | |
|
197 | ||
|
198 | 216 | class LoadBalancedView(View): |
|
199 | _targets=None | |
|
217 | def __repr__(self): | |
|
218 | return "<%s %s>"%(self.__class__.__name__, self.client._addr) | |
|
219 | ||
|
200 | 220 | No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now