##// END OF EJS Templates
py3: cast exception to bytes...
Gregory Szorc -
r39870:bce1c1af default
parent child Browse files
Show More
@@ -1,1325 +1,1326 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 util,
24 util,
25 )
25 )
26 from .utils import (
26 from .utils import (
27 cborutil,
27 cborutil,
28 stringutil,
28 stringutil,
29 )
29 )
30
30
31 FRAME_HEADER_SIZE = 8
31 FRAME_HEADER_SIZE = 8
32 DEFAULT_MAX_FRAME_SIZE = 32768
32 DEFAULT_MAX_FRAME_SIZE = 32768
33
33
34 STREAM_FLAG_BEGIN_STREAM = 0x01
34 STREAM_FLAG_BEGIN_STREAM = 0x01
35 STREAM_FLAG_END_STREAM = 0x02
35 STREAM_FLAG_END_STREAM = 0x02
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
37
37
38 STREAM_FLAGS = {
38 STREAM_FLAGS = {
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
42 }
42 }
43
43
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
45 FRAME_TYPE_COMMAND_DATA = 0x02
45 FRAME_TYPE_COMMAND_DATA = 0x02
46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
49 FRAME_TYPE_PROGRESS = 0x07
49 FRAME_TYPE_PROGRESS = 0x07
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
51
51
52 FRAME_TYPES = {
52 FRAME_TYPES = {
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
58 b'progress': FRAME_TYPE_PROGRESS,
58 b'progress': FRAME_TYPE_PROGRESS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
60 }
60 }
61
61
62 FLAG_COMMAND_REQUEST_NEW = 0x01
62 FLAG_COMMAND_REQUEST_NEW = 0x01
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
66
66
67 FLAGS_COMMAND_REQUEST = {
67 FLAGS_COMMAND_REQUEST = {
68 b'new': FLAG_COMMAND_REQUEST_NEW,
68 b'new': FLAG_COMMAND_REQUEST_NEW,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
72 }
72 }
73
73
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
75 FLAG_COMMAND_DATA_EOS = 0x02
75 FLAG_COMMAND_DATA_EOS = 0x02
76
76
77 FLAGS_COMMAND_DATA = {
77 FLAGS_COMMAND_DATA = {
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
79 b'eos': FLAG_COMMAND_DATA_EOS,
79 b'eos': FLAG_COMMAND_DATA_EOS,
80 }
80 }
81
81
82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
83 FLAG_COMMAND_RESPONSE_EOS = 0x02
83 FLAG_COMMAND_RESPONSE_EOS = 0x02
84
84
85 FLAGS_COMMAND_RESPONSE = {
85 FLAGS_COMMAND_RESPONSE = {
86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
88 }
88 }
89
89
90 # Maps frame types to their available flags.
90 # Maps frame types to their available flags.
91 FRAME_TYPE_FLAGS = {
91 FRAME_TYPE_FLAGS = {
92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
95 FRAME_TYPE_ERROR_RESPONSE: {},
95 FRAME_TYPE_ERROR_RESPONSE: {},
96 FRAME_TYPE_TEXT_OUTPUT: {},
96 FRAME_TYPE_TEXT_OUTPUT: {},
97 FRAME_TYPE_PROGRESS: {},
97 FRAME_TYPE_PROGRESS: {},
98 FRAME_TYPE_STREAM_SETTINGS: {},
98 FRAME_TYPE_STREAM_SETTINGS: {},
99 }
99 }
100
100
101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
102
102
103 def humanflags(mapping, value):
103 def humanflags(mapping, value):
104 """Convert a numeric flags value to a human value, using a mapping table."""
104 """Convert a numeric flags value to a human value, using a mapping table."""
105 namemap = {v: k for k, v in mapping.iteritems()}
105 namemap = {v: k for k, v in mapping.iteritems()}
106 flags = []
106 flags = []
107 val = 1
107 val = 1
108 while value >= val:
108 while value >= val:
109 if value & val:
109 if value & val:
110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
111 val <<= 1
111 val <<= 1
112
112
113 return b'|'.join(flags)
113 return b'|'.join(flags)
114
114
115 @attr.s(slots=True)
115 @attr.s(slots=True)
116 class frameheader(object):
116 class frameheader(object):
117 """Represents the data in a frame header."""
117 """Represents the data in a frame header."""
118
118
119 length = attr.ib()
119 length = attr.ib()
120 requestid = attr.ib()
120 requestid = attr.ib()
121 streamid = attr.ib()
121 streamid = attr.ib()
122 streamflags = attr.ib()
122 streamflags = attr.ib()
123 typeid = attr.ib()
123 typeid = attr.ib()
124 flags = attr.ib()
124 flags = attr.ib()
125
125
126 @attr.s(slots=True, repr=False)
126 @attr.s(slots=True, repr=False)
127 class frame(object):
127 class frame(object):
128 """Represents a parsed frame."""
128 """Represents a parsed frame."""
129
129
130 requestid = attr.ib()
130 requestid = attr.ib()
131 streamid = attr.ib()
131 streamid = attr.ib()
132 streamflags = attr.ib()
132 streamflags = attr.ib()
133 typeid = attr.ib()
133 typeid = attr.ib()
134 flags = attr.ib()
134 flags = attr.ib()
135 payload = attr.ib()
135 payload = attr.ib()
136
136
137 @encoding.strmethod
137 @encoding.strmethod
138 def __repr__(self):
138 def __repr__(self):
139 typename = '<unknown 0x%02x>' % self.typeid
139 typename = '<unknown 0x%02x>' % self.typeid
140 for name, value in FRAME_TYPES.iteritems():
140 for name, value in FRAME_TYPES.iteritems():
141 if value == self.typeid:
141 if value == self.typeid:
142 typename = name
142 typename = name
143 break
143 break
144
144
145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
146 'type=%s; flags=%s)' % (
146 'type=%s; flags=%s)' % (
147 len(self.payload), self.requestid, self.streamid,
147 len(self.payload), self.requestid, self.streamid,
148 humanflags(STREAM_FLAGS, self.streamflags), typename,
148 humanflags(STREAM_FLAGS, self.streamflags), typename,
149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
150
150
151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
152 """Assemble a frame into a byte array."""
152 """Assemble a frame into a byte array."""
153 # TODO assert size of payload.
153 # TODO assert size of payload.
154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
155
155
156 # 24 bits length
156 # 24 bits length
157 # 16 bits request id
157 # 16 bits request id
158 # 8 bits stream id
158 # 8 bits stream id
159 # 8 bits stream flags
159 # 8 bits stream flags
160 # 4 bits type
160 # 4 bits type
161 # 4 bits flags
161 # 4 bits flags
162
162
163 l = struct.pack(r'<I', len(payload))
163 l = struct.pack(r'<I', len(payload))
164 frame[0:3] = l[0:3]
164 frame[0:3] = l[0:3]
165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
166 frame[7] = (typeid << 4) | flags
166 frame[7] = (typeid << 4) | flags
167 frame[8:] = payload
167 frame[8:] = payload
168
168
169 return frame
169 return frame
170
170
171 def makeframefromhumanstring(s):
171 def makeframefromhumanstring(s):
172 """Create a frame from a human readable string
172 """Create a frame from a human readable string
173
173
174 Strings have the form:
174 Strings have the form:
175
175
176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
177
177
178 This can be used by user-facing applications and tests for creating
178 This can be used by user-facing applications and tests for creating
179 frames easily without having to type out a bunch of constants.
179 frames easily without having to type out a bunch of constants.
180
180
181 Request ID and stream IDs are integers.
181 Request ID and stream IDs are integers.
182
182
183 Stream flags, frame type, and flags can be specified by integer or
183 Stream flags, frame type, and flags can be specified by integer or
184 named constant.
184 named constant.
185
185
186 Flags can be delimited by `|` to bitwise OR them together.
186 Flags can be delimited by `|` to bitwise OR them together.
187
187
188 If the payload begins with ``cbor:``, the following string will be
188 If the payload begins with ``cbor:``, the following string will be
189 evaluated as Python literal and the resulting object will be fed into
189 evaluated as Python literal and the resulting object will be fed into
190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
191 byte string literal.
191 byte string literal.
192 """
192 """
193 fields = s.split(b' ', 5)
193 fields = s.split(b' ', 5)
194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
195
195
196 requestid = int(requestid)
196 requestid = int(requestid)
197 streamid = int(streamid)
197 streamid = int(streamid)
198
198
199 finalstreamflags = 0
199 finalstreamflags = 0
200 for flag in streamflags.split(b'|'):
200 for flag in streamflags.split(b'|'):
201 if flag in STREAM_FLAGS:
201 if flag in STREAM_FLAGS:
202 finalstreamflags |= STREAM_FLAGS[flag]
202 finalstreamflags |= STREAM_FLAGS[flag]
203 else:
203 else:
204 finalstreamflags |= int(flag)
204 finalstreamflags |= int(flag)
205
205
206 if frametype in FRAME_TYPES:
206 if frametype in FRAME_TYPES:
207 frametype = FRAME_TYPES[frametype]
207 frametype = FRAME_TYPES[frametype]
208 else:
208 else:
209 frametype = int(frametype)
209 frametype = int(frametype)
210
210
211 finalflags = 0
211 finalflags = 0
212 validflags = FRAME_TYPE_FLAGS[frametype]
212 validflags = FRAME_TYPE_FLAGS[frametype]
213 for flag in frameflags.split(b'|'):
213 for flag in frameflags.split(b'|'):
214 if flag in validflags:
214 if flag in validflags:
215 finalflags |= validflags[flag]
215 finalflags |= validflags[flag]
216 else:
216 else:
217 finalflags |= int(flag)
217 finalflags |= int(flag)
218
218
219 if payload.startswith(b'cbor:'):
219 if payload.startswith(b'cbor:'):
220 payload = b''.join(cborutil.streamencode(
220 payload = b''.join(cborutil.streamencode(
221 stringutil.evalpythonliteral(payload[5:])))
221 stringutil.evalpythonliteral(payload[5:])))
222
222
223 else:
223 else:
224 payload = stringutil.unescapestr(payload)
224 payload = stringutil.unescapestr(payload)
225
225
226 return makeframe(requestid=requestid, streamid=streamid,
226 return makeframe(requestid=requestid, streamid=streamid,
227 streamflags=finalstreamflags, typeid=frametype,
227 streamflags=finalstreamflags, typeid=frametype,
228 flags=finalflags, payload=payload)
228 flags=finalflags, payload=payload)
229
229
230 def parseheader(data):
230 def parseheader(data):
231 """Parse a unified framing protocol frame header from a buffer.
231 """Parse a unified framing protocol frame header from a buffer.
232
232
233 The header is expected to be in the buffer at offset 0 and the
233 The header is expected to be in the buffer at offset 0 and the
234 buffer is expected to be large enough to hold a full header.
234 buffer is expected to be large enough to hold a full header.
235 """
235 """
236 # 24 bits payload length (little endian)
236 # 24 bits payload length (little endian)
237 # 16 bits request ID
237 # 16 bits request ID
238 # 8 bits stream ID
238 # 8 bits stream ID
239 # 8 bits stream flags
239 # 8 bits stream flags
240 # 4 bits frame type
240 # 4 bits frame type
241 # 4 bits frame flags
241 # 4 bits frame flags
242 # ... payload
242 # ... payload
243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
245 typeflags = data[7]
245 typeflags = data[7]
246
246
247 frametype = (typeflags & 0xf0) >> 4
247 frametype = (typeflags & 0xf0) >> 4
248 frameflags = typeflags & 0x0f
248 frameflags = typeflags & 0x0f
249
249
250 return frameheader(framelength, requestid, streamid, streamflags,
250 return frameheader(framelength, requestid, streamid, streamflags,
251 frametype, frameflags)
251 frametype, frameflags)
252
252
253 def readframe(fh):
253 def readframe(fh):
254 """Read a unified framing protocol frame from a file object.
254 """Read a unified framing protocol frame from a file object.
255
255
256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
257 None if no frame is available. May raise if a malformed frame is
257 None if no frame is available. May raise if a malformed frame is
258 seen.
258 seen.
259 """
259 """
260 header = bytearray(FRAME_HEADER_SIZE)
260 header = bytearray(FRAME_HEADER_SIZE)
261
261
262 readcount = fh.readinto(header)
262 readcount = fh.readinto(header)
263
263
264 if readcount == 0:
264 if readcount == 0:
265 return None
265 return None
266
266
267 if readcount != FRAME_HEADER_SIZE:
267 if readcount != FRAME_HEADER_SIZE:
268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
269 (readcount, header))
269 (readcount, header))
270
270
271 h = parseheader(header)
271 h = parseheader(header)
272
272
273 payload = fh.read(h.length)
273 payload = fh.read(h.length)
274 if len(payload) != h.length:
274 if len(payload) != h.length:
275 raise error.Abort(_('frame length error: expected %d; got %d') %
275 raise error.Abort(_('frame length error: expected %d; got %d') %
276 (h.length, len(payload)))
276 (h.length, len(payload)))
277
277
278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
279 payload)
279 payload)
280
280
281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
283 """Create frames necessary to transmit a request to run a command.
283 """Create frames necessary to transmit a request to run a command.
284
284
285 This is a generator of bytearrays. Each item represents a frame
285 This is a generator of bytearrays. Each item represents a frame
286 ready to be sent over the wire to a peer.
286 ready to be sent over the wire to a peer.
287 """
287 """
288 data = {b'name': cmd}
288 data = {b'name': cmd}
289 if args:
289 if args:
290 data[b'args'] = args
290 data[b'args'] = args
291
291
292 data = b''.join(cborutil.streamencode(data))
292 data = b''.join(cborutil.streamencode(data))
293
293
294 offset = 0
294 offset = 0
295
295
296 while True:
296 while True:
297 flags = 0
297 flags = 0
298
298
299 # Must set new or continuation flag.
299 # Must set new or continuation flag.
300 if not offset:
300 if not offset:
301 flags |= FLAG_COMMAND_REQUEST_NEW
301 flags |= FLAG_COMMAND_REQUEST_NEW
302 else:
302 else:
303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
304
304
305 # Data frames is set on all frames.
305 # Data frames is set on all frames.
306 if datafh:
306 if datafh:
307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
308
308
309 payload = data[offset:offset + maxframesize]
309 payload = data[offset:offset + maxframesize]
310 offset += len(payload)
310 offset += len(payload)
311
311
312 if len(payload) == maxframesize and offset < len(data):
312 if len(payload) == maxframesize and offset < len(data):
313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
314
314
315 yield stream.makeframe(requestid=requestid,
315 yield stream.makeframe(requestid=requestid,
316 typeid=FRAME_TYPE_COMMAND_REQUEST,
316 typeid=FRAME_TYPE_COMMAND_REQUEST,
317 flags=flags,
317 flags=flags,
318 payload=payload)
318 payload=payload)
319
319
320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
321 break
321 break
322
322
323 if datafh:
323 if datafh:
324 while True:
324 while True:
325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
326
326
327 done = False
327 done = False
328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
329 flags = FLAG_COMMAND_DATA_CONTINUATION
329 flags = FLAG_COMMAND_DATA_CONTINUATION
330 else:
330 else:
331 flags = FLAG_COMMAND_DATA_EOS
331 flags = FLAG_COMMAND_DATA_EOS
332 assert datafh.read(1) == b''
332 assert datafh.read(1) == b''
333 done = True
333 done = True
334
334
335 yield stream.makeframe(requestid=requestid,
335 yield stream.makeframe(requestid=requestid,
336 typeid=FRAME_TYPE_COMMAND_DATA,
336 typeid=FRAME_TYPE_COMMAND_DATA,
337 flags=flags,
337 flags=flags,
338 payload=data)
338 payload=data)
339
339
340 if done:
340 if done:
341 break
341 break
342
342
343 def createcommandresponseframesfrombytes(stream, requestid, data,
343 def createcommandresponseframesfrombytes(stream, requestid, data,
344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
345 """Create a raw frame to send a bytes response from static bytes input.
345 """Create a raw frame to send a bytes response from static bytes input.
346
346
347 Returns a generator of bytearrays.
347 Returns a generator of bytearrays.
348 """
348 """
349 # Automatically send the overall CBOR response map.
349 # Automatically send the overall CBOR response map.
350 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
350 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
351 if len(overall) > maxframesize:
351 if len(overall) > maxframesize:
352 raise error.ProgrammingError('not yet implemented')
352 raise error.ProgrammingError('not yet implemented')
353
353
354 # Simple case where we can fit the full response in a single frame.
354 # Simple case where we can fit the full response in a single frame.
355 if len(overall) + len(data) <= maxframesize:
355 if len(overall) + len(data) <= maxframesize:
356 flags = FLAG_COMMAND_RESPONSE_EOS
356 flags = FLAG_COMMAND_RESPONSE_EOS
357 yield stream.makeframe(requestid=requestid,
357 yield stream.makeframe(requestid=requestid,
358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
359 flags=flags,
359 flags=flags,
360 payload=overall + data)
360 payload=overall + data)
361 return
361 return
362
362
363 # It's easier to send the overall CBOR map in its own frame than to track
363 # It's easier to send the overall CBOR map in its own frame than to track
364 # offsets.
364 # offsets.
365 yield stream.makeframe(requestid=requestid,
365 yield stream.makeframe(requestid=requestid,
366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
368 payload=overall)
368 payload=overall)
369
369
370 offset = 0
370 offset = 0
371 while True:
371 while True:
372 chunk = data[offset:offset + maxframesize]
372 chunk = data[offset:offset + maxframesize]
373 offset += len(chunk)
373 offset += len(chunk)
374 done = offset == len(data)
374 done = offset == len(data)
375
375
376 if done:
376 if done:
377 flags = FLAG_COMMAND_RESPONSE_EOS
377 flags = FLAG_COMMAND_RESPONSE_EOS
378 else:
378 else:
379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
380
380
381 yield stream.makeframe(requestid=requestid,
381 yield stream.makeframe(requestid=requestid,
382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 flags=flags,
383 flags=flags,
384 payload=chunk)
384 payload=chunk)
385
385
386 if done:
386 if done:
387 break
387 break
388
388
389 def createbytesresponseframesfromgen(stream, requestid, gen,
389 def createbytesresponseframesfromgen(stream, requestid, gen,
390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
391 """Generator of frames from a generator of byte chunks.
391 """Generator of frames from a generator of byte chunks.
392
392
393 This assumes that another frame will follow whatever this emits. i.e.
393 This assumes that another frame will follow whatever this emits. i.e.
394 this always emits the continuation flag and never emits the end-of-stream
394 this always emits the continuation flag and never emits the end-of-stream
395 flag.
395 flag.
396 """
396 """
397 cb = util.chunkbuffer(gen)
397 cb = util.chunkbuffer(gen)
398 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
398 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
399
399
400 while True:
400 while True:
401 chunk = cb.read(maxframesize)
401 chunk = cb.read(maxframesize)
402 if not chunk:
402 if not chunk:
403 break
403 break
404
404
405 yield stream.makeframe(requestid=requestid,
405 yield stream.makeframe(requestid=requestid,
406 typeid=FRAME_TYPE_COMMAND_RESPONSE,
406 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 flags=flags,
407 flags=flags,
408 payload=chunk)
408 payload=chunk)
409
409
410 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
410 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
411
411
412 def createcommandresponseokframe(stream, requestid):
412 def createcommandresponseokframe(stream, requestid):
413 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
413 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
414
414
415 return stream.makeframe(requestid=requestid,
415 return stream.makeframe(requestid=requestid,
416 typeid=FRAME_TYPE_COMMAND_RESPONSE,
416 typeid=FRAME_TYPE_COMMAND_RESPONSE,
417 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
417 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
418 payload=overall)
418 payload=overall)
419
419
420 def createcommandresponseeosframe(stream, requestid):
420 def createcommandresponseeosframe(stream, requestid):
421 """Create an empty payload frame representing command end-of-stream."""
421 """Create an empty payload frame representing command end-of-stream."""
422 return stream.makeframe(requestid=requestid,
422 return stream.makeframe(requestid=requestid,
423 typeid=FRAME_TYPE_COMMAND_RESPONSE,
423 typeid=FRAME_TYPE_COMMAND_RESPONSE,
424 flags=FLAG_COMMAND_RESPONSE_EOS,
424 flags=FLAG_COMMAND_RESPONSE_EOS,
425 payload=b'')
425 payload=b'')
426
426
427 def createcommanderrorresponse(stream, requestid, message, args=None):
427 def createcommanderrorresponse(stream, requestid, message, args=None):
428 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
428 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
429 # formatting works consistently?
429 # formatting works consistently?
430 m = {
430 m = {
431 b'status': b'error',
431 b'status': b'error',
432 b'error': {
432 b'error': {
433 b'message': message,
433 b'message': message,
434 }
434 }
435 }
435 }
436
436
437 if args:
437 if args:
438 m[b'error'][b'args'] = args
438 m[b'error'][b'args'] = args
439
439
440 overall = b''.join(cborutil.streamencode(m))
440 overall = b''.join(cborutil.streamencode(m))
441
441
442 yield stream.makeframe(requestid=requestid,
442 yield stream.makeframe(requestid=requestid,
443 typeid=FRAME_TYPE_COMMAND_RESPONSE,
443 typeid=FRAME_TYPE_COMMAND_RESPONSE,
444 flags=FLAG_COMMAND_RESPONSE_EOS,
444 flags=FLAG_COMMAND_RESPONSE_EOS,
445 payload=overall)
445 payload=overall)
446
446
447 def createerrorframe(stream, requestid, msg, errtype):
447 def createerrorframe(stream, requestid, msg, errtype):
448 # TODO properly handle frame size limits.
448 # TODO properly handle frame size limits.
449 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
449 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
450
450
451 payload = b''.join(cborutil.streamencode({
451 payload = b''.join(cborutil.streamencode({
452 b'type': errtype,
452 b'type': errtype,
453 b'message': [{b'msg': msg}],
453 b'message': [{b'msg': msg}],
454 }))
454 }))
455
455
456 yield stream.makeframe(requestid=requestid,
456 yield stream.makeframe(requestid=requestid,
457 typeid=FRAME_TYPE_ERROR_RESPONSE,
457 typeid=FRAME_TYPE_ERROR_RESPONSE,
458 flags=0,
458 flags=0,
459 payload=payload)
459 payload=payload)
460
460
461 def createtextoutputframe(stream, requestid, atoms,
461 def createtextoutputframe(stream, requestid, atoms,
462 maxframesize=DEFAULT_MAX_FRAME_SIZE):
462 maxframesize=DEFAULT_MAX_FRAME_SIZE):
463 """Create a text output frame to render text to people.
463 """Create a text output frame to render text to people.
464
464
465 ``atoms`` is a 3-tuple of (formatting string, args, labels).
465 ``atoms`` is a 3-tuple of (formatting string, args, labels).
466
466
467 The formatting string contains ``%s`` tokens to be replaced by the
467 The formatting string contains ``%s`` tokens to be replaced by the
468 corresponding indexed entry in ``args``. ``labels`` is an iterable of
468 corresponding indexed entry in ``args``. ``labels`` is an iterable of
469 formatters to be applied at rendering time. In terms of the ``ui``
469 formatters to be applied at rendering time. In terms of the ``ui``
470 class, each atom corresponds to a ``ui.write()``.
470 class, each atom corresponds to a ``ui.write()``.
471 """
471 """
472 atomdicts = []
472 atomdicts = []
473
473
474 for (formatting, args, labels) in atoms:
474 for (formatting, args, labels) in atoms:
475 # TODO look for localstr, other types here?
475 # TODO look for localstr, other types here?
476
476
477 if not isinstance(formatting, bytes):
477 if not isinstance(formatting, bytes):
478 raise ValueError('must use bytes formatting strings')
478 raise ValueError('must use bytes formatting strings')
479 for arg in args:
479 for arg in args:
480 if not isinstance(arg, bytes):
480 if not isinstance(arg, bytes):
481 raise ValueError('must use bytes for arguments')
481 raise ValueError('must use bytes for arguments')
482 for label in labels:
482 for label in labels:
483 if not isinstance(label, bytes):
483 if not isinstance(label, bytes):
484 raise ValueError('must use bytes for labels')
484 raise ValueError('must use bytes for labels')
485
485
486 # Formatting string must be ASCII.
486 # Formatting string must be ASCII.
487 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
487 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
488
488
489 # Arguments must be UTF-8.
489 # Arguments must be UTF-8.
490 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
490 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
491
491
492 # Labels must be ASCII.
492 # Labels must be ASCII.
493 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
493 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
494 for l in labels]
494 for l in labels]
495
495
496 atom = {b'msg': formatting}
496 atom = {b'msg': formatting}
497 if args:
497 if args:
498 atom[b'args'] = args
498 atom[b'args'] = args
499 if labels:
499 if labels:
500 atom[b'labels'] = labels
500 atom[b'labels'] = labels
501
501
502 atomdicts.append(atom)
502 atomdicts.append(atom)
503
503
504 payload = b''.join(cborutil.streamencode(atomdicts))
504 payload = b''.join(cborutil.streamencode(atomdicts))
505
505
506 if len(payload) > maxframesize:
506 if len(payload) > maxframesize:
507 raise ValueError('cannot encode data in a single frame')
507 raise ValueError('cannot encode data in a single frame')
508
508
509 yield stream.makeframe(requestid=requestid,
509 yield stream.makeframe(requestid=requestid,
510 typeid=FRAME_TYPE_TEXT_OUTPUT,
510 typeid=FRAME_TYPE_TEXT_OUTPUT,
511 flags=0,
511 flags=0,
512 payload=payload)
512 payload=payload)
513
513
514 class bufferingcommandresponseemitter(object):
514 class bufferingcommandresponseemitter(object):
515 """Helper object to emit command response frames intelligently.
515 """Helper object to emit command response frames intelligently.
516
516
517 Raw command response data is likely emitted in chunks much smaller
517 Raw command response data is likely emitted in chunks much smaller
518 than what can fit in a single frame. This class exists to buffer
518 than what can fit in a single frame. This class exists to buffer
519 chunks until enough data is available to fit in a single frame.
519 chunks until enough data is available to fit in a single frame.
520
520
521 TODO we'll need something like this when compression is supported.
521 TODO we'll need something like this when compression is supported.
522 So it might make sense to implement this functionality at the stream
522 So it might make sense to implement this functionality at the stream
523 level.
523 level.
524 """
524 """
525 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
525 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
526 self._stream = stream
526 self._stream = stream
527 self._requestid = requestid
527 self._requestid = requestid
528 self._maxsize = maxframesize
528 self._maxsize = maxframesize
529 self._chunks = []
529 self._chunks = []
530 self._chunkssize = 0
530 self._chunkssize = 0
531
531
532 def send(self, data):
532 def send(self, data):
533 """Send new data for emission.
533 """Send new data for emission.
534
534
535 Is a generator of new frames that were derived from the new input.
535 Is a generator of new frames that were derived from the new input.
536
536
537 If the special input ``None`` is received, flushes all buffered
537 If the special input ``None`` is received, flushes all buffered
538 data to frames.
538 data to frames.
539 """
539 """
540
540
541 if data is None:
541 if data is None:
542 for frame in self._flush():
542 for frame in self._flush():
543 yield frame
543 yield frame
544 return
544 return
545
545
546 # There is a ton of potential to do more complicated things here.
546 # There is a ton of potential to do more complicated things here.
547 # Our immediate goal is to coalesce small chunks into big frames,
547 # Our immediate goal is to coalesce small chunks into big frames,
548 # not achieve the fewest number of frames possible. So we go with
548 # not achieve the fewest number of frames possible. So we go with
549 # a simple implementation:
549 # a simple implementation:
550 #
550 #
551 # * If a chunk is too large for a frame, we flush and emit frames
551 # * If a chunk is too large for a frame, we flush and emit frames
552 # for the new chunk.
552 # for the new chunk.
553 # * If a chunk can be buffered without total buffered size limits
553 # * If a chunk can be buffered without total buffered size limits
554 # being exceeded, we do that.
554 # being exceeded, we do that.
555 # * If a chunk causes us to go over our buffering limit, we flush
555 # * If a chunk causes us to go over our buffering limit, we flush
556 # and then buffer the new chunk.
556 # and then buffer the new chunk.
557
557
558 if len(data) > self._maxsize:
558 if len(data) > self._maxsize:
559 for frame in self._flush():
559 for frame in self._flush():
560 yield frame
560 yield frame
561
561
562 # Now emit frames for the big chunk.
562 # Now emit frames for the big chunk.
563 offset = 0
563 offset = 0
564 while True:
564 while True:
565 chunk = data[offset:offset + self._maxsize]
565 chunk = data[offset:offset + self._maxsize]
566 offset += len(chunk)
566 offset += len(chunk)
567
567
568 yield self._stream.makeframe(
568 yield self._stream.makeframe(
569 self._requestid,
569 self._requestid,
570 typeid=FRAME_TYPE_COMMAND_RESPONSE,
570 typeid=FRAME_TYPE_COMMAND_RESPONSE,
571 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
571 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
572 payload=chunk)
572 payload=chunk)
573
573
574 if offset == len(data):
574 if offset == len(data):
575 return
575 return
576
576
577 # If we don't have enough to constitute a full frame, buffer and
577 # If we don't have enough to constitute a full frame, buffer and
578 # return.
578 # return.
579 if len(data) + self._chunkssize < self._maxsize:
579 if len(data) + self._chunkssize < self._maxsize:
580 self._chunks.append(data)
580 self._chunks.append(data)
581 self._chunkssize += len(data)
581 self._chunkssize += len(data)
582 return
582 return
583
583
584 # Else flush what we have and buffer the new chunk. We could do
584 # Else flush what we have and buffer the new chunk. We could do
585 # something more intelligent here, like break the chunk. Let's
585 # something more intelligent here, like break the chunk. Let's
586 # keep things simple for now.
586 # keep things simple for now.
587 for frame in self._flush():
587 for frame in self._flush():
588 yield frame
588 yield frame
589
589
590 self._chunks.append(data)
590 self._chunks.append(data)
591 self._chunkssize = len(data)
591 self._chunkssize = len(data)
592
592
593 def _flush(self):
593 def _flush(self):
594 payload = b''.join(self._chunks)
594 payload = b''.join(self._chunks)
595 assert len(payload) <= self._maxsize
595 assert len(payload) <= self._maxsize
596
596
597 self._chunks[:] = []
597 self._chunks[:] = []
598 self._chunkssize = 0
598 self._chunkssize = 0
599
599
600 yield self._stream.makeframe(
600 yield self._stream.makeframe(
601 self._requestid,
601 self._requestid,
602 typeid=FRAME_TYPE_COMMAND_RESPONSE,
602 typeid=FRAME_TYPE_COMMAND_RESPONSE,
603 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
603 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
604 payload=payload)
604 payload=payload)
605
605
606 class stream(object):
606 class stream(object):
607 """Represents a logical unidirectional series of frames."""
607 """Represents a logical unidirectional series of frames."""
608
608
609 def __init__(self, streamid, active=False):
609 def __init__(self, streamid, active=False):
610 self.streamid = streamid
610 self.streamid = streamid
611 self._active = active
611 self._active = active
612
612
613 def makeframe(self, requestid, typeid, flags, payload):
613 def makeframe(self, requestid, typeid, flags, payload):
614 """Create a frame to be sent out over this stream.
614 """Create a frame to be sent out over this stream.
615
615
616 Only returns the frame instance. Does not actually send it.
616 Only returns the frame instance. Does not actually send it.
617 """
617 """
618 streamflags = 0
618 streamflags = 0
619 if not self._active:
619 if not self._active:
620 streamflags |= STREAM_FLAG_BEGIN_STREAM
620 streamflags |= STREAM_FLAG_BEGIN_STREAM
621 self._active = True
621 self._active = True
622
622
623 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
623 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
624 payload)
624 payload)
625
625
626 def ensureserverstream(stream):
626 def ensureserverstream(stream):
627 if stream.streamid % 2:
627 if stream.streamid % 2:
628 raise error.ProgrammingError('server should only write to even '
628 raise error.ProgrammingError('server should only write to even '
629 'numbered streams; %d is not even' %
629 'numbered streams; %d is not even' %
630 stream.streamid)
630 stream.streamid)
631
631
632 class serverreactor(object):
632 class serverreactor(object):
633 """Holds state of a server handling frame-based protocol requests.
633 """Holds state of a server handling frame-based protocol requests.
634
634
635 This class is the "brain" of the unified frame-based protocol server
635 This class is the "brain" of the unified frame-based protocol server
636 component. While the protocol is stateless from the perspective of
636 component. While the protocol is stateless from the perspective of
637 requests/commands, something needs to track which frames have been
637 requests/commands, something needs to track which frames have been
638 received, what frames to expect, etc. This class is that thing.
638 received, what frames to expect, etc. This class is that thing.
639
639
640 Instances are modeled as a state machine of sorts. Instances are also
640 Instances are modeled as a state machine of sorts. Instances are also
641 reactionary to external events. The point of this class is to encapsulate
641 reactionary to external events. The point of this class is to encapsulate
642 the state of the connection and the exchange of frames, not to perform
642 the state of the connection and the exchange of frames, not to perform
643 work. Instead, callers tell this class when something occurs, like a
643 work. Instead, callers tell this class when something occurs, like a
644 frame arriving. If that activity is worthy of a follow-up action (say
644 frame arriving. If that activity is worthy of a follow-up action (say
645 *run a command*), the return value of that handler will say so.
645 *run a command*), the return value of that handler will say so.
646
646
647 I/O and CPU intensive operations are purposefully delegated outside of
647 I/O and CPU intensive operations are purposefully delegated outside of
648 this class.
648 this class.
649
649
650 Consumers are expected to tell instances when events occur. They do so by
650 Consumers are expected to tell instances when events occur. They do so by
651 calling the various ``on*`` methods. These methods return a 2-tuple
651 calling the various ``on*`` methods. These methods return a 2-tuple
652 describing any follow-up action(s) to take. The first element is the
652 describing any follow-up action(s) to take. The first element is the
653 name of an action to perform. The second is a data structure (usually
653 name of an action to perform. The second is a data structure (usually
654 a dict) specific to that action that contains more information. e.g.
654 a dict) specific to that action that contains more information. e.g.
655 if the server wants to send frames back to the client, the data structure
655 if the server wants to send frames back to the client, the data structure
656 will contain a reference to those frames.
656 will contain a reference to those frames.
657
657
658 Valid actions that consumers can be instructed to take are:
658 Valid actions that consumers can be instructed to take are:
659
659
660 sendframes
660 sendframes
661 Indicates that frames should be sent to the client. The ``framegen``
661 Indicates that frames should be sent to the client. The ``framegen``
662 key contains a generator of frames that should be sent. The server
662 key contains a generator of frames that should be sent. The server
663 assumes that all frames are sent to the client.
663 assumes that all frames are sent to the client.
664
664
665 error
665 error
666 Indicates that an error occurred. Consumer should probably abort.
666 Indicates that an error occurred. Consumer should probably abort.
667
667
668 runcommand
668 runcommand
669 Indicates that the consumer should run a wire protocol command. Details
669 Indicates that the consumer should run a wire protocol command. Details
670 of the command to run are given in the data structure.
670 of the command to run are given in the data structure.
671
671
672 wantframe
672 wantframe
673 Indicates that nothing of interest happened and the server is waiting on
673 Indicates that nothing of interest happened and the server is waiting on
674 more frames from the client before anything interesting can be done.
674 more frames from the client before anything interesting can be done.
675
675
676 noop
676 noop
677 Indicates no additional action is required.
677 Indicates no additional action is required.
678
678
679 Known Issues
679 Known Issues
680 ------------
680 ------------
681
681
682 There are no limits to the number of partially received commands or their
682 There are no limits to the number of partially received commands or their
683 size. A malicious client could stream command request data and exhaust the
683 size. A malicious client could stream command request data and exhaust the
684 server's memory.
684 server's memory.
685
685
686 Partially received commands are not acted upon when end of input is
686 Partially received commands are not acted upon when end of input is
687 reached. Should the server error if it receives a partial request?
687 reached. Should the server error if it receives a partial request?
688 Should the client send a message to abort a partially transmitted request
688 Should the client send a message to abort a partially transmitted request
689 to facilitate graceful shutdown?
689 to facilitate graceful shutdown?
690
690
691 Active requests that haven't been responded to aren't tracked. This means
691 Active requests that haven't been responded to aren't tracked. This means
692 that if we receive a command and instruct its dispatch, another command
692 that if we receive a command and instruct its dispatch, another command
693 with its request ID can come in over the wire and there will be a race
693 with its request ID can come in over the wire and there will be a race
694 between who responds to what.
694 between who responds to what.
695 """
695 """
696
696
697 def __init__(self, deferoutput=False):
697 def __init__(self, deferoutput=False):
698 """Construct a new server reactor.
698 """Construct a new server reactor.
699
699
700 ``deferoutput`` can be used to indicate that no output frames should be
700 ``deferoutput`` can be used to indicate that no output frames should be
701 instructed to be sent until input has been exhausted. In this mode,
701 instructed to be sent until input has been exhausted. In this mode,
702 events that would normally generate output frames (such as a command
702 events that would normally generate output frames (such as a command
703 response being ready) will instead defer instructing the consumer to
703 response being ready) will instead defer instructing the consumer to
704 send those frames. This is useful for half-duplex transports where the
704 send those frames. This is useful for half-duplex transports where the
705 sender cannot receive until all data has been transmitted.
705 sender cannot receive until all data has been transmitted.
706 """
706 """
707 self._deferoutput = deferoutput
707 self._deferoutput = deferoutput
708 self._state = 'idle'
708 self._state = 'idle'
709 self._nextoutgoingstreamid = 2
709 self._nextoutgoingstreamid = 2
710 self._bufferedframegens = []
710 self._bufferedframegens = []
711 # stream id -> stream instance for all active streams from the client.
711 # stream id -> stream instance for all active streams from the client.
712 self._incomingstreams = {}
712 self._incomingstreams = {}
713 self._outgoingstreams = {}
713 self._outgoingstreams = {}
714 # request id -> dict of commands that are actively being received.
714 # request id -> dict of commands that are actively being received.
715 self._receivingcommands = {}
715 self._receivingcommands = {}
716 # Request IDs that have been received and are actively being processed.
716 # Request IDs that have been received and are actively being processed.
717 # Once all output for a request has been sent, it is removed from this
717 # Once all output for a request has been sent, it is removed from this
718 # set.
718 # set.
719 self._activecommands = set()
719 self._activecommands = set()
720
720
721 def onframerecv(self, frame):
721 def onframerecv(self, frame):
722 """Process a frame that has been received off the wire.
722 """Process a frame that has been received off the wire.
723
723
724 Returns a dict with an ``action`` key that details what action,
724 Returns a dict with an ``action`` key that details what action,
725 if any, the consumer should take next.
725 if any, the consumer should take next.
726 """
726 """
727 if not frame.streamid % 2:
727 if not frame.streamid % 2:
728 self._state = 'errored'
728 self._state = 'errored'
729 return self._makeerrorresult(
729 return self._makeerrorresult(
730 _('received frame with even numbered stream ID: %d') %
730 _('received frame with even numbered stream ID: %d') %
731 frame.streamid)
731 frame.streamid)
732
732
733 if frame.streamid not in self._incomingstreams:
733 if frame.streamid not in self._incomingstreams:
734 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
734 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
735 self._state = 'errored'
735 self._state = 'errored'
736 return self._makeerrorresult(
736 return self._makeerrorresult(
737 _('received frame on unknown inactive stream without '
737 _('received frame on unknown inactive stream without '
738 'beginning of stream flag set'))
738 'beginning of stream flag set'))
739
739
740 self._incomingstreams[frame.streamid] = stream(frame.streamid)
740 self._incomingstreams[frame.streamid] = stream(frame.streamid)
741
741
742 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
742 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
743 # TODO handle decoding frames
743 # TODO handle decoding frames
744 self._state = 'errored'
744 self._state = 'errored'
745 raise error.ProgrammingError('support for decoding stream payloads '
745 raise error.ProgrammingError('support for decoding stream payloads '
746 'not yet implemented')
746 'not yet implemented')
747
747
748 if frame.streamflags & STREAM_FLAG_END_STREAM:
748 if frame.streamflags & STREAM_FLAG_END_STREAM:
749 del self._incomingstreams[frame.streamid]
749 del self._incomingstreams[frame.streamid]
750
750
751 handlers = {
751 handlers = {
752 'idle': self._onframeidle,
752 'idle': self._onframeidle,
753 'command-receiving': self._onframecommandreceiving,
753 'command-receiving': self._onframecommandreceiving,
754 'errored': self._onframeerrored,
754 'errored': self._onframeerrored,
755 }
755 }
756
756
757 meth = handlers.get(self._state)
757 meth = handlers.get(self._state)
758 if not meth:
758 if not meth:
759 raise error.ProgrammingError('unhandled state: %s' % self._state)
759 raise error.ProgrammingError('unhandled state: %s' % self._state)
760
760
761 return meth(frame)
761 return meth(frame)
762
762
763 def oncommandresponseready(self, stream, requestid, data):
763 def oncommandresponseready(self, stream, requestid, data):
764 """Signal that a bytes response is ready to be sent to the client.
764 """Signal that a bytes response is ready to be sent to the client.
765
765
766 The raw bytes response is passed as an argument.
766 The raw bytes response is passed as an argument.
767 """
767 """
768 ensureserverstream(stream)
768 ensureserverstream(stream)
769
769
770 def sendframes():
770 def sendframes():
771 for frame in createcommandresponseframesfrombytes(stream, requestid,
771 for frame in createcommandresponseframesfrombytes(stream, requestid,
772 data):
772 data):
773 yield frame
773 yield frame
774
774
775 self._activecommands.remove(requestid)
775 self._activecommands.remove(requestid)
776
776
777 result = sendframes()
777 result = sendframes()
778
778
779 if self._deferoutput:
779 if self._deferoutput:
780 self._bufferedframegens.append(result)
780 self._bufferedframegens.append(result)
781 return 'noop', {}
781 return 'noop', {}
782 else:
782 else:
783 return 'sendframes', {
783 return 'sendframes', {
784 'framegen': result,
784 'framegen': result,
785 }
785 }
786
786
787 def oncommandresponsereadyobjects(self, stream, requestid, objs):
787 def oncommandresponsereadyobjects(self, stream, requestid, objs):
788 """Signal that objects are ready to be sent to the client.
788 """Signal that objects are ready to be sent to the client.
789
789
790 ``objs`` is an iterable of objects (typically a generator) that will
790 ``objs`` is an iterable of objects (typically a generator) that will
791 be encoded via CBOR and added to frames, which will be sent to the
791 be encoded via CBOR and added to frames, which will be sent to the
792 client.
792 client.
793 """
793 """
794 ensureserverstream(stream)
794 ensureserverstream(stream)
795
795
796 # We need to take care over exception handling. Uncaught exceptions
796 # We need to take care over exception handling. Uncaught exceptions
797 # when generating frames could lead to premature end of the frame
797 # when generating frames could lead to premature end of the frame
798 # stream and the possibility of the server or client process getting
798 # stream and the possibility of the server or client process getting
799 # in a bad state.
799 # in a bad state.
800 #
800 #
801 # Keep in mind that if ``objs`` is a generator, advancing it could
801 # Keep in mind that if ``objs`` is a generator, advancing it could
802 # raise exceptions that originated in e.g. wire protocol command
802 # raise exceptions that originated in e.g. wire protocol command
803 # functions. That is why we differentiate between exceptions raised
803 # functions. That is why we differentiate between exceptions raised
804 # when iterating versus other exceptions that occur.
804 # when iterating versus other exceptions that occur.
805 #
805 #
806 # In all cases, when the function finishes, the request is fully
806 # In all cases, when the function finishes, the request is fully
807 # handled and no new frames for it should be seen.
807 # handled and no new frames for it should be seen.
808
808
809 def sendframes():
809 def sendframes():
810 emitted = False
810 emitted = False
811 emitter = bufferingcommandresponseemitter(stream, requestid)
811 emitter = bufferingcommandresponseemitter(stream, requestid)
812 while True:
812 while True:
813 try:
813 try:
814 o = next(objs)
814 o = next(objs)
815 except StopIteration:
815 except StopIteration:
816 for frame in emitter.send(None):
816 for frame in emitter.send(None):
817 yield frame
817 yield frame
818
818
819 if emitted:
819 if emitted:
820 yield createcommandresponseeosframe(stream, requestid)
820 yield createcommandresponseeosframe(stream, requestid)
821 break
821 break
822
822
823 except error.WireprotoCommandError as e:
823 except error.WireprotoCommandError as e:
824 for frame in createcommanderrorresponse(
824 for frame in createcommanderrorresponse(
825 stream, requestid, e.message, e.messageargs):
825 stream, requestid, e.message, e.messageargs):
826 yield frame
826 yield frame
827 break
827 break
828
828
829 except Exception as e:
829 except Exception as e:
830 for frame in createerrorframe(stream, requestid,
830 for frame in createerrorframe(
831 '%s' % e,
831 stream, requestid, '%s' % stringutil.forcebytestr(e),
832 errtype='server'):
832 errtype='server'):
833
833 yield frame
834 yield frame
834
835
835 break
836 break
836
837
837 try:
838 try:
838 if not emitted:
839 if not emitted:
839 yield createcommandresponseokframe(stream, requestid)
840 yield createcommandresponseokframe(stream, requestid)
840 emitted = True
841 emitted = True
841
842
842 for chunk in cborutil.streamencode(o):
843 for chunk in cborutil.streamencode(o):
843 for frame in emitter.send(chunk):
844 for frame in emitter.send(chunk):
844 yield frame
845 yield frame
845
846
846 except Exception as e:
847 except Exception as e:
847 for frame in createerrorframe(stream, requestid,
848 for frame in createerrorframe(stream, requestid,
848 '%s' % e,
849 '%s' % e,
849 errtype='server'):
850 errtype='server'):
850 yield frame
851 yield frame
851
852
852 break
853 break
853
854
854 self._activecommands.remove(requestid)
855 self._activecommands.remove(requestid)
855
856
856 return self._handlesendframes(sendframes())
857 return self._handlesendframes(sendframes())
857
858
858 def oninputeof(self):
859 def oninputeof(self):
859 """Signals that end of input has been received.
860 """Signals that end of input has been received.
860
861
861 No more frames will be received. All pending activity should be
862 No more frames will be received. All pending activity should be
862 completed.
863 completed.
863 """
864 """
864 # TODO should we do anything about in-flight commands?
865 # TODO should we do anything about in-flight commands?
865
866
866 if not self._deferoutput or not self._bufferedframegens:
867 if not self._deferoutput or not self._bufferedframegens:
867 return 'noop', {}
868 return 'noop', {}
868
869
869 # If we buffered all our responses, emit those.
870 # If we buffered all our responses, emit those.
870 def makegen():
871 def makegen():
871 for gen in self._bufferedframegens:
872 for gen in self._bufferedframegens:
872 for frame in gen:
873 for frame in gen:
873 yield frame
874 yield frame
874
875
875 return 'sendframes', {
876 return 'sendframes', {
876 'framegen': makegen(),
877 'framegen': makegen(),
877 }
878 }
878
879
879 def _handlesendframes(self, framegen):
880 def _handlesendframes(self, framegen):
880 if self._deferoutput:
881 if self._deferoutput:
881 self._bufferedframegens.append(framegen)
882 self._bufferedframegens.append(framegen)
882 return 'noop', {}
883 return 'noop', {}
883 else:
884 else:
884 return 'sendframes', {
885 return 'sendframes', {
885 'framegen': framegen,
886 'framegen': framegen,
886 }
887 }
887
888
888 def onservererror(self, stream, requestid, msg):
889 def onservererror(self, stream, requestid, msg):
889 ensureserverstream(stream)
890 ensureserverstream(stream)
890
891
891 def sendframes():
892 def sendframes():
892 for frame in createerrorframe(stream, requestid, msg,
893 for frame in createerrorframe(stream, requestid, msg,
893 errtype='server'):
894 errtype='server'):
894 yield frame
895 yield frame
895
896
896 self._activecommands.remove(requestid)
897 self._activecommands.remove(requestid)
897
898
898 return self._handlesendframes(sendframes())
899 return self._handlesendframes(sendframes())
899
900
900 def oncommanderror(self, stream, requestid, message, args=None):
901 def oncommanderror(self, stream, requestid, message, args=None):
901 """Called when a command encountered an error before sending output."""
902 """Called when a command encountered an error before sending output."""
902 ensureserverstream(stream)
903 ensureserverstream(stream)
903
904
904 def sendframes():
905 def sendframes():
905 for frame in createcommanderrorresponse(stream, requestid, message,
906 for frame in createcommanderrorresponse(stream, requestid, message,
906 args):
907 args):
907 yield frame
908 yield frame
908
909
909 self._activecommands.remove(requestid)
910 self._activecommands.remove(requestid)
910
911
911 return self._handlesendframes(sendframes())
912 return self._handlesendframes(sendframes())
912
913
913 def makeoutputstream(self):
914 def makeoutputstream(self):
914 """Create a stream to be used for sending data to the client."""
915 """Create a stream to be used for sending data to the client."""
915 streamid = self._nextoutgoingstreamid
916 streamid = self._nextoutgoingstreamid
916 self._nextoutgoingstreamid += 2
917 self._nextoutgoingstreamid += 2
917
918
918 s = stream(streamid)
919 s = stream(streamid)
919 self._outgoingstreams[streamid] = s
920 self._outgoingstreams[streamid] = s
920
921
921 return s
922 return s
922
923
923 def _makeerrorresult(self, msg):
924 def _makeerrorresult(self, msg):
924 return 'error', {
925 return 'error', {
925 'message': msg,
926 'message': msg,
926 }
927 }
927
928
928 def _makeruncommandresult(self, requestid):
929 def _makeruncommandresult(self, requestid):
929 entry = self._receivingcommands[requestid]
930 entry = self._receivingcommands[requestid]
930
931
931 if not entry['requestdone']:
932 if not entry['requestdone']:
932 self._state = 'errored'
933 self._state = 'errored'
933 raise error.ProgrammingError('should not be called without '
934 raise error.ProgrammingError('should not be called without '
934 'requestdone set')
935 'requestdone set')
935
936
936 del self._receivingcommands[requestid]
937 del self._receivingcommands[requestid]
937
938
938 if self._receivingcommands:
939 if self._receivingcommands:
939 self._state = 'command-receiving'
940 self._state = 'command-receiving'
940 else:
941 else:
941 self._state = 'idle'
942 self._state = 'idle'
942
943
943 # Decode the payloads as CBOR.
944 # Decode the payloads as CBOR.
944 entry['payload'].seek(0)
945 entry['payload'].seek(0)
945 request = cborutil.decodeall(entry['payload'].getvalue())[0]
946 request = cborutil.decodeall(entry['payload'].getvalue())[0]
946
947
947 if b'name' not in request:
948 if b'name' not in request:
948 self._state = 'errored'
949 self._state = 'errored'
949 return self._makeerrorresult(
950 return self._makeerrorresult(
950 _('command request missing "name" field'))
951 _('command request missing "name" field'))
951
952
952 if b'args' not in request:
953 if b'args' not in request:
953 request[b'args'] = {}
954 request[b'args'] = {}
954
955
955 assert requestid not in self._activecommands
956 assert requestid not in self._activecommands
956 self._activecommands.add(requestid)
957 self._activecommands.add(requestid)
957
958
958 return 'runcommand', {
959 return 'runcommand', {
959 'requestid': requestid,
960 'requestid': requestid,
960 'command': request[b'name'],
961 'command': request[b'name'],
961 'args': request[b'args'],
962 'args': request[b'args'],
962 'data': entry['data'].getvalue() if entry['data'] else None,
963 'data': entry['data'].getvalue() if entry['data'] else None,
963 }
964 }
964
965
965 def _makewantframeresult(self):
966 def _makewantframeresult(self):
966 return 'wantframe', {
967 return 'wantframe', {
967 'state': self._state,
968 'state': self._state,
968 }
969 }
969
970
970 def _validatecommandrequestframe(self, frame):
971 def _validatecommandrequestframe(self, frame):
971 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
972 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
972 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
973 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
973
974
974 if new and continuation:
975 if new and continuation:
975 self._state = 'errored'
976 self._state = 'errored'
976 return self._makeerrorresult(
977 return self._makeerrorresult(
977 _('received command request frame with both new and '
978 _('received command request frame with both new and '
978 'continuation flags set'))
979 'continuation flags set'))
979
980
980 if not new and not continuation:
981 if not new and not continuation:
981 self._state = 'errored'
982 self._state = 'errored'
982 return self._makeerrorresult(
983 return self._makeerrorresult(
983 _('received command request frame with neither new nor '
984 _('received command request frame with neither new nor '
984 'continuation flags set'))
985 'continuation flags set'))
985
986
986 def _onframeidle(self, frame):
987 def _onframeidle(self, frame):
987 # The only frame type that should be received in this state is a
988 # The only frame type that should be received in this state is a
988 # command request.
989 # command request.
989 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
990 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
990 self._state = 'errored'
991 self._state = 'errored'
991 return self._makeerrorresult(
992 return self._makeerrorresult(
992 _('expected command request frame; got %d') % frame.typeid)
993 _('expected command request frame; got %d') % frame.typeid)
993
994
994 res = self._validatecommandrequestframe(frame)
995 res = self._validatecommandrequestframe(frame)
995 if res:
996 if res:
996 return res
997 return res
997
998
998 if frame.requestid in self._receivingcommands:
999 if frame.requestid in self._receivingcommands:
999 self._state = 'errored'
1000 self._state = 'errored'
1000 return self._makeerrorresult(
1001 return self._makeerrorresult(
1001 _('request with ID %d already received') % frame.requestid)
1002 _('request with ID %d already received') % frame.requestid)
1002
1003
1003 if frame.requestid in self._activecommands:
1004 if frame.requestid in self._activecommands:
1004 self._state = 'errored'
1005 self._state = 'errored'
1005 return self._makeerrorresult(
1006 return self._makeerrorresult(
1006 _('request with ID %d is already active') % frame.requestid)
1007 _('request with ID %d is already active') % frame.requestid)
1007
1008
1008 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1009 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1009 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1010 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1010 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1011 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1011
1012
1012 if not new:
1013 if not new:
1013 self._state = 'errored'
1014 self._state = 'errored'
1014 return self._makeerrorresult(
1015 return self._makeerrorresult(
1015 _('received command request frame without new flag set'))
1016 _('received command request frame without new flag set'))
1016
1017
1017 payload = util.bytesio()
1018 payload = util.bytesio()
1018 payload.write(frame.payload)
1019 payload.write(frame.payload)
1019
1020
1020 self._receivingcommands[frame.requestid] = {
1021 self._receivingcommands[frame.requestid] = {
1021 'payload': payload,
1022 'payload': payload,
1022 'data': None,
1023 'data': None,
1023 'requestdone': not moreframes,
1024 'requestdone': not moreframes,
1024 'expectingdata': bool(expectingdata),
1025 'expectingdata': bool(expectingdata),
1025 }
1026 }
1026
1027
1027 # This is the final frame for this request. Dispatch it.
1028 # This is the final frame for this request. Dispatch it.
1028 if not moreframes and not expectingdata:
1029 if not moreframes and not expectingdata:
1029 return self._makeruncommandresult(frame.requestid)
1030 return self._makeruncommandresult(frame.requestid)
1030
1031
1031 assert moreframes or expectingdata
1032 assert moreframes or expectingdata
1032 self._state = 'command-receiving'
1033 self._state = 'command-receiving'
1033 return self._makewantframeresult()
1034 return self._makewantframeresult()
1034
1035
1035 def _onframecommandreceiving(self, frame):
1036 def _onframecommandreceiving(self, frame):
1036 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1037 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1037 # Process new command requests as such.
1038 # Process new command requests as such.
1038 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1039 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1039 return self._onframeidle(frame)
1040 return self._onframeidle(frame)
1040
1041
1041 res = self._validatecommandrequestframe(frame)
1042 res = self._validatecommandrequestframe(frame)
1042 if res:
1043 if res:
1043 return res
1044 return res
1044
1045
1045 # All other frames should be related to a command that is currently
1046 # All other frames should be related to a command that is currently
1046 # receiving but is not active.
1047 # receiving but is not active.
1047 if frame.requestid in self._activecommands:
1048 if frame.requestid in self._activecommands:
1048 self._state = 'errored'
1049 self._state = 'errored'
1049 return self._makeerrorresult(
1050 return self._makeerrorresult(
1050 _('received frame for request that is still active: %d') %
1051 _('received frame for request that is still active: %d') %
1051 frame.requestid)
1052 frame.requestid)
1052
1053
1053 if frame.requestid not in self._receivingcommands:
1054 if frame.requestid not in self._receivingcommands:
1054 self._state = 'errored'
1055 self._state = 'errored'
1055 return self._makeerrorresult(
1056 return self._makeerrorresult(
1056 _('received frame for request that is not receiving: %d') %
1057 _('received frame for request that is not receiving: %d') %
1057 frame.requestid)
1058 frame.requestid)
1058
1059
1059 entry = self._receivingcommands[frame.requestid]
1060 entry = self._receivingcommands[frame.requestid]
1060
1061
1061 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1062 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1062 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1063 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1063 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1064 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1064
1065
1065 if entry['requestdone']:
1066 if entry['requestdone']:
1066 self._state = 'errored'
1067 self._state = 'errored'
1067 return self._makeerrorresult(
1068 return self._makeerrorresult(
1068 _('received command request frame when request frames '
1069 _('received command request frame when request frames '
1069 'were supposedly done'))
1070 'were supposedly done'))
1070
1071
1071 if expectingdata != entry['expectingdata']:
1072 if expectingdata != entry['expectingdata']:
1072 self._state = 'errored'
1073 self._state = 'errored'
1073 return self._makeerrorresult(
1074 return self._makeerrorresult(
1074 _('mismatch between expect data flag and previous frame'))
1075 _('mismatch between expect data flag and previous frame'))
1075
1076
1076 entry['payload'].write(frame.payload)
1077 entry['payload'].write(frame.payload)
1077
1078
1078 if not moreframes:
1079 if not moreframes:
1079 entry['requestdone'] = True
1080 entry['requestdone'] = True
1080
1081
1081 if not moreframes and not expectingdata:
1082 if not moreframes and not expectingdata:
1082 return self._makeruncommandresult(frame.requestid)
1083 return self._makeruncommandresult(frame.requestid)
1083
1084
1084 return self._makewantframeresult()
1085 return self._makewantframeresult()
1085
1086
1086 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1087 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1087 if not entry['expectingdata']:
1088 if not entry['expectingdata']:
1088 self._state = 'errored'
1089 self._state = 'errored'
1089 return self._makeerrorresult(_(
1090 return self._makeerrorresult(_(
1090 'received command data frame for request that is not '
1091 'received command data frame for request that is not '
1091 'expecting data: %d') % frame.requestid)
1092 'expecting data: %d') % frame.requestid)
1092
1093
1093 if entry['data'] is None:
1094 if entry['data'] is None:
1094 entry['data'] = util.bytesio()
1095 entry['data'] = util.bytesio()
1095
1096
1096 return self._handlecommanddataframe(frame, entry)
1097 return self._handlecommanddataframe(frame, entry)
1097 else:
1098 else:
1098 self._state = 'errored'
1099 self._state = 'errored'
1099 return self._makeerrorresult(_(
1100 return self._makeerrorresult(_(
1100 'received unexpected frame type: %d') % frame.typeid)
1101 'received unexpected frame type: %d') % frame.typeid)
1101
1102
1102 def _handlecommanddataframe(self, frame, entry):
1103 def _handlecommanddataframe(self, frame, entry):
1103 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1104 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1104
1105
1105 # TODO support streaming data instead of buffering it.
1106 # TODO support streaming data instead of buffering it.
1106 entry['data'].write(frame.payload)
1107 entry['data'].write(frame.payload)
1107
1108
1108 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1109 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1109 return self._makewantframeresult()
1110 return self._makewantframeresult()
1110 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1111 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1111 entry['data'].seek(0)
1112 entry['data'].seek(0)
1112 return self._makeruncommandresult(frame.requestid)
1113 return self._makeruncommandresult(frame.requestid)
1113 else:
1114 else:
1114 self._state = 'errored'
1115 self._state = 'errored'
1115 return self._makeerrorresult(_('command data frame without '
1116 return self._makeerrorresult(_('command data frame without '
1116 'flags'))
1117 'flags'))
1117
1118
1118 def _onframeerrored(self, frame):
1119 def _onframeerrored(self, frame):
1119 return self._makeerrorresult(_('server already errored'))
1120 return self._makeerrorresult(_('server already errored'))
1120
1121
1121 class commandrequest(object):
1122 class commandrequest(object):
1122 """Represents a request to run a command."""
1123 """Represents a request to run a command."""
1123
1124
1124 def __init__(self, requestid, name, args, datafh=None):
1125 def __init__(self, requestid, name, args, datafh=None):
1125 self.requestid = requestid
1126 self.requestid = requestid
1126 self.name = name
1127 self.name = name
1127 self.args = args
1128 self.args = args
1128 self.datafh = datafh
1129 self.datafh = datafh
1129 self.state = 'pending'
1130 self.state = 'pending'
1130
1131
1131 class clientreactor(object):
1132 class clientreactor(object):
1132 """Holds state of a client issuing frame-based protocol requests.
1133 """Holds state of a client issuing frame-based protocol requests.
1133
1134
1134 This is like ``serverreactor`` but for client-side state.
1135 This is like ``serverreactor`` but for client-side state.
1135
1136
1136 Each instance is bound to the lifetime of a connection. For persistent
1137 Each instance is bound to the lifetime of a connection. For persistent
1137 connection transports using e.g. TCP sockets and speaking the raw
1138 connection transports using e.g. TCP sockets and speaking the raw
1138 framing protocol, there will be a single instance for the lifetime of
1139 framing protocol, there will be a single instance for the lifetime of
1139 the TCP socket. For transports where there are multiple discrete
1140 the TCP socket. For transports where there are multiple discrete
1140 interactions (say tunneled within in HTTP request), there will be a
1141 interactions (say tunneled within in HTTP request), there will be a
1141 separate instance for each distinct interaction.
1142 separate instance for each distinct interaction.
1142 """
1143 """
1143 def __init__(self, hasmultiplesend=False, buffersends=True):
1144 def __init__(self, hasmultiplesend=False, buffersends=True):
1144 """Create a new instance.
1145 """Create a new instance.
1145
1146
1146 ``hasmultiplesend`` indicates whether multiple sends are supported
1147 ``hasmultiplesend`` indicates whether multiple sends are supported
1147 by the transport. When True, it is possible to send commands immediately
1148 by the transport. When True, it is possible to send commands immediately
1148 instead of buffering until the caller signals an intent to finish a
1149 instead of buffering until the caller signals an intent to finish a
1149 send operation.
1150 send operation.
1150
1151
1151 ``buffercommands`` indicates whether sends should be buffered until the
1152 ``buffercommands`` indicates whether sends should be buffered until the
1152 last request has been issued.
1153 last request has been issued.
1153 """
1154 """
1154 self._hasmultiplesend = hasmultiplesend
1155 self._hasmultiplesend = hasmultiplesend
1155 self._buffersends = buffersends
1156 self._buffersends = buffersends
1156
1157
1157 self._canissuecommands = True
1158 self._canissuecommands = True
1158 self._cansend = True
1159 self._cansend = True
1159
1160
1160 self._nextrequestid = 1
1161 self._nextrequestid = 1
1161 # We only support a single outgoing stream for now.
1162 # We only support a single outgoing stream for now.
1162 self._outgoingstream = stream(1)
1163 self._outgoingstream = stream(1)
1163 self._pendingrequests = collections.deque()
1164 self._pendingrequests = collections.deque()
1164 self._activerequests = {}
1165 self._activerequests = {}
1165 self._incomingstreams = {}
1166 self._incomingstreams = {}
1166
1167
1167 def callcommand(self, name, args, datafh=None):
1168 def callcommand(self, name, args, datafh=None):
1168 """Request that a command be executed.
1169 """Request that a command be executed.
1169
1170
1170 Receives the command name, a dict of arguments to pass to the command,
1171 Receives the command name, a dict of arguments to pass to the command,
1171 and an optional file object containing the raw data for the command.
1172 and an optional file object containing the raw data for the command.
1172
1173
1173 Returns a 3-tuple of (request, action, action data).
1174 Returns a 3-tuple of (request, action, action data).
1174 """
1175 """
1175 if not self._canissuecommands:
1176 if not self._canissuecommands:
1176 raise error.ProgrammingError('cannot issue new commands')
1177 raise error.ProgrammingError('cannot issue new commands')
1177
1178
1178 requestid = self._nextrequestid
1179 requestid = self._nextrequestid
1179 self._nextrequestid += 2
1180 self._nextrequestid += 2
1180
1181
1181 request = commandrequest(requestid, name, args, datafh=datafh)
1182 request = commandrequest(requestid, name, args, datafh=datafh)
1182
1183
1183 if self._buffersends:
1184 if self._buffersends:
1184 self._pendingrequests.append(request)
1185 self._pendingrequests.append(request)
1185 return request, 'noop', {}
1186 return request, 'noop', {}
1186 else:
1187 else:
1187 if not self._cansend:
1188 if not self._cansend:
1188 raise error.ProgrammingError('sends cannot be performed on '
1189 raise error.ProgrammingError('sends cannot be performed on '
1189 'this instance')
1190 'this instance')
1190
1191
1191 if not self._hasmultiplesend:
1192 if not self._hasmultiplesend:
1192 self._cansend = False
1193 self._cansend = False
1193 self._canissuecommands = False
1194 self._canissuecommands = False
1194
1195
1195 return request, 'sendframes', {
1196 return request, 'sendframes', {
1196 'framegen': self._makecommandframes(request),
1197 'framegen': self._makecommandframes(request),
1197 }
1198 }
1198
1199
1199 def flushcommands(self):
1200 def flushcommands(self):
1200 """Request that all queued commands be sent.
1201 """Request that all queued commands be sent.
1201
1202
1202 If any commands are buffered, this will instruct the caller to send
1203 If any commands are buffered, this will instruct the caller to send
1203 them over the wire. If no commands are buffered it instructs the client
1204 them over the wire. If no commands are buffered it instructs the client
1204 to no-op.
1205 to no-op.
1205
1206
1206 If instances aren't configured for multiple sends, no new command
1207 If instances aren't configured for multiple sends, no new command
1207 requests are allowed after this is called.
1208 requests are allowed after this is called.
1208 """
1209 """
1209 if not self._pendingrequests:
1210 if not self._pendingrequests:
1210 return 'noop', {}
1211 return 'noop', {}
1211
1212
1212 if not self._cansend:
1213 if not self._cansend:
1213 raise error.ProgrammingError('sends cannot be performed on this '
1214 raise error.ProgrammingError('sends cannot be performed on this '
1214 'instance')
1215 'instance')
1215
1216
1216 # If the instance only allows sending once, mark that we have fired
1217 # If the instance only allows sending once, mark that we have fired
1217 # our one shot.
1218 # our one shot.
1218 if not self._hasmultiplesend:
1219 if not self._hasmultiplesend:
1219 self._canissuecommands = False
1220 self._canissuecommands = False
1220 self._cansend = False
1221 self._cansend = False
1221
1222
1222 def makeframes():
1223 def makeframes():
1223 while self._pendingrequests:
1224 while self._pendingrequests:
1224 request = self._pendingrequests.popleft()
1225 request = self._pendingrequests.popleft()
1225 for frame in self._makecommandframes(request):
1226 for frame in self._makecommandframes(request):
1226 yield frame
1227 yield frame
1227
1228
1228 return 'sendframes', {
1229 return 'sendframes', {
1229 'framegen': makeframes(),
1230 'framegen': makeframes(),
1230 }
1231 }
1231
1232
1232 def _makecommandframes(self, request):
1233 def _makecommandframes(self, request):
1233 """Emit frames to issue a command request.
1234 """Emit frames to issue a command request.
1234
1235
1235 As a side-effect, update request accounting to reflect its changed
1236 As a side-effect, update request accounting to reflect its changed
1236 state.
1237 state.
1237 """
1238 """
1238 self._activerequests[request.requestid] = request
1239 self._activerequests[request.requestid] = request
1239 request.state = 'sending'
1240 request.state = 'sending'
1240
1241
1241 res = createcommandframes(self._outgoingstream,
1242 res = createcommandframes(self._outgoingstream,
1242 request.requestid,
1243 request.requestid,
1243 request.name,
1244 request.name,
1244 request.args,
1245 request.args,
1245 request.datafh)
1246 request.datafh)
1246
1247
1247 for frame in res:
1248 for frame in res:
1248 yield frame
1249 yield frame
1249
1250
1250 request.state = 'sent'
1251 request.state = 'sent'
1251
1252
1252 def onframerecv(self, frame):
1253 def onframerecv(self, frame):
1253 """Process a frame that has been received off the wire.
1254 """Process a frame that has been received off the wire.
1254
1255
1255 Returns a 2-tuple of (action, meta) describing further action the
1256 Returns a 2-tuple of (action, meta) describing further action the
1256 caller needs to take as a result of receiving this frame.
1257 caller needs to take as a result of receiving this frame.
1257 """
1258 """
1258 if frame.streamid % 2:
1259 if frame.streamid % 2:
1259 return 'error', {
1260 return 'error', {
1260 'message': (
1261 'message': (
1261 _('received frame with odd numbered stream ID: %d') %
1262 _('received frame with odd numbered stream ID: %d') %
1262 frame.streamid),
1263 frame.streamid),
1263 }
1264 }
1264
1265
1265 if frame.streamid not in self._incomingstreams:
1266 if frame.streamid not in self._incomingstreams:
1266 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1267 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1267 return 'error', {
1268 return 'error', {
1268 'message': _('received frame on unknown stream '
1269 'message': _('received frame on unknown stream '
1269 'without beginning of stream flag set'),
1270 'without beginning of stream flag set'),
1270 }
1271 }
1271
1272
1272 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1273 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1273
1274
1274 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1275 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1275 raise error.ProgrammingError('support for decoding stream '
1276 raise error.ProgrammingError('support for decoding stream '
1276 'payloads not yet implemneted')
1277 'payloads not yet implemneted')
1277
1278
1278 if frame.streamflags & STREAM_FLAG_END_STREAM:
1279 if frame.streamflags & STREAM_FLAG_END_STREAM:
1279 del self._incomingstreams[frame.streamid]
1280 del self._incomingstreams[frame.streamid]
1280
1281
1281 if frame.requestid not in self._activerequests:
1282 if frame.requestid not in self._activerequests:
1282 return 'error', {
1283 return 'error', {
1283 'message': (_('received frame for inactive request ID: %d') %
1284 'message': (_('received frame for inactive request ID: %d') %
1284 frame.requestid),
1285 frame.requestid),
1285 }
1286 }
1286
1287
1287 request = self._activerequests[frame.requestid]
1288 request = self._activerequests[frame.requestid]
1288 request.state = 'receiving'
1289 request.state = 'receiving'
1289
1290
1290 handlers = {
1291 handlers = {
1291 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1292 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1292 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1293 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1293 }
1294 }
1294
1295
1295 meth = handlers.get(frame.typeid)
1296 meth = handlers.get(frame.typeid)
1296 if not meth:
1297 if not meth:
1297 raise error.ProgrammingError('unhandled frame type: %d' %
1298 raise error.ProgrammingError('unhandled frame type: %d' %
1298 frame.typeid)
1299 frame.typeid)
1299
1300
1300 return meth(request, frame)
1301 return meth(request, frame)
1301
1302
1302 def _oncommandresponseframe(self, request, frame):
1303 def _oncommandresponseframe(self, request, frame):
1303 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1304 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1304 request.state = 'received'
1305 request.state = 'received'
1305 del self._activerequests[request.requestid]
1306 del self._activerequests[request.requestid]
1306
1307
1307 return 'responsedata', {
1308 return 'responsedata', {
1308 'request': request,
1309 'request': request,
1309 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1310 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1310 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1311 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1311 'data': frame.payload,
1312 'data': frame.payload,
1312 }
1313 }
1313
1314
1314 def _onerrorresponseframe(self, request, frame):
1315 def _onerrorresponseframe(self, request, frame):
1315 request.state = 'errored'
1316 request.state = 'errored'
1316 del self._activerequests[request.requestid]
1317 del self._activerequests[request.requestid]
1317
1318
1318 # The payload should be a CBOR map.
1319 # The payload should be a CBOR map.
1319 m = cborutil.decodeall(frame.payload)[0]
1320 m = cborutil.decodeall(frame.payload)[0]
1320
1321
1321 return 'error', {
1322 return 'error', {
1322 'request': request,
1323 'request': request,
1323 'type': m['type'],
1324 'type': m['type'],
1324 'message': m['message'],
1325 'message': m['message'],
1325 }
1326 }
General Comments 0
You need to be logged in to leave comments. Login now