Show More
@@ -1,184 +1,184 b'' | |||
|
1 | 1 | import os |
|
2 | 2 | import uuid |
|
3 | 3 | import pprint |
|
4 | 4 | |
|
5 | 5 | import zmq |
|
6 | 6 | |
|
7 | 7 | from zmq.utils import jsonapi as json |
|
8 | 8 | |
|
9 | 9 | class Message(object): |
|
10 | 10 | """A simple message object that maps dict keys to attributes. |
|
11 | 11 | |
|
12 | 12 | A Message can be created from a dict and a dict from a Message instance |
|
13 | 13 | simply by calling dict(msg_obj).""" |
|
14 | 14 | |
|
15 | 15 | def __init__(self, msg_dict): |
|
16 | 16 | dct = self.__dict__ |
|
17 | 17 | for k, v in msg_dict.iteritems(): |
|
18 | 18 | if isinstance(v, dict): |
|
19 | 19 | v = Message(v) |
|
20 | 20 | dct[k] = v |
|
21 | 21 | |
|
22 | 22 | # Having this iterator lets dict(msg_obj) work out of the box. |
|
23 | 23 | def __iter__(self): |
|
24 | 24 | return iter(self.__dict__.iteritems()) |
|
25 | 25 | |
|
26 | 26 | def __repr__(self): |
|
27 | 27 | return repr(self.__dict__) |
|
28 | 28 | |
|
29 | 29 | def __str__(self): |
|
30 | 30 | return pprint.pformat(self.__dict__) |
|
31 | 31 | |
|
32 | 32 | def __contains__(self, k): |
|
33 | 33 | return k in self.__dict__ |
|
34 | 34 | |
|
35 | 35 | def __getitem__(self, k): |
|
36 | 36 | return self.__dict__[k] |
|
37 | 37 | |
|
38 | 38 | |
|
39 | 39 | def msg_header(msg_id, username, session): |
|
40 | 40 | return { |
|
41 | 41 | 'msg_id' : msg_id, |
|
42 | 42 | 'username' : username, |
|
43 | 43 | 'session' : session |
|
44 | 44 | } |
|
45 | 45 | |
|
46 | 46 | |
|
47 | 47 | def extract_header(msg_or_header): |
|
48 | 48 | """Given a message or header, return the header.""" |
|
49 | 49 | if not msg_or_header: |
|
50 | 50 | return {} |
|
51 | 51 | try: |
|
52 | 52 | # See if msg_or_header is the entire message. |
|
53 | 53 | h = msg_or_header['header'] |
|
54 | 54 | except KeyError: |
|
55 | 55 | try: |
|
56 | 56 | # See if msg_or_header is just the header |
|
57 | 57 | h = msg_or_header['msg_id'] |
|
58 | 58 | except KeyError: |
|
59 | 59 | raise |
|
60 | 60 | else: |
|
61 | 61 | h = msg_or_header |
|
62 | 62 | if not isinstance(h, dict): |
|
63 | 63 | h = dict(h) |
|
64 | 64 | return h |
|
65 | 65 | |
|
66 | 66 | |
|
67 | 67 | class Session(object): |
|
68 | 68 | |
|
69 | 69 | def __init__(self, username=os.environ.get('USER','username'), session=None): |
|
70 | 70 | self.username = username |
|
71 | 71 | if session is None: |
|
72 | 72 | self.session = str(uuid.uuid4()) |
|
73 | 73 | else: |
|
74 | 74 | self.session = session |
|
75 | 75 | self.msg_id = 0 |
|
76 | 76 | |
|
77 | 77 | def msg_header(self): |
|
78 | 78 | h = msg_header(self.msg_id, self.username, self.session) |
|
79 | 79 | self.msg_id += 1 |
|
80 | 80 | return h |
|
81 | 81 | |
|
82 | 82 | def msg(self, msg_type, content=None, parent=None): |
|
83 | 83 | """Construct a standard-form message, with a given type, content, and parent. |
|
84 | 84 | |
|
85 | 85 | NOT to be called directly. |
|
86 | 86 | """ |
|
87 | 87 | msg = {} |
|
88 | 88 | msg['header'] = self.msg_header() |
|
89 | 89 | msg['parent_header'] = {} if parent is None else extract_header(parent) |
|
90 | 90 | msg['msg_type'] = msg_type |
|
91 | 91 | msg['content'] = {} if content is None else content |
|
92 | 92 | return msg |
|
93 | 93 | |
|
94 | 94 | def send(self, socket, msg_or_type, content=None, parent=None, ident=None): |
|
95 | 95 | """send a message via a socket, using a uniform message pattern. |
|
96 | 96 | |
|
97 | 97 | Parameters |
|
98 | 98 | ---------- |
|
99 | 99 | socket : zmq.Socket |
|
100 | 100 | The socket on which to send. |
|
101 | 101 | msg_or_type : Message/dict or str |
|
102 | 102 | if str : then a new message will be constructed from content,parent |
|
103 | 103 | if Message/dict : then content and parent are ignored, and the message |
|
104 | 104 | is sent. This is only for use when sending a Message for a second time. |
|
105 | 105 | content : dict, optional |
|
106 | 106 | The contents of the message |
|
107 | 107 | parent : dict, optional |
|
108 | 108 | The parent header, or parent message, of this message |
|
109 | 109 | ident : bytes, optional |
|
110 | 110 | The zmq.IDENTITY prefix of the destination. |
|
111 | 111 | Only for use on certain socket types. |
|
112 | 112 | |
|
113 | 113 | Returns |
|
114 | 114 | ------- |
|
115 | 115 | msg : dict |
|
116 | 116 | The message, as constructed by self.msg(msg_type,content,parent) |
|
117 | 117 | """ |
|
118 | if isinstance(msg_type, (Message, dict)): | |
|
119 | msg = dict(msg_type) | |
|
118 | if isinstance(msg_or_type, (Message, dict)): | |
|
119 | msg = dict(msg_or_type) | |
|
120 | 120 | else: |
|
121 | msg = self.msg(msg_type, content, parent) | |
|
121 | msg = self.msg(msg_or_type, content, parent) | |
|
122 | 122 | if ident is not None: |
|
123 | 123 | socket.send(ident, zmq.SNDMORE) |
|
124 | 124 | socket.send_json(msg) |
|
125 | 125 | return msg |
|
126 | 126 | |
|
127 | 127 | def recv(self, socket, mode=zmq.NOBLOCK): |
|
128 | 128 | """recv a message on a socket. |
|
129 | 129 | |
|
130 | 130 | Receive an optionally identity-prefixed message, as sent via session.send(). |
|
131 | 131 | |
|
132 | 132 | Parameters |
|
133 | 133 | ---------- |
|
134 | 134 | |
|
135 | 135 | socket : zmq.Socket |
|
136 | 136 | The socket on which to recv a message. |
|
137 | 137 | mode : int, optional |
|
138 | 138 | the mode flag passed to socket.recv |
|
139 | 139 | default: zmq.NOBLOCK |
|
140 | 140 | |
|
141 | 141 | Returns |
|
142 | 142 | ------- |
|
143 | 143 | (ident,msg) : tuple |
|
144 | 144 | always length 2. If no message received, then return is (None,None) |
|
145 | 145 | ident : bytes or None |
|
146 | 146 | the identity prefix is there was one, None otherwise. |
|
147 | 147 | msg : dict or None |
|
148 | 148 | The actual message. If mode==zmq.NOBLOCK and no message was waiting, |
|
149 | 149 | it will be None. |
|
150 | 150 | """ |
|
151 | 151 | try: |
|
152 | 152 | msg = socket.recv_multipart(mode) |
|
153 | 153 | except zmq.ZMQError, e: |
|
154 | 154 | if e.errno == zmq.EAGAIN: |
|
155 | 155 | # We can convert EAGAIN to None as we know in this case |
|
156 | 156 | # recv_json won't return None. |
|
157 | 157 | return None,None |
|
158 | 158 | else: |
|
159 | 159 | raise |
|
160 | 160 | if len(msg) == 1: |
|
161 | 161 | ident=None |
|
162 | 162 | msg = msg[0] |
|
163 | 163 | elif len(msg) == 2: |
|
164 | 164 | ident, msg = msg |
|
165 | 165 | else: |
|
166 | 166 | raise ValueError("Got message with length > 2, which is invalid") |
|
167 | 167 | |
|
168 | 168 | return ident, json.loads(msg) |
|
169 | 169 | |
|
170 | 170 | def test_msg2obj(): |
|
171 | 171 | am = dict(x=1) |
|
172 | 172 | ao = Message(am) |
|
173 | 173 | assert ao.x == am['x'] |
|
174 | 174 | |
|
175 | 175 | am['y'] = dict(z=1) |
|
176 | 176 | ao = Message(am) |
|
177 | 177 | assert ao.y.z == am['y']['z'] |
|
178 | 178 | |
|
179 | 179 | k1, k2 = 'y', 'z' |
|
180 | 180 | assert ao[k1][k2] == am[k1][k2] |
|
181 | 181 | |
|
182 | 182 | am2 = dict(ao) |
|
183 | 183 | assert am['x'] == am2['x'] |
|
184 | 184 | assert am['y']['z'] == am2['y']['z'] |
General Comments 0
You need to be logged in to leave comments.
Login now