Show More
@@ -29,10 +29,16 b' from view import DirectView, LoadBalancedView' | |||
|
29 | 29 | from dependency import Dependency, depend, require |
|
30 | 30 | import error |
|
31 | 31 | |
|
32 | #-------------------------------------------------------------------------- | |
|
33 | # helpers for implementing old MEC API via client.apply | |
|
34 | #-------------------------------------------------------------------------- | |
|
35 | ||
|
32 | 36 | def _push(ns): |
|
37 | """helper method for implementing `client.push` via `client.apply`""" | |
|
33 | 38 | globals().update(ns) |
|
34 | 39 | |
|
35 | 40 | def _pull(keys): |
|
41 | """helper method for implementing `client.pull` via `client.apply`""" | |
|
36 | 42 | g = globals() |
|
37 | 43 | if isinstance(keys, (list,tuple, set)): |
|
38 | 44 | for key in keys: |
@@ -45,10 +51,13 b' def _pull(keys):' | |||
|
45 | 51 | return g.get(keys) |
|
46 | 52 | |
|
47 | 53 | def _clear(): |
|
54 | """helper method for implementing `client.clear` via `client.apply`""" | |
|
48 | 55 | globals().clear() |
|
49 | 56 | |
|
50 | 57 | def execute(code): |
|
58 | """helper method for implementing `client.execute` via `client.apply`""" | |
|
51 | 59 | exec code in globals() |
|
60 | ||
|
52 | 61 | |
|
53 | 62 | #-------------------------------------------------------------------------- |
|
54 | 63 | # Decorators for Client methods |
@@ -613,7 +622,7 b' class Client(object):' | |||
|
613 | 622 | error = ss.unwrap_exception(msg['content']) |
|
614 | 623 | |
|
615 | 624 | if error: |
|
616 |
re |
|
|
625 | raise error | |
|
617 | 626 | |
|
618 | 627 | #-------------------------------------------------------------------------- |
|
619 | 628 | # Execution methods |
@@ -623,6 +632,8 b' class Client(object):' | |||
|
623 | 632 | def execute(self, code, targets='all', block=None): |
|
624 | 633 | """Executes `code` on `targets` in blocking or nonblocking manner. |
|
625 | 634 | |
|
635 | ``execute`` is always `bound` (affects engine namespace) | |
|
636 | ||
|
626 | 637 | Parameters |
|
627 | 638 | ---------- |
|
628 | 639 | code : str |
@@ -634,11 +645,7 b' class Client(object):' | |||
|
634 | 645 | whether or not to wait until done to return |
|
635 | 646 | default: self.block |
|
636 | 647 | """ |
|
637 | # block = self.block if block is None else block | |
|
638 | # saveblock = self.block | |
|
639 | # self.block = block | |
|
640 | 648 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) |
|
641 | # self.block = saveblock | |
|
642 | 649 | return result |
|
643 | 650 | |
|
644 | 651 | def run(self, code, block=None): |
@@ -646,6 +653,8 b' class Client(object):' | |||
|
646 | 653 | |
|
647 | 654 | Calls to this are load-balanced. |
|
648 | 655 | |
|
656 | ``run`` is never `bound` (no effect on engine namespace) | |
|
657 | ||
|
649 | 658 | Parameters |
|
650 | 659 | ---------- |
|
651 | 660 | code : str |
@@ -908,7 +917,7 b' class Client(object):' | |||
|
908 | 917 | the engines on which to execute |
|
909 | 918 | default : all |
|
910 | 919 | verbose : bool |
|
911 |
|
|
|
920 | Whether to return lengths only, or lists of ids for each element | |
|
912 | 921 | """ |
|
913 | 922 | targets = self._build_targets(targets)[1] |
|
914 | 923 | content = dict(targets=targets, verbose=verbose) |
@@ -927,12 +936,16 b' class Client(object):' | |||
|
927 | 936 | """Tell the controller to forget results. |
|
928 | 937 | |
|
929 | 938 | Individual results can be purged by msg_id, or the entire |
|
930 | history of specific targets can | |
|
939 | history of specific targets can be purged. | |
|
931 | 940 | |
|
932 | 941 | Parameters |
|
933 | 942 | ---------- |
|
943 | msg_ids : str or list of strs | |
|
944 | the msg_ids whose results should be forgotten. | |
|
934 | 945 | targets : int/str/list of ints/strs |
|
935 | the targets | |
|
946 | The targets, by uuid or int_id, whose entire history is to be purged. | |
|
947 | Use `targets='all'` to scrub everything from the controller's memory. | |
|
948 | ||
|
936 | 949 | default : None |
|
937 | 950 | """ |
|
938 | 951 | if not targets and not msg_ids: |
@@ -201,7 +201,7 b' def serialize_object(obj, threshold=64e-6):' | |||
|
201 | 201 | |
|
202 | 202 | |
|
203 | 203 | def unserialize_object(bufs): |
|
204 | """reconstruct an object serialized by serialize_object from data buffers""" | |
|
204 | """reconstruct an object serialized by serialize_object from data buffers.""" | |
|
205 | 205 | bufs = list(bufs) |
|
206 | 206 | sobj = pickle.loads(bufs.pop(0)) |
|
207 | 207 | if isinstance(sobj, (list, tuple)): |
@@ -402,7 +402,7 b' class StreamSession(object):' | |||
|
402 | 402 | return omsg |
|
403 | 403 | |
|
404 | 404 | def send_raw(self, stream, msg, flags=0, copy=True, ident=None): |
|
405 |
"""Send a raw message via ident |
|
|
405 | """Send a raw message via ident path. | |
|
406 | 406 | |
|
407 | 407 | Parameters |
|
408 | 408 | ---------- |
@@ -444,9 +444,23 b' class StreamSession(object):' | |||
|
444 | 444 | raise e |
|
445 | 445 | |
|
446 | 446 | def feed_identities(self, msg, copy=True): |
|
447 | """This is a completely horrible thing, but it strips the zmq | |
|
448 | ident prefixes off of a message. It will break if any identities | |
|
449 | are unpackable by self.unpack.""" | |
|
447 | """feed until DELIM is reached, then return the prefix as idents and remainder as | |
|
448 | msg. This is easily broken by setting an IDENT to DELIM, but that would be silly. | |
|
449 | ||
|
450 | Parameters | |
|
451 | ---------- | |
|
452 | msg : a list of Message or bytes objects | |
|
453 | the message to be split | |
|
454 | copy : bool | |
|
455 | flag determining whether the arguments are bytes or Messages | |
|
456 | ||
|
457 | Returns | |
|
458 | ------- | |
|
459 | (idents,msg) : two lists | |
|
460 | idents will always be a list of bytes - the indentity prefix | |
|
461 | msg will be a list of bytes or Messages, unchanged from input | |
|
462 | msg should be unpackable via self.unpack_message at this point. | |
|
463 | """ | |
|
450 | 464 | msg = list(msg) |
|
451 | 465 | idents = [] |
|
452 | 466 | while len(msg) > 3: |
@@ -1,11 +1,24 b'' | |||
|
1 | #!/usr/bin/env python | |
|
2 | """Views""" | |
|
1 | """Views of remote engines""" | |
|
2 | #----------------------------------------------------------------------------- | |
|
3 | # Copyright (C) 2010 The IPython Development Team | |
|
4 | # | |
|
5 | # Distributed under the terms of the BSD License. The full license is in | |
|
6 | # the file COPYING, distributed as part of this software. | |
|
7 | #----------------------------------------------------------------------------- | |
|
8 | ||
|
9 | #----------------------------------------------------------------------------- | |
|
10 | # Imports | |
|
11 | #----------------------------------------------------------------------------- | |
|
3 | 12 | |
|
4 | 13 | from IPython.external.decorator import decorator |
|
5 | 14 | |
|
15 | #----------------------------------------------------------------------------- | |
|
16 | # Decorators | |
|
17 | #----------------------------------------------------------------------------- | |
|
6 | 18 | |
|
7 | 19 | @decorator |
|
8 | 20 | def myblock(f, self, *args, **kwargs): |
|
21 | """override client.block with self.block during a call""" | |
|
9 | 22 | block = self.client.block |
|
10 | 23 | self.client.block = self.block |
|
11 | 24 | ret = f(self, *args, **kwargs) |
@@ -14,6 +27,7 b' def myblock(f, self, *args, **kwargs):' | |||
|
14 | 27 | |
|
15 | 28 | @decorator |
|
16 | 29 | def save_ids(f, self, *args, **kwargs): |
|
30 | """Keep our history and outstanding attributes up to date after a method call.""" | |
|
17 | 31 | ret = f(self, *args, **kwargs) |
|
18 | 32 | msg_ids = self.client.history[-self._ntargets:] |
|
19 | 33 | self.history.extend(msg_ids) |
@@ -22,6 +36,7 b' def save_ids(f, self, *args, **kwargs):' | |||
|
22 | 36 | |
|
23 | 37 | @decorator |
|
24 | 38 | def sync_results(f, self, *args, **kwargs): |
|
39 | """sync relevant results from self.client to our results attribute.""" | |
|
25 | 40 | ret = f(self, *args, **kwargs) |
|
26 | 41 | delta = self.outstanding.difference(self.client.outstanding) |
|
27 | 42 | completed = self.outstanding.intersection(delta) |
@@ -32,13 +47,20 b' def sync_results(f, self, *args, **kwargs):' | |||
|
32 | 47 | |
|
33 | 48 | @decorator |
|
34 | 49 | def spin_after(f, self, *args, **kwargs): |
|
50 | """call spin after the method.""" | |
|
35 | 51 | ret = f(self, *args, **kwargs) |
|
36 | 52 | self.spin() |
|
37 | 53 | return ret |
|
38 | 54 | |
|
55 | #----------------------------------------------------------------------------- | |
|
56 | # Classes | |
|
57 | #----------------------------------------------------------------------------- | |
|
39 | 58 | |
|
40 | 59 | class View(object): |
|
41 | """Base View class""" | |
|
60 | """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. | |
|
61 | ||
|
62 | Don't use this class, use subclasses. | |
|
63 | """ | |
|
42 | 64 | _targets = None |
|
43 | 65 | _ntargets = None |
|
44 | 66 | block=None |
@@ -67,7 +89,7 b' class View(object):' | |||
|
67 | 89 | |
|
68 | 90 | @targets.setter |
|
69 | 91 | def targets(self, value): |
|
70 |
raise |
|
|
92 | raise AttributeError("Cannot set my targets argument after construction!") | |
|
71 | 93 | |
|
72 | 94 | @sync_results |
|
73 | 95 | def spin(self): |
@@ -175,7 +197,16 b' class View(object):' | |||
|
175 | 197 | |
|
176 | 198 | |
|
177 | 199 | class DirectView(View): |
|
178 |
"""Direct Multiplexer View |
|
|
200 | """Direct Multiplexer View of one or more engines. | |
|
201 | ||
|
202 | These are created via indexed access to a client: | |
|
203 | ||
|
204 | >>> dv_1 = client[1] | |
|
205 | >>> dv_all = client[:] | |
|
206 | >>> dv_even = client[::2] | |
|
207 | >>> dv_some = client[1:3] | |
|
208 | ||
|
209 | """ | |
|
179 | 210 | |
|
180 | 211 | def update(self, ns): |
|
181 | 212 | """update remote namespace with dict `ns`""" |
@@ -214,6 +245,19 b' class DirectView(View):' | |||
|
214 | 245 | return self.client.kill(targets=self.targets, block=block) |
|
215 | 246 | |
|
216 | 247 | class LoadBalancedView(View): |
|
248 | """An engine-agnostic View that only executes via the Task queue. | |
|
249 | ||
|
250 | Typically created via: | |
|
251 | ||
|
252 | >>> lbv = client[None] | |
|
253 | <LoadBalancedView tcp://127.0.0.1:12345> | |
|
254 | ||
|
255 | but can also be created with: | |
|
256 | ||
|
257 | >>> lbc = LoadBalancedView(client) | |
|
258 | ||
|
259 | TODO: allow subset of engines across which to balance. | |
|
260 | """ | |
|
217 | 261 | def __repr__(self): |
|
218 | 262 | return "<%s %s>"%(self.__class__.__name__, self.client._addr) |
|
219 | 263 |
General Comments 0
You need to be logged in to leave comments.
Login now