Show More
@@ -21,8 +21,8 b' from zmq.eventloop import ioloop, zmqstream' | |||||
21 | from IPython.external.decorator import decorator |
|
21 | from IPython.external.decorator import decorator | |
22 |
|
22 | |||
23 | import streamsession as ss |
|
23 | import streamsession as ss | |
24 | from remotenamespace import RemoteNamespace |
|
24 | # from remotenamespace import RemoteNamespace | |
25 | from view import DirectView |
|
25 | from view import DirectView, LoadBalancedView | |
26 | from dependency import Dependency, depend, require |
|
26 | from dependency import Dependency, depend, require | |
27 |
|
27 | |||
28 | def _push(ns): |
|
28 | def _push(ns): | |
@@ -31,8 +31,13 b' def _push(ns):' | |||||
31 | def _pull(keys): |
|
31 | def _pull(keys): | |
32 | g = globals() |
|
32 | g = globals() | |
33 | if isinstance(keys, (list,tuple, set)): |
|
33 | if isinstance(keys, (list,tuple, set)): | |
|
34 | for key in keys: | |||
|
35 | if not g.has_key(key): | |||
|
36 | raise NameError("name '%s' is not defined"%key) | |||
34 | return map(g.get, keys) |
|
37 | return map(g.get, keys) | |
35 | else: |
|
38 | else: | |
|
39 | if not g.has_key(keys): | |||
|
40 | raise NameError("name '%s' is not defined"%keys) | |||
36 | return g.get(keys) |
|
41 | return g.get(keys) | |
37 |
|
42 | |||
38 | def _clear(): |
|
43 | def _clear(): | |
@@ -62,10 +67,35 b' def defaultblock(f, self, *args, **kwargs):' | |||||
62 | self.block = saveblock |
|
67 | self.block = saveblock | |
63 | return ret |
|
68 | return ret | |
64 |
|
69 | |||
|
70 | def remote(client, block=None, targets=None): | |||
|
71 | """Turn a function into a remote function. | |||
|
72 | ||||
|
73 | This method can be used for map: | |||
|
74 | ||||
|
75 | >>> @remote(client,block=True) | |||
|
76 | def func(a) | |||
|
77 | """ | |||
|
78 | def remote_function(f): | |||
|
79 | return RemoteFunction(client, f, block, targets) | |||
|
80 | return remote_function | |||
|
81 | ||||
65 | #-------------------------------------------------------------------------- |
|
82 | #-------------------------------------------------------------------------- | |
66 | # Classes |
|
83 | # Classes | |
67 | #-------------------------------------------------------------------------- |
|
84 | #-------------------------------------------------------------------------- | |
68 |
|
85 | |||
|
86 | class RemoteFunction(object): | |||
|
87 | """Turn an existing function into a remote function""" | |||
|
88 | ||||
|
89 | def __init__(self, client, f, block=None, targets=None): | |||
|
90 | self.client = client | |||
|
91 | self.func = f | |||
|
92 | self.block=block | |||
|
93 | self.targets=targets | |||
|
94 | ||||
|
95 | def __call__(self, *args, **kwargs): | |||
|
96 | return self.client.apply(self.func, args=args, kwargs=kwargs, | |||
|
97 | block=self.block, targets=self.targets) | |||
|
98 | ||||
69 |
|
99 | |||
70 | class AbortedTask(object): |
|
100 | class AbortedTask(object): | |
71 | """A basic wrapper object describing an aborted task.""" |
|
101 | """A basic wrapper object describing an aborted task.""" | |
@@ -84,7 +114,7 b' class Client(object):' | |||||
84 | Parameters |
|
114 | Parameters | |
85 | ---------- |
|
115 | ---------- | |
86 |
|
116 | |||
87 | addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101 |
|
117 | addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101' | |
88 | The address of the controller's registration socket. |
|
118 | The address of the controller's registration socket. | |
89 |
|
119 | |||
90 |
|
120 | |||
@@ -281,7 +311,8 b' class Client(object):' | |||||
281 | elif content['status'] == 'aborted': |
|
311 | elif content['status'] == 'aborted': | |
282 | self.results[msg_id] = AbortedTask(msg_id) |
|
312 | self.results[msg_id] = AbortedTask(msg_id) | |
283 | elif content['status'] == 'resubmitted': |
|
313 | elif content['status'] == 'resubmitted': | |
284 |
|
|
314 | # TODO: handle resubmission | |
|
315 | pass | |||
285 | else: |
|
316 | else: | |
286 | self.results[msg_id] = ss.unwrap_exception(content) |
|
317 | self.results[msg_id] = ss.unwrap_exception(content) | |
287 |
|
318 | |||
@@ -318,7 +349,9 b' class Client(object):' | |||||
318 |
|
349 | |||
319 | def _flush_control(self, sock): |
|
350 | def _flush_control(self, sock): | |
320 | """Flush replies from the control channel waiting |
|
351 | """Flush replies from the control channel waiting | |
321 |
in the ZMQ queue. |
|
352 | in the ZMQ queue. | |
|
353 | ||||
|
354 | Currently: ignore them.""" | |||
322 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
355 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
323 | while msg is not None: |
|
356 | while msg is not None: | |
324 | if self.debug: |
|
357 | if self.debug: | |
@@ -330,7 +363,10 b' class Client(object):' | |||||
330 | #-------------------------------------------------------------------------- |
|
363 | #-------------------------------------------------------------------------- | |
331 |
|
364 | |||
332 | def __getitem__(self, key): |
|
365 | def __getitem__(self, key): | |
333 |
"""Dict access returns DirectView multiplexer objects |
|
366 | """Dict access returns DirectView multiplexer objects or, | |
|
367 | if key is None, a LoadBalancedView.""" | |||
|
368 | if key is None: | |||
|
369 | return LoadBalancedView(self) | |||
334 | if isinstance(key, int): |
|
370 | if isinstance(key, int): | |
335 | if key not in self.ids: |
|
371 | if key not in self.ids: | |
336 | raise IndexError("No such engine: %i"%key) |
|
372 | raise IndexError("No such engine: %i"%key) | |
@@ -412,7 +448,7 b' class Client(object):' | |||||
412 | """Clear the namespace in target(s).""" |
|
448 | """Clear the namespace in target(s).""" | |
413 | targets = self._build_targets(targets)[0] |
|
449 | targets = self._build_targets(targets)[0] | |
414 | for t in targets: |
|
450 | for t in targets: | |
415 | self.session.send(self._control_socket, 'clear_request', content={},ident=t) |
|
451 | self.session.send(self._control_socket, 'clear_request', content={}, ident=t) | |
416 | error = False |
|
452 | error = False | |
417 | if self.block: |
|
453 | if self.block: | |
418 | for i in range(len(targets)): |
|
454 | for i in range(len(targets)): | |
@@ -420,7 +456,7 b' class Client(object):' | |||||
420 | if self.debug: |
|
456 | if self.debug: | |
421 | pprint(msg) |
|
457 | pprint(msg) | |
422 | if msg['content']['status'] != 'ok': |
|
458 | if msg['content']['status'] != 'ok': | |
423 | error = msg['content'] |
|
459 | error = ss.unwrap_exception(msg['content']) | |
424 | if error: |
|
460 | if error: | |
425 | return error |
|
461 | return error | |
426 |
|
462 | |||
@@ -443,7 +479,7 b' class Client(object):' | |||||
443 | if self.debug: |
|
479 | if self.debug: | |
444 | pprint(msg) |
|
480 | pprint(msg) | |
445 | if msg['content']['status'] != 'ok': |
|
481 | if msg['content']['status'] != 'ok': | |
446 | error = msg['content'] |
|
482 | error = ss.unwrap_exception(msg['content']) | |
447 | if error: |
|
483 | if error: | |
448 | return error |
|
484 | return error | |
449 |
|
485 | |||
@@ -461,7 +497,7 b' class Client(object):' | |||||
461 | if self.debug: |
|
497 | if self.debug: | |
462 | pprint(msg) |
|
498 | pprint(msg) | |
463 | if msg['content']['status'] != 'ok': |
|
499 | if msg['content']['status'] != 'ok': | |
464 | error = msg['content'] |
|
500 | error = ss.unwrap_exception(msg['content']) | |
465 | if error: |
|
501 | if error: | |
466 | return error |
|
502 | return error | |
467 |
|
503 | |||
@@ -719,7 +755,7 b' class Client(object):' | |||||
719 | local_results[msg_id] = self.results[msg_id] |
|
755 | local_results[msg_id] = self.results[msg_id] | |
720 | theids.remove(msg_id) |
|
756 | theids.remove(msg_id) | |
721 |
|
757 | |||
722 |
if |
|
758 | if theids: # some not locally cached | |
723 | content = dict(msg_ids=theids, status_only=status_only) |
|
759 | content = dict(msg_ids=theids, status_only=status_only) | |
724 | msg = self.session.send(self._query_socket, "result_request", content=content) |
|
760 | msg = self.session.send(self._query_socket, "result_request", content=content) | |
725 | zmq.select([self._query_socket], [], []) |
|
761 | zmq.select([self._query_socket], [], []) |
@@ -42,13 +42,15 b' class View(object):' | |||||
42 | _targets = None |
|
42 | _targets = None | |
43 | _ntargets = None |
|
43 | _ntargets = None | |
44 | block=None |
|
44 | block=None | |
|
45 | bound=None | |||
45 | history=None |
|
46 | history=None | |
46 |
|
47 | |||
47 | def __init__(self, client, targets): |
|
48 | def __init__(self, client, targets=None): | |
48 | self.client = client |
|
49 | self.client = client | |
49 | self._targets = targets |
|
50 | self._targets = targets | |
50 | self._ntargets = 1 if isinstance(targets, int) else len(targets) |
|
51 | self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) | |
51 | self.block = client.block |
|
52 | self.block = client.block | |
|
53 | self.bound=True | |||
52 | self.history = [] |
|
54 | self.history = [] | |
53 | self.outstanding = set() |
|
55 | self.outstanding = set() | |
54 | self.results = {} |
|
56 | self.results = {} | |
@@ -84,7 +86,7 b' class View(object):' | |||||
84 | else: |
|
86 | else: | |
85 | returns actual result of f(*args, **kwargs) |
|
87 | returns actual result of f(*args, **kwargs) | |
86 | """ |
|
88 | """ | |
87 |
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound= |
|
89 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound) | |
88 |
|
90 | |||
89 | @save_ids |
|
91 | @save_ids | |
90 | def apply_async(self, f, *args, **kwargs): |
|
92 | def apply_async(self, f, *args, **kwargs): | |
@@ -147,6 +149,29 b' class View(object):' | |||||
147 |
|
149 | |||
148 | """ |
|
150 | """ | |
149 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) |
|
151 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) | |
|
152 | ||||
|
153 | def abort(self, msg_ids=None, block=None): | |||
|
154 | """Abort jobs on my engines. | |||
|
155 | ||||
|
156 | Parameters | |||
|
157 | ---------- | |||
|
158 | ||||
|
159 | msg_ids : None, str, list of strs, optional | |||
|
160 | if None: abort all jobs. | |||
|
161 | else: abort specific msg_id(s). | |||
|
162 | """ | |||
|
163 | block = block if block is not None else self.block | |||
|
164 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) | |||
|
165 | ||||
|
166 | def queue_status(self, verbose=False): | |||
|
167 | """Fetch the Queue status of my engines""" | |||
|
168 | return self.client.queue_status(targets=self.targets, verbose=verbose) | |||
|
169 | ||||
|
170 | def purge_results(self, msg_ids=[],targets=[]): | |||
|
171 | """Instruct the controller to forget specific results.""" | |||
|
172 | if targets is None or targets == 'all': | |||
|
173 | targets = self.targets | |||
|
174 | return self.client.purge_results(msg_ids=msg_ids, targets=targets) | |||
150 |
|
175 | |||
151 |
|
176 | |||
152 | class DirectView(View): |
|
177 | class DirectView(View): | |
@@ -156,45 +181,40 b' class DirectView(View):' | |||||
156 | """update remote namespace with dict `ns`""" |
|
181 | """update remote namespace with dict `ns`""" | |
157 | return self.client.push(ns, targets=self.targets, block=self.block) |
|
182 | return self.client.push(ns, targets=self.targets, block=self.block) | |
158 |
|
183 | |||
|
184 | push = update | |||
|
185 | ||||
159 | def get(self, key_s): |
|
186 | def get(self, key_s): | |
160 | """get object(s) by `key_s` from remote namespace |
|
187 | """get object(s) by `key_s` from remote namespace | |
161 | will return one object if it is a key. |
|
188 | will return one object if it is a key. | |
162 | It also takes a list of keys, and will return a list of objects.""" |
|
189 | It also takes a list of keys, and will return a list of objects.""" | |
163 | # block = block if block is not None else self.block |
|
190 | # block = block if block is not None else self.block | |
164 |
return self.client.pull(key_s, block= |
|
191 | return self.client.pull(key_s, block=True, targets=self.targets) | |
165 |
|
192 | |||
166 | push = update |
|
193 | def pull(self, key_s, block=True): | |
167 | pull = get |
|
194 | """get object(s) by `key_s` from remote namespace | |
|
195 | will return one object if it is a key. | |||
|
196 | It also takes a list of keys, and will return a list of objects.""" | |||
|
197 | block = block if block is not None else self.block | |||
|
198 | return self.client.pull(key_s, block=block, targets=self.targets) | |||
168 |
|
199 | |||
169 | def __getitem__(self, key): |
|
200 | def __getitem__(self, key): | |
170 | return self.get(key) |
|
201 | return self.get(key) | |
171 |
|
202 | |||
172 | def __setitem__(self,key,value): |
|
203 | def __setitem__(self,key, value): | |
173 | self.update({key:value}) |
|
204 | self.update({key:value}) | |
174 |
|
205 | |||
175 | def clear(self, block=False): |
|
206 | def clear(self, block=False): | |
176 | """Clear the remote namespaces on my engines.""" |
|
207 | """Clear the remote namespaces on my engines.""" | |
177 | block = block if block is not None else self.block |
|
208 | block = block if block is not None else self.block | |
178 | return self.client.clear(targets=self.targets,block=block) |
|
209 | return self.client.clear(targets=self.targets, block=block) | |
179 |
|
210 | |||
180 | def kill(self, block=True): |
|
211 | def kill(self, block=True): | |
181 | """Kill my engines.""" |
|
212 | """Kill my engines.""" | |
182 | block = block if block is not None else self.block |
|
213 | block = block if block is not None else self.block | |
183 | return self.client.kill(targets=self.targets,block=block) |
|
214 | return self.client.kill(targets=self.targets, block=block) | |
184 |
|
215 | |||
185 | def abort(self, msg_ids=None, block=None): |
|
|||
186 | """Abort jobs on my engines. |
|
|||
187 |
|
||||
188 | Parameters |
|
|||
189 | ---------- |
|
|||
190 |
|
||||
191 | msg_ids : None, str, list of strs, optional |
|
|||
192 | if None: abort all jobs. |
|
|||
193 | else: abort specific msg_id(s). |
|
|||
194 | """ |
|
|||
195 | block = block if block is not None else self.block |
|
|||
196 | return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) |
|
|||
197 |
|
||||
198 | class LoadBalancedView(View): |
|
216 | class LoadBalancedView(View): | |
199 | _targets=None |
|
217 | def __repr__(self): | |
|
218 | return "<%s %s>"%(self.__class__.__name__, self.client._addr) | |||
|
219 | ||||
200 | No newline at end of file |
|
220 |
General Comments 0
You need to be logged in to leave comments.
Login now