Show More
@@ -29,10 +29,16 b' from view import DirectView, LoadBalancedView' | |||||
29 | from dependency import Dependency, depend, require |
|
29 | from dependency import Dependency, depend, require | |
30 | import error |
|
30 | import error | |
31 |
|
31 | |||
|
32 | #-------------------------------------------------------------------------- | |||
|
33 | # helpers for implementing old MEC API via client.apply | |||
|
34 | #-------------------------------------------------------------------------- | |||
|
35 | ||||
32 | def _push(ns): |
|
36 | def _push(ns): | |
|
37 | """helper method for implementing `client.push` via `client.apply`""" | |||
33 | globals().update(ns) |
|
38 | globals().update(ns) | |
34 |
|
39 | |||
35 | def _pull(keys): |
|
40 | def _pull(keys): | |
|
41 | """helper method for implementing `client.pull` via `client.apply`""" | |||
36 | g = globals() |
|
42 | g = globals() | |
37 | if isinstance(keys, (list,tuple, set)): |
|
43 | if isinstance(keys, (list,tuple, set)): | |
38 | for key in keys: |
|
44 | for key in keys: | |
@@ -45,11 +51,14 b' def _pull(keys):' | |||||
45 | return g.get(keys) |
|
51 | return g.get(keys) | |
46 |
|
52 | |||
47 | def _clear(): |
|
53 | def _clear(): | |
|
54 | """helper method for implementing `client.clear` via `client.apply`""" | |||
48 | globals().clear() |
|
55 | globals().clear() | |
49 |
|
56 | |||
50 | def execute(code): |
|
57 | def execute(code): | |
|
58 | """helper method for implementing `client.execute` via `client.apply`""" | |||
51 | exec code in globals() |
|
59 | exec code in globals() | |
52 |
|
60 | |||
|
61 | ||||
53 | #-------------------------------------------------------------------------- |
|
62 | #-------------------------------------------------------------------------- | |
54 | # Decorators for Client methods |
|
63 | # Decorators for Client methods | |
55 | #-------------------------------------------------------------------------- |
|
64 | #-------------------------------------------------------------------------- | |
@@ -613,7 +622,7 b' class Client(object):' | |||||
613 | error = ss.unwrap_exception(msg['content']) |
|
622 | error = ss.unwrap_exception(msg['content']) | |
614 |
|
623 | |||
615 | if error: |
|
624 | if error: | |
616 |
re |
|
625 | raise error | |
617 |
|
626 | |||
618 | #-------------------------------------------------------------------------- |
|
627 | #-------------------------------------------------------------------------- | |
619 | # Execution methods |
|
628 | # Execution methods | |
@@ -623,6 +632,8 b' class Client(object):' | |||||
623 | def execute(self, code, targets='all', block=None): |
|
632 | def execute(self, code, targets='all', block=None): | |
624 | """Executes `code` on `targets` in blocking or nonblocking manner. |
|
633 | """Executes `code` on `targets` in blocking or nonblocking manner. | |
625 |
|
634 | |||
|
635 | ``execute`` is always `bound` (affects engine namespace) | |||
|
636 | ||||
626 | Parameters |
|
637 | Parameters | |
627 | ---------- |
|
638 | ---------- | |
628 | code : str |
|
639 | code : str | |
@@ -634,11 +645,7 b' class Client(object):' | |||||
634 | whether or not to wait until done to return |
|
645 | whether or not to wait until done to return | |
635 | default: self.block |
|
646 | default: self.block | |
636 | """ |
|
647 | """ | |
637 | # block = self.block if block is None else block |
|
|||
638 | # saveblock = self.block |
|
|||
639 | # self.block = block |
|
|||
640 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) |
|
648 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) | |
641 | # self.block = saveblock |
|
|||
642 | return result |
|
649 | return result | |
643 |
|
650 | |||
644 | def run(self, code, block=None): |
|
651 | def run(self, code, block=None): | |
@@ -646,6 +653,8 b' class Client(object):' | |||||
646 |
|
653 | |||
647 | Calls to this are load-balanced. |
|
654 | Calls to this are load-balanced. | |
648 |
|
655 | |||
|
656 | ``run`` is never `bound` (no effect on engine namespace) | |||
|
657 | ||||
649 | Parameters |
|
658 | Parameters | |
650 | ---------- |
|
659 | ---------- | |
651 | code : str |
|
660 | code : str | |
@@ -908,7 +917,7 b' class Client(object):' | |||||
908 | the engines on which to execute |
|
917 | the engines on which to execute | |
909 | default : all |
|
918 | default : all | |
910 | verbose : bool |
|
919 | verbose : bool | |
911 |
|
|
920 | Whether to return lengths only, or lists of ids for each element | |
912 | """ |
|
921 | """ | |
913 | targets = self._build_targets(targets)[1] |
|
922 | targets = self._build_targets(targets)[1] | |
914 | content = dict(targets=targets, verbose=verbose) |
|
923 | content = dict(targets=targets, verbose=verbose) | |
@@ -927,12 +936,16 b' class Client(object):' | |||||
927 | """Tell the controller to forget results. |
|
936 | """Tell the controller to forget results. | |
928 |
|
937 | |||
929 | Individual results can be purged by msg_id, or the entire |
|
938 | Individual results can be purged by msg_id, or the entire | |
930 | history of specific targets can |
|
939 | history of specific targets can be purged. | |
931 |
|
940 | |||
932 | Parameters |
|
941 | Parameters | |
933 | ---------- |
|
942 | ---------- | |
|
943 | msg_ids : str or list of strs | |||
|
944 | the msg_ids whose results should be forgotten. | |||
934 | targets : int/str/list of ints/strs |
|
945 | targets : int/str/list of ints/strs | |
935 | the targets |
|
946 | The targets, by uuid or int_id, whose entire history is to be purged. | |
|
947 | Use `targets='all'` to scrub everything from the controller's memory. | |||
|
948 | ||||
936 | default : None |
|
949 | default : None | |
937 | """ |
|
950 | """ | |
938 | if not targets and not msg_ids: |
|
951 | if not targets and not msg_ids: |
@@ -201,7 +201,7 b' def serialize_object(obj, threshold=64e-6):' | |||||
201 |
|
201 | |||
202 |
|
202 | |||
203 | def unserialize_object(bufs): |
|
203 | def unserialize_object(bufs): | |
204 | """reconstruct an object serialized by serialize_object from data buffers""" |
|
204 | """reconstruct an object serialized by serialize_object from data buffers.""" | |
205 | bufs = list(bufs) |
|
205 | bufs = list(bufs) | |
206 | sobj = pickle.loads(bufs.pop(0)) |
|
206 | sobj = pickle.loads(bufs.pop(0)) | |
207 | if isinstance(sobj, (list, tuple)): |
|
207 | if isinstance(sobj, (list, tuple)): | |
@@ -402,7 +402,7 b' class StreamSession(object):' | |||||
402 | return omsg |
|
402 | return omsg | |
403 |
|
403 | |||
404 | def send_raw(self, stream, msg, flags=0, copy=True, ident=None): |
|
404 | def send_raw(self, stream, msg, flags=0, copy=True, ident=None): | |
405 |
"""Send a raw message via ident |
|
405 | """Send a raw message via ident path. | |
406 |
|
406 | |||
407 | Parameters |
|
407 | Parameters | |
408 | ---------- |
|
408 | ---------- | |
@@ -444,9 +444,23 b' class StreamSession(object):' | |||||
444 | raise e |
|
444 | raise e | |
445 |
|
445 | |||
446 | def feed_identities(self, msg, copy=True): |
|
446 | def feed_identities(self, msg, copy=True): | |
447 | """This is a completely horrible thing, but it strips the zmq |
|
447 | """feed until DELIM is reached, then return the prefix as idents and remainder as | |
448 | ident prefixes off of a message. It will break if any identities |
|
448 | msg. This is easily broken by setting an IDENT to DELIM, but that would be silly. | |
449 | are unpackable by self.unpack.""" |
|
449 | ||
|
450 | Parameters | |||
|
451 | ---------- | |||
|
452 | msg : a list of Message or bytes objects | |||
|
453 | the message to be split | |||
|
454 | copy : bool | |||
|
455 | flag determining whether the arguments are bytes or Messages | |||
|
456 | ||||
|
457 | Returns | |||
|
458 | ------- | |||
|
459 | (idents,msg) : two lists | |||
|
460 | idents will always be a list of bytes - the indentity prefix | |||
|
461 | msg will be a list of bytes or Messages, unchanged from input | |||
|
462 | msg should be unpackable via self.unpack_message at this point. | |||
|
463 | """ | |||
450 | msg = list(msg) |
|
464 | msg = list(msg) | |
451 | idents = [] |
|
465 | idents = [] | |
452 | while len(msg) > 3: |
|
466 | while len(msg) > 3: |
@@ -1,11 +1,24 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | """Views of remote engines""" | |
2 | """Views""" |
|
2 | #----------------------------------------------------------------------------- | |
|
3 | # Copyright (C) 2010 The IPython Development Team | |||
|
4 | # | |||
|
5 | # Distributed under the terms of the BSD License. The full license is in | |||
|
6 | # the file COPYING, distributed as part of this software. | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | ||||
|
9 | #----------------------------------------------------------------------------- | |||
|
10 | # Imports | |||
|
11 | #----------------------------------------------------------------------------- | |||
3 |
|
12 | |||
4 | from IPython.external.decorator import decorator |
|
13 | from IPython.external.decorator import decorator | |
5 |
|
14 | |||
|
15 | #----------------------------------------------------------------------------- | |||
|
16 | # Decorators | |||
|
17 | #----------------------------------------------------------------------------- | |||
6 |
|
18 | |||
7 | @decorator |
|
19 | @decorator | |
8 | def myblock(f, self, *args, **kwargs): |
|
20 | def myblock(f, self, *args, **kwargs): | |
|
21 | """override client.block with self.block during a call""" | |||
9 | block = self.client.block |
|
22 | block = self.client.block | |
10 | self.client.block = self.block |
|
23 | self.client.block = self.block | |
11 | ret = f(self, *args, **kwargs) |
|
24 | ret = f(self, *args, **kwargs) | |
@@ -14,6 +27,7 b' def myblock(f, self, *args, **kwargs):' | |||||
14 |
|
27 | |||
15 | @decorator |
|
28 | @decorator | |
16 | def save_ids(f, self, *args, **kwargs): |
|
29 | def save_ids(f, self, *args, **kwargs): | |
|
30 | """Keep our history and outstanding attributes up to date after a method call.""" | |||
17 | ret = f(self, *args, **kwargs) |
|
31 | ret = f(self, *args, **kwargs) | |
18 | msg_ids = self.client.history[-self._ntargets:] |
|
32 | msg_ids = self.client.history[-self._ntargets:] | |
19 | self.history.extend(msg_ids) |
|
33 | self.history.extend(msg_ids) | |
@@ -22,6 +36,7 b' def save_ids(f, self, *args, **kwargs):' | |||||
22 |
|
36 | |||
23 | @decorator |
|
37 | @decorator | |
24 | def sync_results(f, self, *args, **kwargs): |
|
38 | def sync_results(f, self, *args, **kwargs): | |
|
39 | """sync relevant results from self.client to our results attribute.""" | |||
25 | ret = f(self, *args, **kwargs) |
|
40 | ret = f(self, *args, **kwargs) | |
26 | delta = self.outstanding.difference(self.client.outstanding) |
|
41 | delta = self.outstanding.difference(self.client.outstanding) | |
27 | completed = self.outstanding.intersection(delta) |
|
42 | completed = self.outstanding.intersection(delta) | |
@@ -32,13 +47,20 b' def sync_results(f, self, *args, **kwargs):' | |||||
32 |
|
47 | |||
33 | @decorator |
|
48 | @decorator | |
34 | def spin_after(f, self, *args, **kwargs): |
|
49 | def spin_after(f, self, *args, **kwargs): | |
|
50 | """call spin after the method.""" | |||
35 | ret = f(self, *args, **kwargs) |
|
51 | ret = f(self, *args, **kwargs) | |
36 | self.spin() |
|
52 | self.spin() | |
37 | return ret |
|
53 | return ret | |
38 |
|
54 | |||
|
55 | #----------------------------------------------------------------------------- | |||
|
56 | # Classes | |||
|
57 | #----------------------------------------------------------------------------- | |||
39 |
|
58 | |||
40 | class View(object): |
|
59 | class View(object): | |
41 | """Base View class""" |
|
60 | """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. | |
|
61 | ||||
|
62 | Don't use this class, use subclasses. | |||
|
63 | """ | |||
42 | _targets = None |
|
64 | _targets = None | |
43 | _ntargets = None |
|
65 | _ntargets = None | |
44 | block=None |
|
66 | block=None | |
@@ -67,7 +89,7 b' class View(object):' | |||||
67 |
|
89 | |||
68 | @targets.setter |
|
90 | @targets.setter | |
69 | def targets(self, value): |
|
91 | def targets(self, value): | |
70 |
raise |
|
92 | raise AttributeError("Cannot set my targets argument after construction!") | |
71 |
|
93 | |||
72 | @sync_results |
|
94 | @sync_results | |
73 | def spin(self): |
|
95 | def spin(self): | |
@@ -175,7 +197,16 b' class View(object):' | |||||
175 |
|
197 | |||
176 |
|
198 | |||
177 | class DirectView(View): |
|
199 | class DirectView(View): | |
178 |
"""Direct Multiplexer View |
|
200 | """Direct Multiplexer View of one or more engines. | |
|
201 | ||||
|
202 | These are created via indexed access to a client: | |||
|
203 | ||||
|
204 | >>> dv_1 = client[1] | |||
|
205 | >>> dv_all = client[:] | |||
|
206 | >>> dv_even = client[::2] | |||
|
207 | >>> dv_some = client[1:3] | |||
|
208 | ||||
|
209 | """ | |||
179 |
|
210 | |||
180 | def update(self, ns): |
|
211 | def update(self, ns): | |
181 | """update remote namespace with dict `ns`""" |
|
212 | """update remote namespace with dict `ns`""" | |
@@ -214,6 +245,19 b' class DirectView(View):' | |||||
214 | return self.client.kill(targets=self.targets, block=block) |
|
245 | return self.client.kill(targets=self.targets, block=block) | |
215 |
|
246 | |||
216 | class LoadBalancedView(View): |
|
247 | class LoadBalancedView(View): | |
|
248 | """An engine-agnostic View that only executes via the Task queue. | |||
|
249 | ||||
|
250 | Typically created via: | |||
|
251 | ||||
|
252 | >>> lbv = client[None] | |||
|
253 | <LoadBalancedView tcp://127.0.0.1:12345> | |||
|
254 | ||||
|
255 | but can also be created with: | |||
|
256 | ||||
|
257 | >>> lbc = LoadBalancedView(client) | |||
|
258 | ||||
|
259 | TODO: allow subset of engines across which to balance. | |||
|
260 | """ | |||
217 | def __repr__(self): |
|
261 | def __repr__(self): | |
218 | return "<%s %s>"%(self.__class__.__name__, self.client._addr) |
|
262 | return "<%s %s>"%(self.__class__.__name__, self.client._addr) | |
219 |
|
263 |
General Comments 0
You need to be logged in to leave comments.
Login now