##// END OF EJS Templates
wireprotov2: handle sender protocol settings frames...
Gregory Szorc -
r40162:327d40b9 default
parent child Browse files
Show More
@@ -1,1407 +1,1497 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54
54
55 FRAME_TYPES = {
55 FRAME_TYPES = {
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 }
64 }
65
65
66 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70
70
71 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
72 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 }
76 }
77
77
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
80
80
81 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
84 }
84 }
85
85
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88
88
89 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 }
92 }
93
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
100 }
101
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
108 }
109
109
110 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
111 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
117 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 }
120 }
121
121
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123
123
124 def humanflags(mapping, value):
124 def humanflags(mapping, value):
125 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
126 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
127 flags = []
127 flags = []
128 val = 1
128 val = 1
129 while value >= val:
129 while value >= val:
130 if value & val:
130 if value & val:
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 val <<= 1
132 val <<= 1
133
133
134 return b'|'.join(flags)
134 return b'|'.join(flags)
135
135
136 @attr.s(slots=True)
136 @attr.s(slots=True)
137 class frameheader(object):
137 class frameheader(object):
138 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
139
139
140 length = attr.ib()
140 length = attr.ib()
141 requestid = attr.ib()
141 requestid = attr.ib()
142 streamid = attr.ib()
142 streamid = attr.ib()
143 streamflags = attr.ib()
143 streamflags = attr.ib()
144 typeid = attr.ib()
144 typeid = attr.ib()
145 flags = attr.ib()
145 flags = attr.ib()
146
146
147 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
148 class frame(object):
148 class frame(object):
149 """Represents a parsed frame."""
149 """Represents a parsed frame."""
150
150
151 requestid = attr.ib()
151 requestid = attr.ib()
152 streamid = attr.ib()
152 streamid = attr.ib()
153 streamflags = attr.ib()
153 streamflags = attr.ib()
154 typeid = attr.ib()
154 typeid = attr.ib()
155 flags = attr.ib()
155 flags = attr.ib()
156 payload = attr.ib()
156 payload = attr.ib()
157
157
158 @encoding.strmethod
158 @encoding.strmethod
159 def __repr__(self):
159 def __repr__(self):
160 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
161 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
162 if value == self.typeid:
162 if value == self.typeid:
163 typename = name
163 typename = name
164 break
164 break
165
165
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
168 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171
171
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
174 # TODO assert size of payload.
174 # TODO assert size of payload.
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176
176
177 # 24 bits length
177 # 24 bits length
178 # 16 bits request id
178 # 16 bits request id
179 # 8 bits stream id
179 # 8 bits stream id
180 # 8 bits stream flags
180 # 8 bits stream flags
181 # 4 bits type
181 # 4 bits type
182 # 4 bits flags
182 # 4 bits flags
183
183
184 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
185 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
188 frame[8:] = payload
188 frame[8:] = payload
189
189
190 return frame
190 return frame
191
191
192 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
193 """Create a frame from a human readable string
193 """Create a frame from a human readable string
194
194
195 Strings have the form:
195 Strings have the form:
196
196
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198
198
199 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
200 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
201
201
202 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
203
203
204 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
205 named constant.
205 named constant.
206
206
207 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
208
208
209 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
210 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 byte string literal.
212 byte string literal.
213 """
213 """
214 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216
216
217 requestid = int(requestid)
217 requestid = int(requestid)
218 streamid = int(streamid)
218 streamid = int(streamid)
219
219
220 finalstreamflags = 0
220 finalstreamflags = 0
221 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
222 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
223 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
224 else:
224 else:
225 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
226
226
227 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
228 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
229 else:
229 else:
230 frametype = int(frametype)
230 frametype = int(frametype)
231
231
232 finalflags = 0
232 finalflags = 0
233 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
234 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
235 if flag in validflags:
235 if flag in validflags:
236 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
237 else:
237 else:
238 finalflags |= int(flag)
238 finalflags |= int(flag)
239
239
240 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
241 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
242 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
243
243
244 else:
244 else:
245 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
246
246
247 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
248 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
249 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
250
250
251 def parseheader(data):
251 def parseheader(data):
252 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
253
253
254 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
255 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
256 """
256 """
257 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
258 # 16 bits request ID
258 # 16 bits request ID
259 # 8 bits stream ID
259 # 8 bits stream ID
260 # 8 bits stream flags
260 # 8 bits stream flags
261 # 4 bits frame type
261 # 4 bits frame type
262 # 4 bits frame flags
262 # 4 bits frame flags
263 # ... payload
263 # ... payload
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 typeflags = data[7]
266 typeflags = data[7]
267
267
268 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
269 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
270
270
271 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
272 frametype, frameflags)
272 frametype, frameflags)
273
273
274 def readframe(fh):
274 def readframe(fh):
275 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
276
276
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
279 seen.
279 seen.
280 """
280 """
281 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
282
282
283 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
284
284
285 if readcount == 0:
285 if readcount == 0:
286 return None
286 return None
287
287
288 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 (readcount, header))
290 (readcount, header))
291
291
292 h = parseheader(header)
292 h = parseheader(header)
293
293
294 payload = fh.read(h.length)
294 payload = fh.read(h.length)
295 if len(payload) != h.length:
295 if len(payload) != h.length:
296 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 (h.length, len(payload)))
297 (h.length, len(payload)))
298
298
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 payload)
300 payload)
301
301
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 redirect=None):
304 redirect=None):
305 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
306
306
307 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
308 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
309 """
309 """
310 data = {b'name': cmd}
310 data = {b'name': cmd}
311 if args:
311 if args:
312 data[b'args'] = args
312 data[b'args'] = args
313
313
314 if redirect:
314 if redirect:
315 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
316
316
317 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
318
318
319 offset = 0
319 offset = 0
320
320
321 while True:
321 while True:
322 flags = 0
322 flags = 0
323
323
324 # Must set new or continuation flag.
324 # Must set new or continuation flag.
325 if not offset:
325 if not offset:
326 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
327 else:
327 else:
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329
329
330 # Data frames is set on all frames.
330 # Data frames is set on all frames.
331 if datafh:
331 if datafh:
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333
333
334 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
335 offset += len(payload)
335 offset += len(payload)
336
336
337 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339
339
340 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 flags=flags,
342 flags=flags,
343 payload=payload)
343 payload=payload)
344
344
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 break
346 break
347
347
348 if datafh:
348 if datafh:
349 while True:
349 while True:
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351
351
352 done = False
352 done = False
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 else:
355 else:
356 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
357 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
358 done = True
358 done = True
359
359
360 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
361 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
362 flags=flags,
362 flags=flags,
363 payload=data)
363 payload=data)
364
364
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
370 """Create a raw frame to send a bytes response from static bytes input.
371
371
372 Returns a generator of bytearrays.
372 Returns a generator of bytearrays.
373 """
373 """
374 # Automatically send the overall CBOR response map.
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
377 raise error.ProgrammingError('not yet implemented')
378
378
379 # Simple case where we can fit the full response in a single frame.
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
384 flags=flags,
385 payload=overall + data)
385 payload=overall + data)
386 return
386 return
387
387
388 # It's easier to send the overall CBOR map in its own frame than to track
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
393 payload=overall)
394
394
395 offset = 0
395 offset = 0
396 while True:
396 while True:
397 chunk = data[offset:offset + maxframesize]
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
398 offset += len(chunk)
399 done = offset == len(data)
399 done = offset == len(data)
400
400
401 if done:
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
405
406 yield stream.makeframe(requestid=requestid,
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
408 flags=flags,
409 payload=chunk)
409 payload=chunk)
410
410
411 if done:
411 if done:
412 break
412 break
413
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
416 """Generator of frames from a generator of byte chunks.
417
417
418 This assumes that another frame will follow whatever this emits. i.e.
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
420 flag.
421 """
421 """
422 cb = util.chunkbuffer(gen)
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
424
425 while True:
425 while True:
426 chunk = cb.read(maxframesize)
426 chunk = cb.read(maxframesize)
427 if not chunk:
427 if not chunk:
428 break
428 break
429
429
430 yield stream.makeframe(requestid=requestid,
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
432 flags=flags,
433 payload=chunk)
433 payload=chunk)
434
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
436
437 def createcommandresponseokframe(stream, requestid):
437 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439
439
440 return stream.makeframe(requestid=requestid,
440 return stream.makeframe(requestid=requestid,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 payload=overall)
443 payload=overall)
444
444
445 def createcommandresponseeosframe(stream, requestid):
445 def createcommandresponseeosframe(stream, requestid):
446 """Create an empty payload frame representing command end-of-stream."""
446 """Create an empty payload frame representing command end-of-stream."""
447 return stream.makeframe(requestid=requestid,
447 return stream.makeframe(requestid=requestid,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 payload=b'')
450 payload=b'')
451
451
452 def createalternatelocationresponseframe(stream, requestid, location):
452 def createalternatelocationresponseframe(stream, requestid, location):
453 data = {
453 data = {
454 b'status': b'redirect',
454 b'status': b'redirect',
455 b'location': {
455 b'location': {
456 b'url': location.url,
456 b'url': location.url,
457 b'mediatype': location.mediatype,
457 b'mediatype': location.mediatype,
458 }
458 }
459 }
459 }
460
460
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 r'servercadercerts'):
462 r'servercadercerts'):
463 value = getattr(location, a)
463 value = getattr(location, a)
464 if value is not None:
464 if value is not None:
465 data[b'location'][pycompat.bytestr(a)] = value
465 data[b'location'][pycompat.bytestr(a)] = value
466
466
467 return stream.makeframe(requestid=requestid,
467 return stream.makeframe(requestid=requestid,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 payload=b''.join(cborutil.streamencode(data)))
470 payload=b''.join(cborutil.streamencode(data)))
471
471
472 def createcommanderrorresponse(stream, requestid, message, args=None):
472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 # formatting works consistently?
474 # formatting works consistently?
475 m = {
475 m = {
476 b'status': b'error',
476 b'status': b'error',
477 b'error': {
477 b'error': {
478 b'message': message,
478 b'message': message,
479 }
479 }
480 }
480 }
481
481
482 if args:
482 if args:
483 m[b'error'][b'args'] = args
483 m[b'error'][b'args'] = args
484
484
485 overall = b''.join(cborutil.streamencode(m))
485 overall = b''.join(cborutil.streamencode(m))
486
486
487 yield stream.makeframe(requestid=requestid,
487 yield stream.makeframe(requestid=requestid,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 payload=overall)
490 payload=overall)
491
491
492 def createerrorframe(stream, requestid, msg, errtype):
492 def createerrorframe(stream, requestid, msg, errtype):
493 # TODO properly handle frame size limits.
493 # TODO properly handle frame size limits.
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495
495
496 payload = b''.join(cborutil.streamencode({
496 payload = b''.join(cborutil.streamencode({
497 b'type': errtype,
497 b'type': errtype,
498 b'message': [{b'msg': msg}],
498 b'message': [{b'msg': msg}],
499 }))
499 }))
500
500
501 yield stream.makeframe(requestid=requestid,
501 yield stream.makeframe(requestid=requestid,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 flags=0,
503 flags=0,
504 payload=payload)
504 payload=payload)
505
505
506 def createtextoutputframe(stream, requestid, atoms,
506 def createtextoutputframe(stream, requestid, atoms,
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 """Create a text output frame to render text to people.
508 """Create a text output frame to render text to people.
509
509
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511
511
512 The formatting string contains ``%s`` tokens to be replaced by the
512 The formatting string contains ``%s`` tokens to be replaced by the
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 formatters to be applied at rendering time. In terms of the ``ui``
514 formatters to be applied at rendering time. In terms of the ``ui``
515 class, each atom corresponds to a ``ui.write()``.
515 class, each atom corresponds to a ``ui.write()``.
516 """
516 """
517 atomdicts = []
517 atomdicts = []
518
518
519 for (formatting, args, labels) in atoms:
519 for (formatting, args, labels) in atoms:
520 # TODO look for localstr, other types here?
520 # TODO look for localstr, other types here?
521
521
522 if not isinstance(formatting, bytes):
522 if not isinstance(formatting, bytes):
523 raise ValueError('must use bytes formatting strings')
523 raise ValueError('must use bytes formatting strings')
524 for arg in args:
524 for arg in args:
525 if not isinstance(arg, bytes):
525 if not isinstance(arg, bytes):
526 raise ValueError('must use bytes for arguments')
526 raise ValueError('must use bytes for arguments')
527 for label in labels:
527 for label in labels:
528 if not isinstance(label, bytes):
528 if not isinstance(label, bytes):
529 raise ValueError('must use bytes for labels')
529 raise ValueError('must use bytes for labels')
530
530
531 # Formatting string must be ASCII.
531 # Formatting string must be ASCII.
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533
533
534 # Arguments must be UTF-8.
534 # Arguments must be UTF-8.
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536
536
537 # Labels must be ASCII.
537 # Labels must be ASCII.
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 for l in labels]
539 for l in labels]
540
540
541 atom = {b'msg': formatting}
541 atom = {b'msg': formatting}
542 if args:
542 if args:
543 atom[b'args'] = args
543 atom[b'args'] = args
544 if labels:
544 if labels:
545 atom[b'labels'] = labels
545 atom[b'labels'] = labels
546
546
547 atomdicts.append(atom)
547 atomdicts.append(atom)
548
548
549 payload = b''.join(cborutil.streamencode(atomdicts))
549 payload = b''.join(cborutil.streamencode(atomdicts))
550
550
551 if len(payload) > maxframesize:
551 if len(payload) > maxframesize:
552 raise ValueError('cannot encode data in a single frame')
552 raise ValueError('cannot encode data in a single frame')
553
553
554 yield stream.makeframe(requestid=requestid,
554 yield stream.makeframe(requestid=requestid,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 flags=0,
556 flags=0,
557 payload=payload)
557 payload=payload)
558
558
559 class bufferingcommandresponseemitter(object):
559 class bufferingcommandresponseemitter(object):
560 """Helper object to emit command response frames intelligently.
560 """Helper object to emit command response frames intelligently.
561
561
562 Raw command response data is likely emitted in chunks much smaller
562 Raw command response data is likely emitted in chunks much smaller
563 than what can fit in a single frame. This class exists to buffer
563 than what can fit in a single frame. This class exists to buffer
564 chunks until enough data is available to fit in a single frame.
564 chunks until enough data is available to fit in a single frame.
565
565
566 TODO we'll need something like this when compression is supported.
566 TODO we'll need something like this when compression is supported.
567 So it might make sense to implement this functionality at the stream
567 So it might make sense to implement this functionality at the stream
568 level.
568 level.
569 """
569 """
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 self._stream = stream
571 self._stream = stream
572 self._requestid = requestid
572 self._requestid = requestid
573 self._maxsize = maxframesize
573 self._maxsize = maxframesize
574 self._chunks = []
574 self._chunks = []
575 self._chunkssize = 0
575 self._chunkssize = 0
576
576
577 def send(self, data):
577 def send(self, data):
578 """Send new data for emission.
578 """Send new data for emission.
579
579
580 Is a generator of new frames that were derived from the new input.
580 Is a generator of new frames that were derived from the new input.
581
581
582 If the special input ``None`` is received, flushes all buffered
582 If the special input ``None`` is received, flushes all buffered
583 data to frames.
583 data to frames.
584 """
584 """
585
585
586 if data is None:
586 if data is None:
587 for frame in self._flush():
587 for frame in self._flush():
588 yield frame
588 yield frame
589 return
589 return
590
590
591 # There is a ton of potential to do more complicated things here.
591 # There is a ton of potential to do more complicated things here.
592 # Our immediate goal is to coalesce small chunks into big frames,
592 # Our immediate goal is to coalesce small chunks into big frames,
593 # not achieve the fewest number of frames possible. So we go with
593 # not achieve the fewest number of frames possible. So we go with
594 # a simple implementation:
594 # a simple implementation:
595 #
595 #
596 # * If a chunk is too large for a frame, we flush and emit frames
596 # * If a chunk is too large for a frame, we flush and emit frames
597 # for the new chunk.
597 # for the new chunk.
598 # * If a chunk can be buffered without total buffered size limits
598 # * If a chunk can be buffered without total buffered size limits
599 # being exceeded, we do that.
599 # being exceeded, we do that.
600 # * If a chunk causes us to go over our buffering limit, we flush
600 # * If a chunk causes us to go over our buffering limit, we flush
601 # and then buffer the new chunk.
601 # and then buffer the new chunk.
602
602
603 if len(data) > self._maxsize:
603 if len(data) > self._maxsize:
604 for frame in self._flush():
604 for frame in self._flush():
605 yield frame
605 yield frame
606
606
607 # Now emit frames for the big chunk.
607 # Now emit frames for the big chunk.
608 offset = 0
608 offset = 0
609 while True:
609 while True:
610 chunk = data[offset:offset + self._maxsize]
610 chunk = data[offset:offset + self._maxsize]
611 offset += len(chunk)
611 offset += len(chunk)
612
612
613 yield self._stream.makeframe(
613 yield self._stream.makeframe(
614 self._requestid,
614 self._requestid,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 payload=chunk)
617 payload=chunk)
618
618
619 if offset == len(data):
619 if offset == len(data):
620 return
620 return
621
621
622 # If we don't have enough to constitute a full frame, buffer and
622 # If we don't have enough to constitute a full frame, buffer and
623 # return.
623 # return.
624 if len(data) + self._chunkssize < self._maxsize:
624 if len(data) + self._chunkssize < self._maxsize:
625 self._chunks.append(data)
625 self._chunks.append(data)
626 self._chunkssize += len(data)
626 self._chunkssize += len(data)
627 return
627 return
628
628
629 # Else flush what we have and buffer the new chunk. We could do
629 # Else flush what we have and buffer the new chunk. We could do
630 # something more intelligent here, like break the chunk. Let's
630 # something more intelligent here, like break the chunk. Let's
631 # keep things simple for now.
631 # keep things simple for now.
632 for frame in self._flush():
632 for frame in self._flush():
633 yield frame
633 yield frame
634
634
635 self._chunks.append(data)
635 self._chunks.append(data)
636 self._chunkssize = len(data)
636 self._chunkssize = len(data)
637
637
638 def _flush(self):
638 def _flush(self):
639 payload = b''.join(self._chunks)
639 payload = b''.join(self._chunks)
640 assert len(payload) <= self._maxsize
640 assert len(payload) <= self._maxsize
641
641
642 self._chunks[:] = []
642 self._chunks[:] = []
643 self._chunkssize = 0
643 self._chunkssize = 0
644
644
645 yield self._stream.makeframe(
645 yield self._stream.makeframe(
646 self._requestid,
646 self._requestid,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 payload=payload)
649 payload=payload)
650
650
651 class stream(object):
651 class stream(object):
652 """Represents a logical unidirectional series of frames."""
652 """Represents a logical unidirectional series of frames."""
653
653
654 def __init__(self, streamid, active=False):
654 def __init__(self, streamid, active=False):
655 self.streamid = streamid
655 self.streamid = streamid
656 self._active = active
656 self._active = active
657
657
658 def makeframe(self, requestid, typeid, flags, payload):
658 def makeframe(self, requestid, typeid, flags, payload):
659 """Create a frame to be sent out over this stream.
659 """Create a frame to be sent out over this stream.
660
660
661 Only returns the frame instance. Does not actually send it.
661 Only returns the frame instance. Does not actually send it.
662 """
662 """
663 streamflags = 0
663 streamflags = 0
664 if not self._active:
664 if not self._active:
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 self._active = True
666 self._active = True
667
667
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 payload)
669 payload)
670
670
671 def ensureserverstream(stream):
671 def ensureserverstream(stream):
672 if stream.streamid % 2:
672 if stream.streamid % 2:
673 raise error.ProgrammingError('server should only write to even '
673 raise error.ProgrammingError('server should only write to even '
674 'numbered streams; %d is not even' %
674 'numbered streams; %d is not even' %
675 stream.streamid)
675 stream.streamid)
676
676
677 DEFAULT_PROTOCOL_SETTINGS = {
678 'contentencodings': [b'identity'],
679 }
680
677 class serverreactor(object):
681 class serverreactor(object):
678 """Holds state of a server handling frame-based protocol requests.
682 """Holds state of a server handling frame-based protocol requests.
679
683
680 This class is the "brain" of the unified frame-based protocol server
684 This class is the "brain" of the unified frame-based protocol server
681 component. While the protocol is stateless from the perspective of
685 component. While the protocol is stateless from the perspective of
682 requests/commands, something needs to track which frames have been
686 requests/commands, something needs to track which frames have been
683 received, what frames to expect, etc. This class is that thing.
687 received, what frames to expect, etc. This class is that thing.
684
688
685 Instances are modeled as a state machine of sorts. Instances are also
689 Instances are modeled as a state machine of sorts. Instances are also
686 reactionary to external events. The point of this class is to encapsulate
690 reactionary to external events. The point of this class is to encapsulate
687 the state of the connection and the exchange of frames, not to perform
691 the state of the connection and the exchange of frames, not to perform
688 work. Instead, callers tell this class when something occurs, like a
692 work. Instead, callers tell this class when something occurs, like a
689 frame arriving. If that activity is worthy of a follow-up action (say
693 frame arriving. If that activity is worthy of a follow-up action (say
690 *run a command*), the return value of that handler will say so.
694 *run a command*), the return value of that handler will say so.
691
695
692 I/O and CPU intensive operations are purposefully delegated outside of
696 I/O and CPU intensive operations are purposefully delegated outside of
693 this class.
697 this class.
694
698
695 Consumers are expected to tell instances when events occur. They do so by
699 Consumers are expected to tell instances when events occur. They do so by
696 calling the various ``on*`` methods. These methods return a 2-tuple
700 calling the various ``on*`` methods. These methods return a 2-tuple
697 describing any follow-up action(s) to take. The first element is the
701 describing any follow-up action(s) to take. The first element is the
698 name of an action to perform. The second is a data structure (usually
702 name of an action to perform. The second is a data structure (usually
699 a dict) specific to that action that contains more information. e.g.
703 a dict) specific to that action that contains more information. e.g.
700 if the server wants to send frames back to the client, the data structure
704 if the server wants to send frames back to the client, the data structure
701 will contain a reference to those frames.
705 will contain a reference to those frames.
702
706
703 Valid actions that consumers can be instructed to take are:
707 Valid actions that consumers can be instructed to take are:
704
708
705 sendframes
709 sendframes
706 Indicates that frames should be sent to the client. The ``framegen``
710 Indicates that frames should be sent to the client. The ``framegen``
707 key contains a generator of frames that should be sent. The server
711 key contains a generator of frames that should be sent. The server
708 assumes that all frames are sent to the client.
712 assumes that all frames are sent to the client.
709
713
710 error
714 error
711 Indicates that an error occurred. Consumer should probably abort.
715 Indicates that an error occurred. Consumer should probably abort.
712
716
713 runcommand
717 runcommand
714 Indicates that the consumer should run a wire protocol command. Details
718 Indicates that the consumer should run a wire protocol command. Details
715 of the command to run are given in the data structure.
719 of the command to run are given in the data structure.
716
720
717 wantframe
721 wantframe
718 Indicates that nothing of interest happened and the server is waiting on
722 Indicates that nothing of interest happened and the server is waiting on
719 more frames from the client before anything interesting can be done.
723 more frames from the client before anything interesting can be done.
720
724
721 noop
725 noop
722 Indicates no additional action is required.
726 Indicates no additional action is required.
723
727
724 Known Issues
728 Known Issues
725 ------------
729 ------------
726
730
727 There are no limits to the number of partially received commands or their
731 There are no limits to the number of partially received commands or their
728 size. A malicious client could stream command request data and exhaust the
732 size. A malicious client could stream command request data and exhaust the
729 server's memory.
733 server's memory.
730
734
731 Partially received commands are not acted upon when end of input is
735 Partially received commands are not acted upon when end of input is
732 reached. Should the server error if it receives a partial request?
736 reached. Should the server error if it receives a partial request?
733 Should the client send a message to abort a partially transmitted request
737 Should the client send a message to abort a partially transmitted request
734 to facilitate graceful shutdown?
738 to facilitate graceful shutdown?
735
739
736 Active requests that haven't been responded to aren't tracked. This means
740 Active requests that haven't been responded to aren't tracked. This means
737 that if we receive a command and instruct its dispatch, another command
741 that if we receive a command and instruct its dispatch, another command
738 with its request ID can come in over the wire and there will be a race
742 with its request ID can come in over the wire and there will be a race
739 between who responds to what.
743 between who responds to what.
740 """
744 """
741
745
742 def __init__(self, deferoutput=False):
746 def __init__(self, deferoutput=False):
743 """Construct a new server reactor.
747 """Construct a new server reactor.
744
748
745 ``deferoutput`` can be used to indicate that no output frames should be
749 ``deferoutput`` can be used to indicate that no output frames should be
746 instructed to be sent until input has been exhausted. In this mode,
750 instructed to be sent until input has been exhausted. In this mode,
747 events that would normally generate output frames (such as a command
751 events that would normally generate output frames (such as a command
748 response being ready) will instead defer instructing the consumer to
752 response being ready) will instead defer instructing the consumer to
749 send those frames. This is useful for half-duplex transports where the
753 send those frames. This is useful for half-duplex transports where the
750 sender cannot receive until all data has been transmitted.
754 sender cannot receive until all data has been transmitted.
751 """
755 """
752 self._deferoutput = deferoutput
756 self._deferoutput = deferoutput
753 self._state = 'idle'
757 self._state = 'initial'
754 self._nextoutgoingstreamid = 2
758 self._nextoutgoingstreamid = 2
755 self._bufferedframegens = []
759 self._bufferedframegens = []
756 # stream id -> stream instance for all active streams from the client.
760 # stream id -> stream instance for all active streams from the client.
757 self._incomingstreams = {}
761 self._incomingstreams = {}
758 self._outgoingstreams = {}
762 self._outgoingstreams = {}
759 # request id -> dict of commands that are actively being received.
763 # request id -> dict of commands that are actively being received.
760 self._receivingcommands = {}
764 self._receivingcommands = {}
761 # Request IDs that have been received and are actively being processed.
765 # Request IDs that have been received and are actively being processed.
762 # Once all output for a request has been sent, it is removed from this
766 # Once all output for a request has been sent, it is removed from this
763 # set.
767 # set.
764 self._activecommands = set()
768 self._activecommands = set()
765
769
770 self._protocolsettingsdecoder = None
771
772 # Sender protocol settings are optional. Set implied default values.
773 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
774
766 def onframerecv(self, frame):
775 def onframerecv(self, frame):
767 """Process a frame that has been received off the wire.
776 """Process a frame that has been received off the wire.
768
777
769 Returns a dict with an ``action`` key that details what action,
778 Returns a dict with an ``action`` key that details what action,
770 if any, the consumer should take next.
779 if any, the consumer should take next.
771 """
780 """
772 if not frame.streamid % 2:
781 if not frame.streamid % 2:
773 self._state = 'errored'
782 self._state = 'errored'
774 return self._makeerrorresult(
783 return self._makeerrorresult(
775 _('received frame with even numbered stream ID: %d') %
784 _('received frame with even numbered stream ID: %d') %
776 frame.streamid)
785 frame.streamid)
777
786
778 if frame.streamid not in self._incomingstreams:
787 if frame.streamid not in self._incomingstreams:
779 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
788 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
780 self._state = 'errored'
789 self._state = 'errored'
781 return self._makeerrorresult(
790 return self._makeerrorresult(
782 _('received frame on unknown inactive stream without '
791 _('received frame on unknown inactive stream without '
783 'beginning of stream flag set'))
792 'beginning of stream flag set'))
784
793
785 self._incomingstreams[frame.streamid] = stream(frame.streamid)
794 self._incomingstreams[frame.streamid] = stream(frame.streamid)
786
795
787 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
796 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
788 # TODO handle decoding frames
797 # TODO handle decoding frames
789 self._state = 'errored'
798 self._state = 'errored'
790 raise error.ProgrammingError('support for decoding stream payloads '
799 raise error.ProgrammingError('support for decoding stream payloads '
791 'not yet implemented')
800 'not yet implemented')
792
801
793 if frame.streamflags & STREAM_FLAG_END_STREAM:
802 if frame.streamflags & STREAM_FLAG_END_STREAM:
794 del self._incomingstreams[frame.streamid]
803 del self._incomingstreams[frame.streamid]
795
804
796 handlers = {
805 handlers = {
806 'initial': self._onframeinitial,
807 'protocol-settings-receiving': self._onframeprotocolsettings,
797 'idle': self._onframeidle,
808 'idle': self._onframeidle,
798 'command-receiving': self._onframecommandreceiving,
809 'command-receiving': self._onframecommandreceiving,
799 'errored': self._onframeerrored,
810 'errored': self._onframeerrored,
800 }
811 }
801
812
802 meth = handlers.get(self._state)
813 meth = handlers.get(self._state)
803 if not meth:
814 if not meth:
804 raise error.ProgrammingError('unhandled state: %s' % self._state)
815 raise error.ProgrammingError('unhandled state: %s' % self._state)
805
816
806 return meth(frame)
817 return meth(frame)
807
818
808 def oncommandresponseready(self, stream, requestid, data):
819 def oncommandresponseready(self, stream, requestid, data):
809 """Signal that a bytes response is ready to be sent to the client.
820 """Signal that a bytes response is ready to be sent to the client.
810
821
811 The raw bytes response is passed as an argument.
822 The raw bytes response is passed as an argument.
812 """
823 """
813 ensureserverstream(stream)
824 ensureserverstream(stream)
814
825
815 def sendframes():
826 def sendframes():
816 for frame in createcommandresponseframesfrombytes(stream, requestid,
827 for frame in createcommandresponseframesfrombytes(stream, requestid,
817 data):
828 data):
818 yield frame
829 yield frame
819
830
820 self._activecommands.remove(requestid)
831 self._activecommands.remove(requestid)
821
832
822 result = sendframes()
833 result = sendframes()
823
834
824 if self._deferoutput:
835 if self._deferoutput:
825 self._bufferedframegens.append(result)
836 self._bufferedframegens.append(result)
826 return 'noop', {}
837 return 'noop', {}
827 else:
838 else:
828 return 'sendframes', {
839 return 'sendframes', {
829 'framegen': result,
840 'framegen': result,
830 }
841 }
831
842
832 def oncommandresponsereadyobjects(self, stream, requestid, objs):
843 def oncommandresponsereadyobjects(self, stream, requestid, objs):
833 """Signal that objects are ready to be sent to the client.
844 """Signal that objects are ready to be sent to the client.
834
845
835 ``objs`` is an iterable of objects (typically a generator) that will
846 ``objs`` is an iterable of objects (typically a generator) that will
836 be encoded via CBOR and added to frames, which will be sent to the
847 be encoded via CBOR and added to frames, which will be sent to the
837 client.
848 client.
838 """
849 """
839 ensureserverstream(stream)
850 ensureserverstream(stream)
840
851
841 # We need to take care over exception handling. Uncaught exceptions
852 # We need to take care over exception handling. Uncaught exceptions
842 # when generating frames could lead to premature end of the frame
853 # when generating frames could lead to premature end of the frame
843 # stream and the possibility of the server or client process getting
854 # stream and the possibility of the server or client process getting
844 # in a bad state.
855 # in a bad state.
845 #
856 #
846 # Keep in mind that if ``objs`` is a generator, advancing it could
857 # Keep in mind that if ``objs`` is a generator, advancing it could
847 # raise exceptions that originated in e.g. wire protocol command
858 # raise exceptions that originated in e.g. wire protocol command
848 # functions. That is why we differentiate between exceptions raised
859 # functions. That is why we differentiate between exceptions raised
849 # when iterating versus other exceptions that occur.
860 # when iterating versus other exceptions that occur.
850 #
861 #
851 # In all cases, when the function finishes, the request is fully
862 # In all cases, when the function finishes, the request is fully
852 # handled and no new frames for it should be seen.
863 # handled and no new frames for it should be seen.
853
864
854 def sendframes():
865 def sendframes():
855 emitted = False
866 emitted = False
856 alternatelocationsent = False
867 alternatelocationsent = False
857 emitter = bufferingcommandresponseemitter(stream, requestid)
868 emitter = bufferingcommandresponseemitter(stream, requestid)
858 while True:
869 while True:
859 try:
870 try:
860 o = next(objs)
871 o = next(objs)
861 except StopIteration:
872 except StopIteration:
862 for frame in emitter.send(None):
873 for frame in emitter.send(None):
863 yield frame
874 yield frame
864
875
865 if emitted:
876 if emitted:
866 yield createcommandresponseeosframe(stream, requestid)
877 yield createcommandresponseeosframe(stream, requestid)
867 break
878 break
868
879
869 except error.WireprotoCommandError as e:
880 except error.WireprotoCommandError as e:
870 for frame in createcommanderrorresponse(
881 for frame in createcommanderrorresponse(
871 stream, requestid, e.message, e.messageargs):
882 stream, requestid, e.message, e.messageargs):
872 yield frame
883 yield frame
873 break
884 break
874
885
875 except Exception as e:
886 except Exception as e:
876 for frame in createerrorframe(
887 for frame in createerrorframe(
877 stream, requestid, '%s' % stringutil.forcebytestr(e),
888 stream, requestid, '%s' % stringutil.forcebytestr(e),
878 errtype='server'):
889 errtype='server'):
879
890
880 yield frame
891 yield frame
881
892
882 break
893 break
883
894
884 try:
895 try:
885 # Alternate location responses can only be the first and
896 # Alternate location responses can only be the first and
886 # only object in the output stream.
897 # only object in the output stream.
887 if isinstance(o, wireprototypes.alternatelocationresponse):
898 if isinstance(o, wireprototypes.alternatelocationresponse):
888 if emitted:
899 if emitted:
889 raise error.ProgrammingError(
900 raise error.ProgrammingError(
890 'alternatelocationresponse seen after initial '
901 'alternatelocationresponse seen after initial '
891 'output object')
902 'output object')
892
903
893 yield createalternatelocationresponseframe(
904 yield createalternatelocationresponseframe(
894 stream, requestid, o)
905 stream, requestid, o)
895
906
896 alternatelocationsent = True
907 alternatelocationsent = True
897 emitted = True
908 emitted = True
898 continue
909 continue
899
910
900 if alternatelocationsent:
911 if alternatelocationsent:
901 raise error.ProgrammingError(
912 raise error.ProgrammingError(
902 'object follows alternatelocationresponse')
913 'object follows alternatelocationresponse')
903
914
904 if not emitted:
915 if not emitted:
905 yield createcommandresponseokframe(stream, requestid)
916 yield createcommandresponseokframe(stream, requestid)
906 emitted = True
917 emitted = True
907
918
908 # Objects emitted by command functions can be serializable
919 # Objects emitted by command functions can be serializable
909 # data structures or special types.
920 # data structures or special types.
910 # TODO consider extracting the content normalization to a
921 # TODO consider extracting the content normalization to a
911 # standalone function, as it may be useful for e.g. cachers.
922 # standalone function, as it may be useful for e.g. cachers.
912
923
913 # A pre-encoded object is sent directly to the emitter.
924 # A pre-encoded object is sent directly to the emitter.
914 if isinstance(o, wireprototypes.encodedresponse):
925 if isinstance(o, wireprototypes.encodedresponse):
915 for frame in emitter.send(o.data):
926 for frame in emitter.send(o.data):
916 yield frame
927 yield frame
917
928
918 # A regular object is CBOR encoded.
929 # A regular object is CBOR encoded.
919 else:
930 else:
920 for chunk in cborutil.streamencode(o):
931 for chunk in cborutil.streamencode(o):
921 for frame in emitter.send(chunk):
932 for frame in emitter.send(chunk):
922 yield frame
933 yield frame
923
934
924 except Exception as e:
935 except Exception as e:
925 for frame in createerrorframe(stream, requestid,
936 for frame in createerrorframe(stream, requestid,
926 '%s' % e,
937 '%s' % e,
927 errtype='server'):
938 errtype='server'):
928 yield frame
939 yield frame
929
940
930 break
941 break
931
942
932 self._activecommands.remove(requestid)
943 self._activecommands.remove(requestid)
933
944
934 return self._handlesendframes(sendframes())
945 return self._handlesendframes(sendframes())
935
946
936 def oninputeof(self):
947 def oninputeof(self):
937 """Signals that end of input has been received.
948 """Signals that end of input has been received.
938
949
939 No more frames will be received. All pending activity should be
950 No more frames will be received. All pending activity should be
940 completed.
951 completed.
941 """
952 """
942 # TODO should we do anything about in-flight commands?
953 # TODO should we do anything about in-flight commands?
943
954
944 if not self._deferoutput or not self._bufferedframegens:
955 if not self._deferoutput or not self._bufferedframegens:
945 return 'noop', {}
956 return 'noop', {}
946
957
947 # If we buffered all our responses, emit those.
958 # If we buffered all our responses, emit those.
948 def makegen():
959 def makegen():
949 for gen in self._bufferedframegens:
960 for gen in self._bufferedframegens:
950 for frame in gen:
961 for frame in gen:
951 yield frame
962 yield frame
952
963
953 return 'sendframes', {
964 return 'sendframes', {
954 'framegen': makegen(),
965 'framegen': makegen(),
955 }
966 }
956
967
957 def _handlesendframes(self, framegen):
968 def _handlesendframes(self, framegen):
958 if self._deferoutput:
969 if self._deferoutput:
959 self._bufferedframegens.append(framegen)
970 self._bufferedframegens.append(framegen)
960 return 'noop', {}
971 return 'noop', {}
961 else:
972 else:
962 return 'sendframes', {
973 return 'sendframes', {
963 'framegen': framegen,
974 'framegen': framegen,
964 }
975 }
965
976
966 def onservererror(self, stream, requestid, msg):
977 def onservererror(self, stream, requestid, msg):
967 ensureserverstream(stream)
978 ensureserverstream(stream)
968
979
969 def sendframes():
980 def sendframes():
970 for frame in createerrorframe(stream, requestid, msg,
981 for frame in createerrorframe(stream, requestid, msg,
971 errtype='server'):
982 errtype='server'):
972 yield frame
983 yield frame
973
984
974 self._activecommands.remove(requestid)
985 self._activecommands.remove(requestid)
975
986
976 return self._handlesendframes(sendframes())
987 return self._handlesendframes(sendframes())
977
988
978 def oncommanderror(self, stream, requestid, message, args=None):
989 def oncommanderror(self, stream, requestid, message, args=None):
979 """Called when a command encountered an error before sending output."""
990 """Called when a command encountered an error before sending output."""
980 ensureserverstream(stream)
991 ensureserverstream(stream)
981
992
982 def sendframes():
993 def sendframes():
983 for frame in createcommanderrorresponse(stream, requestid, message,
994 for frame in createcommanderrorresponse(stream, requestid, message,
984 args):
995 args):
985 yield frame
996 yield frame
986
997
987 self._activecommands.remove(requestid)
998 self._activecommands.remove(requestid)
988
999
989 return self._handlesendframes(sendframes())
1000 return self._handlesendframes(sendframes())
990
1001
991 def makeoutputstream(self):
1002 def makeoutputstream(self):
992 """Create a stream to be used for sending data to the client."""
1003 """Create a stream to be used for sending data to the client."""
993 streamid = self._nextoutgoingstreamid
1004 streamid = self._nextoutgoingstreamid
994 self._nextoutgoingstreamid += 2
1005 self._nextoutgoingstreamid += 2
995
1006
996 s = stream(streamid)
1007 s = stream(streamid)
997 self._outgoingstreams[streamid] = s
1008 self._outgoingstreams[streamid] = s
998
1009
999 return s
1010 return s
1000
1011
1001 def _makeerrorresult(self, msg):
1012 def _makeerrorresult(self, msg):
1002 return 'error', {
1013 return 'error', {
1003 'message': msg,
1014 'message': msg,
1004 }
1015 }
1005
1016
1006 def _makeruncommandresult(self, requestid):
1017 def _makeruncommandresult(self, requestid):
1007 entry = self._receivingcommands[requestid]
1018 entry = self._receivingcommands[requestid]
1008
1019
1009 if not entry['requestdone']:
1020 if not entry['requestdone']:
1010 self._state = 'errored'
1021 self._state = 'errored'
1011 raise error.ProgrammingError('should not be called without '
1022 raise error.ProgrammingError('should not be called without '
1012 'requestdone set')
1023 'requestdone set')
1013
1024
1014 del self._receivingcommands[requestid]
1025 del self._receivingcommands[requestid]
1015
1026
1016 if self._receivingcommands:
1027 if self._receivingcommands:
1017 self._state = 'command-receiving'
1028 self._state = 'command-receiving'
1018 else:
1029 else:
1019 self._state = 'idle'
1030 self._state = 'idle'
1020
1031
1021 # Decode the payloads as CBOR.
1032 # Decode the payloads as CBOR.
1022 entry['payload'].seek(0)
1033 entry['payload'].seek(0)
1023 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1034 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1024
1035
1025 if b'name' not in request:
1036 if b'name' not in request:
1026 self._state = 'errored'
1037 self._state = 'errored'
1027 return self._makeerrorresult(
1038 return self._makeerrorresult(
1028 _('command request missing "name" field'))
1039 _('command request missing "name" field'))
1029
1040
1030 if b'args' not in request:
1041 if b'args' not in request:
1031 request[b'args'] = {}
1042 request[b'args'] = {}
1032
1043
1033 assert requestid not in self._activecommands
1044 assert requestid not in self._activecommands
1034 self._activecommands.add(requestid)
1045 self._activecommands.add(requestid)
1035
1046
1036 return 'runcommand', {
1047 return 'runcommand', {
1037 'requestid': requestid,
1048 'requestid': requestid,
1038 'command': request[b'name'],
1049 'command': request[b'name'],
1039 'args': request[b'args'],
1050 'args': request[b'args'],
1040 'redirect': request.get(b'redirect'),
1051 'redirect': request.get(b'redirect'),
1041 'data': entry['data'].getvalue() if entry['data'] else None,
1052 'data': entry['data'].getvalue() if entry['data'] else None,
1042 }
1053 }
1043
1054
1044 def _makewantframeresult(self):
1055 def _makewantframeresult(self):
1045 return 'wantframe', {
1056 return 'wantframe', {
1046 'state': self._state,
1057 'state': self._state,
1047 }
1058 }
1048
1059
1049 def _validatecommandrequestframe(self, frame):
1060 def _validatecommandrequestframe(self, frame):
1050 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1061 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1051 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1062 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1052
1063
1053 if new and continuation:
1064 if new and continuation:
1054 self._state = 'errored'
1065 self._state = 'errored'
1055 return self._makeerrorresult(
1066 return self._makeerrorresult(
1056 _('received command request frame with both new and '
1067 _('received command request frame with both new and '
1057 'continuation flags set'))
1068 'continuation flags set'))
1058
1069
1059 if not new and not continuation:
1070 if not new and not continuation:
1060 self._state = 'errored'
1071 self._state = 'errored'
1061 return self._makeerrorresult(
1072 return self._makeerrorresult(
1062 _('received command request frame with neither new nor '
1073 _('received command request frame with neither new nor '
1063 'continuation flags set'))
1074 'continuation flags set'))
1064
1075
1076 def _onframeinitial(self, frame):
1077 # Called when we receive a frame when in the "initial" state.
1078 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1079 self._state = 'protocol-settings-receiving'
1080 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1081 return self._onframeprotocolsettings(frame)
1082
1083 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1084 self._state = 'idle'
1085 return self._onframeidle(frame)
1086
1087 else:
1088 self._state = 'errored'
1089 return self._makeerrorresult(
1090 _('expected sender protocol settings or command request '
1091 'frame; got %d') % frame.typeid)
1092
1093 def _onframeprotocolsettings(self, frame):
1094 assert self._state == 'protocol-settings-receiving'
1095 assert self._protocolsettingsdecoder is not None
1096
1097 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1098 self._state = 'errored'
1099 return self._makeerrorresult(
1100 _('expected sender protocol settings frame; got %d') %
1101 frame.typeid)
1102
1103 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1104 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1105
1106 if more and eos:
1107 self._state = 'errored'
1108 return self._makeerrorresult(
1109 _('sender protocol settings frame cannot have both '
1110 'continuation and end of stream flags set'))
1111
1112 if not more and not eos:
1113 self._state = 'errored'
1114 return self._makeerrorresult(
1115 _('sender protocol settings frame must have continuation or '
1116 'end of stream flag set'))
1117
1118 # TODO establish limits for maximum amount of data that can be
1119 # buffered.
1120 try:
1121 self._protocolsettingsdecoder.decode(frame.payload)
1122 except Exception as e:
1123 self._state = 'errored'
1124 return self._makeerrorresult(
1125 _('error decoding CBOR from sender protocol settings frame: %s')
1126 % stringutil.forcebytestr(e))
1127
1128 if more:
1129 return self._makewantframeresult()
1130
1131 assert eos
1132
1133 decoded = self._protocolsettingsdecoder.getavailable()
1134 self._protocolsettingsdecoder = None
1135
1136 if not decoded:
1137 self._state = 'errored'
1138 return self._makeerrorresult(
1139 _('sender protocol settings frame did not contain CBOR data'))
1140 elif len(decoded) > 1:
1141 self._state = 'errored'
1142 return self._makeerrorresult(
1143 _('sender protocol settings frame contained multiple CBOR '
1144 'values'))
1145
1146 d = decoded[0]
1147
1148 if b'contentencodings' in d:
1149 self._sendersettings['contentencodings'] = d[b'contentencodings']
1150
1151 self._state = 'idle'
1152
1153 return self._makewantframeresult()
1154
1065 def _onframeidle(self, frame):
1155 def _onframeidle(self, frame):
1066 # The only frame type that should be received in this state is a
1156 # The only frame type that should be received in this state is a
1067 # command request.
1157 # command request.
1068 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1158 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1069 self._state = 'errored'
1159 self._state = 'errored'
1070 return self._makeerrorresult(
1160 return self._makeerrorresult(
1071 _('expected command request frame; got %d') % frame.typeid)
1161 _('expected command request frame; got %d') % frame.typeid)
1072
1162
1073 res = self._validatecommandrequestframe(frame)
1163 res = self._validatecommandrequestframe(frame)
1074 if res:
1164 if res:
1075 return res
1165 return res
1076
1166
1077 if frame.requestid in self._receivingcommands:
1167 if frame.requestid in self._receivingcommands:
1078 self._state = 'errored'
1168 self._state = 'errored'
1079 return self._makeerrorresult(
1169 return self._makeerrorresult(
1080 _('request with ID %d already received') % frame.requestid)
1170 _('request with ID %d already received') % frame.requestid)
1081
1171
1082 if frame.requestid in self._activecommands:
1172 if frame.requestid in self._activecommands:
1083 self._state = 'errored'
1173 self._state = 'errored'
1084 return self._makeerrorresult(
1174 return self._makeerrorresult(
1085 _('request with ID %d is already active') % frame.requestid)
1175 _('request with ID %d is already active') % frame.requestid)
1086
1176
1087 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1177 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1088 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1178 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1089 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1179 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1090
1180
1091 if not new:
1181 if not new:
1092 self._state = 'errored'
1182 self._state = 'errored'
1093 return self._makeerrorresult(
1183 return self._makeerrorresult(
1094 _('received command request frame without new flag set'))
1184 _('received command request frame without new flag set'))
1095
1185
1096 payload = util.bytesio()
1186 payload = util.bytesio()
1097 payload.write(frame.payload)
1187 payload.write(frame.payload)
1098
1188
1099 self._receivingcommands[frame.requestid] = {
1189 self._receivingcommands[frame.requestid] = {
1100 'payload': payload,
1190 'payload': payload,
1101 'data': None,
1191 'data': None,
1102 'requestdone': not moreframes,
1192 'requestdone': not moreframes,
1103 'expectingdata': bool(expectingdata),
1193 'expectingdata': bool(expectingdata),
1104 }
1194 }
1105
1195
1106 # This is the final frame for this request. Dispatch it.
1196 # This is the final frame for this request. Dispatch it.
1107 if not moreframes and not expectingdata:
1197 if not moreframes and not expectingdata:
1108 return self._makeruncommandresult(frame.requestid)
1198 return self._makeruncommandresult(frame.requestid)
1109
1199
1110 assert moreframes or expectingdata
1200 assert moreframes or expectingdata
1111 self._state = 'command-receiving'
1201 self._state = 'command-receiving'
1112 return self._makewantframeresult()
1202 return self._makewantframeresult()
1113
1203
1114 def _onframecommandreceiving(self, frame):
1204 def _onframecommandreceiving(self, frame):
1115 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1205 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1116 # Process new command requests as such.
1206 # Process new command requests as such.
1117 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1207 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1118 return self._onframeidle(frame)
1208 return self._onframeidle(frame)
1119
1209
1120 res = self._validatecommandrequestframe(frame)
1210 res = self._validatecommandrequestframe(frame)
1121 if res:
1211 if res:
1122 return res
1212 return res
1123
1213
1124 # All other frames should be related to a command that is currently
1214 # All other frames should be related to a command that is currently
1125 # receiving but is not active.
1215 # receiving but is not active.
1126 if frame.requestid in self._activecommands:
1216 if frame.requestid in self._activecommands:
1127 self._state = 'errored'
1217 self._state = 'errored'
1128 return self._makeerrorresult(
1218 return self._makeerrorresult(
1129 _('received frame for request that is still active: %d') %
1219 _('received frame for request that is still active: %d') %
1130 frame.requestid)
1220 frame.requestid)
1131
1221
1132 if frame.requestid not in self._receivingcommands:
1222 if frame.requestid not in self._receivingcommands:
1133 self._state = 'errored'
1223 self._state = 'errored'
1134 return self._makeerrorresult(
1224 return self._makeerrorresult(
1135 _('received frame for request that is not receiving: %d') %
1225 _('received frame for request that is not receiving: %d') %
1136 frame.requestid)
1226 frame.requestid)
1137
1227
1138 entry = self._receivingcommands[frame.requestid]
1228 entry = self._receivingcommands[frame.requestid]
1139
1229
1140 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1230 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1141 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1231 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1142 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1232 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1143
1233
1144 if entry['requestdone']:
1234 if entry['requestdone']:
1145 self._state = 'errored'
1235 self._state = 'errored'
1146 return self._makeerrorresult(
1236 return self._makeerrorresult(
1147 _('received command request frame when request frames '
1237 _('received command request frame when request frames '
1148 'were supposedly done'))
1238 'were supposedly done'))
1149
1239
1150 if expectingdata != entry['expectingdata']:
1240 if expectingdata != entry['expectingdata']:
1151 self._state = 'errored'
1241 self._state = 'errored'
1152 return self._makeerrorresult(
1242 return self._makeerrorresult(
1153 _('mismatch between expect data flag and previous frame'))
1243 _('mismatch between expect data flag and previous frame'))
1154
1244
1155 entry['payload'].write(frame.payload)
1245 entry['payload'].write(frame.payload)
1156
1246
1157 if not moreframes:
1247 if not moreframes:
1158 entry['requestdone'] = True
1248 entry['requestdone'] = True
1159
1249
1160 if not moreframes and not expectingdata:
1250 if not moreframes and not expectingdata:
1161 return self._makeruncommandresult(frame.requestid)
1251 return self._makeruncommandresult(frame.requestid)
1162
1252
1163 return self._makewantframeresult()
1253 return self._makewantframeresult()
1164
1254
1165 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1255 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1166 if not entry['expectingdata']:
1256 if not entry['expectingdata']:
1167 self._state = 'errored'
1257 self._state = 'errored'
1168 return self._makeerrorresult(_(
1258 return self._makeerrorresult(_(
1169 'received command data frame for request that is not '
1259 'received command data frame for request that is not '
1170 'expecting data: %d') % frame.requestid)
1260 'expecting data: %d') % frame.requestid)
1171
1261
1172 if entry['data'] is None:
1262 if entry['data'] is None:
1173 entry['data'] = util.bytesio()
1263 entry['data'] = util.bytesio()
1174
1264
1175 return self._handlecommanddataframe(frame, entry)
1265 return self._handlecommanddataframe(frame, entry)
1176 else:
1266 else:
1177 self._state = 'errored'
1267 self._state = 'errored'
1178 return self._makeerrorresult(_(
1268 return self._makeerrorresult(_(
1179 'received unexpected frame type: %d') % frame.typeid)
1269 'received unexpected frame type: %d') % frame.typeid)
1180
1270
1181 def _handlecommanddataframe(self, frame, entry):
1271 def _handlecommanddataframe(self, frame, entry):
1182 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1272 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1183
1273
1184 # TODO support streaming data instead of buffering it.
1274 # TODO support streaming data instead of buffering it.
1185 entry['data'].write(frame.payload)
1275 entry['data'].write(frame.payload)
1186
1276
1187 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1277 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1188 return self._makewantframeresult()
1278 return self._makewantframeresult()
1189 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1279 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1190 entry['data'].seek(0)
1280 entry['data'].seek(0)
1191 return self._makeruncommandresult(frame.requestid)
1281 return self._makeruncommandresult(frame.requestid)
1192 else:
1282 else:
1193 self._state = 'errored'
1283 self._state = 'errored'
1194 return self._makeerrorresult(_('command data frame without '
1284 return self._makeerrorresult(_('command data frame without '
1195 'flags'))
1285 'flags'))
1196
1286
1197 def _onframeerrored(self, frame):
1287 def _onframeerrored(self, frame):
1198 return self._makeerrorresult(_('server already errored'))
1288 return self._makeerrorresult(_('server already errored'))
1199
1289
1200 class commandrequest(object):
1290 class commandrequest(object):
1201 """Represents a request to run a command."""
1291 """Represents a request to run a command."""
1202
1292
1203 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1293 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1204 self.requestid = requestid
1294 self.requestid = requestid
1205 self.name = name
1295 self.name = name
1206 self.args = args
1296 self.args = args
1207 self.datafh = datafh
1297 self.datafh = datafh
1208 self.redirect = redirect
1298 self.redirect = redirect
1209 self.state = 'pending'
1299 self.state = 'pending'
1210
1300
1211 class clientreactor(object):
1301 class clientreactor(object):
1212 """Holds state of a client issuing frame-based protocol requests.
1302 """Holds state of a client issuing frame-based protocol requests.
1213
1303
1214 This is like ``serverreactor`` but for client-side state.
1304 This is like ``serverreactor`` but for client-side state.
1215
1305
1216 Each instance is bound to the lifetime of a connection. For persistent
1306 Each instance is bound to the lifetime of a connection. For persistent
1217 connection transports using e.g. TCP sockets and speaking the raw
1307 connection transports using e.g. TCP sockets and speaking the raw
1218 framing protocol, there will be a single instance for the lifetime of
1308 framing protocol, there will be a single instance for the lifetime of
1219 the TCP socket. For transports where there are multiple discrete
1309 the TCP socket. For transports where there are multiple discrete
1220 interactions (say tunneled within in HTTP request), there will be a
1310 interactions (say tunneled within in HTTP request), there will be a
1221 separate instance for each distinct interaction.
1311 separate instance for each distinct interaction.
1222 """
1312 """
1223 def __init__(self, hasmultiplesend=False, buffersends=True):
1313 def __init__(self, hasmultiplesend=False, buffersends=True):
1224 """Create a new instance.
1314 """Create a new instance.
1225
1315
1226 ``hasmultiplesend`` indicates whether multiple sends are supported
1316 ``hasmultiplesend`` indicates whether multiple sends are supported
1227 by the transport. When True, it is possible to send commands immediately
1317 by the transport. When True, it is possible to send commands immediately
1228 instead of buffering until the caller signals an intent to finish a
1318 instead of buffering until the caller signals an intent to finish a
1229 send operation.
1319 send operation.
1230
1320
1231 ``buffercommands`` indicates whether sends should be buffered until the
1321 ``buffercommands`` indicates whether sends should be buffered until the
1232 last request has been issued.
1322 last request has been issued.
1233 """
1323 """
1234 self._hasmultiplesend = hasmultiplesend
1324 self._hasmultiplesend = hasmultiplesend
1235 self._buffersends = buffersends
1325 self._buffersends = buffersends
1236
1326
1237 self._canissuecommands = True
1327 self._canissuecommands = True
1238 self._cansend = True
1328 self._cansend = True
1239
1329
1240 self._nextrequestid = 1
1330 self._nextrequestid = 1
1241 # We only support a single outgoing stream for now.
1331 # We only support a single outgoing stream for now.
1242 self._outgoingstream = stream(1)
1332 self._outgoingstream = stream(1)
1243 self._pendingrequests = collections.deque()
1333 self._pendingrequests = collections.deque()
1244 self._activerequests = {}
1334 self._activerequests = {}
1245 self._incomingstreams = {}
1335 self._incomingstreams = {}
1246
1336
1247 def callcommand(self, name, args, datafh=None, redirect=None):
1337 def callcommand(self, name, args, datafh=None, redirect=None):
1248 """Request that a command be executed.
1338 """Request that a command be executed.
1249
1339
1250 Receives the command name, a dict of arguments to pass to the command,
1340 Receives the command name, a dict of arguments to pass to the command,
1251 and an optional file object containing the raw data for the command.
1341 and an optional file object containing the raw data for the command.
1252
1342
1253 Returns a 3-tuple of (request, action, action data).
1343 Returns a 3-tuple of (request, action, action data).
1254 """
1344 """
1255 if not self._canissuecommands:
1345 if not self._canissuecommands:
1256 raise error.ProgrammingError('cannot issue new commands')
1346 raise error.ProgrammingError('cannot issue new commands')
1257
1347
1258 requestid = self._nextrequestid
1348 requestid = self._nextrequestid
1259 self._nextrequestid += 2
1349 self._nextrequestid += 2
1260
1350
1261 request = commandrequest(requestid, name, args, datafh=datafh,
1351 request = commandrequest(requestid, name, args, datafh=datafh,
1262 redirect=redirect)
1352 redirect=redirect)
1263
1353
1264 if self._buffersends:
1354 if self._buffersends:
1265 self._pendingrequests.append(request)
1355 self._pendingrequests.append(request)
1266 return request, 'noop', {}
1356 return request, 'noop', {}
1267 else:
1357 else:
1268 if not self._cansend:
1358 if not self._cansend:
1269 raise error.ProgrammingError('sends cannot be performed on '
1359 raise error.ProgrammingError('sends cannot be performed on '
1270 'this instance')
1360 'this instance')
1271
1361
1272 if not self._hasmultiplesend:
1362 if not self._hasmultiplesend:
1273 self._cansend = False
1363 self._cansend = False
1274 self._canissuecommands = False
1364 self._canissuecommands = False
1275
1365
1276 return request, 'sendframes', {
1366 return request, 'sendframes', {
1277 'framegen': self._makecommandframes(request),
1367 'framegen': self._makecommandframes(request),
1278 }
1368 }
1279
1369
1280 def flushcommands(self):
1370 def flushcommands(self):
1281 """Request that all queued commands be sent.
1371 """Request that all queued commands be sent.
1282
1372
1283 If any commands are buffered, this will instruct the caller to send
1373 If any commands are buffered, this will instruct the caller to send
1284 them over the wire. If no commands are buffered it instructs the client
1374 them over the wire. If no commands are buffered it instructs the client
1285 to no-op.
1375 to no-op.
1286
1376
1287 If instances aren't configured for multiple sends, no new command
1377 If instances aren't configured for multiple sends, no new command
1288 requests are allowed after this is called.
1378 requests are allowed after this is called.
1289 """
1379 """
1290 if not self._pendingrequests:
1380 if not self._pendingrequests:
1291 return 'noop', {}
1381 return 'noop', {}
1292
1382
1293 if not self._cansend:
1383 if not self._cansend:
1294 raise error.ProgrammingError('sends cannot be performed on this '
1384 raise error.ProgrammingError('sends cannot be performed on this '
1295 'instance')
1385 'instance')
1296
1386
1297 # If the instance only allows sending once, mark that we have fired
1387 # If the instance only allows sending once, mark that we have fired
1298 # our one shot.
1388 # our one shot.
1299 if not self._hasmultiplesend:
1389 if not self._hasmultiplesend:
1300 self._canissuecommands = False
1390 self._canissuecommands = False
1301 self._cansend = False
1391 self._cansend = False
1302
1392
1303 def makeframes():
1393 def makeframes():
1304 while self._pendingrequests:
1394 while self._pendingrequests:
1305 request = self._pendingrequests.popleft()
1395 request = self._pendingrequests.popleft()
1306 for frame in self._makecommandframes(request):
1396 for frame in self._makecommandframes(request):
1307 yield frame
1397 yield frame
1308
1398
1309 return 'sendframes', {
1399 return 'sendframes', {
1310 'framegen': makeframes(),
1400 'framegen': makeframes(),
1311 }
1401 }
1312
1402
1313 def _makecommandframes(self, request):
1403 def _makecommandframes(self, request):
1314 """Emit frames to issue a command request.
1404 """Emit frames to issue a command request.
1315
1405
1316 As a side-effect, update request accounting to reflect its changed
1406 As a side-effect, update request accounting to reflect its changed
1317 state.
1407 state.
1318 """
1408 """
1319 self._activerequests[request.requestid] = request
1409 self._activerequests[request.requestid] = request
1320 request.state = 'sending'
1410 request.state = 'sending'
1321
1411
1322 res = createcommandframes(self._outgoingstream,
1412 res = createcommandframes(self._outgoingstream,
1323 request.requestid,
1413 request.requestid,
1324 request.name,
1414 request.name,
1325 request.args,
1415 request.args,
1326 datafh=request.datafh,
1416 datafh=request.datafh,
1327 redirect=request.redirect)
1417 redirect=request.redirect)
1328
1418
1329 for frame in res:
1419 for frame in res:
1330 yield frame
1420 yield frame
1331
1421
1332 request.state = 'sent'
1422 request.state = 'sent'
1333
1423
1334 def onframerecv(self, frame):
1424 def onframerecv(self, frame):
1335 """Process a frame that has been received off the wire.
1425 """Process a frame that has been received off the wire.
1336
1426
1337 Returns a 2-tuple of (action, meta) describing further action the
1427 Returns a 2-tuple of (action, meta) describing further action the
1338 caller needs to take as a result of receiving this frame.
1428 caller needs to take as a result of receiving this frame.
1339 """
1429 """
1340 if frame.streamid % 2:
1430 if frame.streamid % 2:
1341 return 'error', {
1431 return 'error', {
1342 'message': (
1432 'message': (
1343 _('received frame with odd numbered stream ID: %d') %
1433 _('received frame with odd numbered stream ID: %d') %
1344 frame.streamid),
1434 frame.streamid),
1345 }
1435 }
1346
1436
1347 if frame.streamid not in self._incomingstreams:
1437 if frame.streamid not in self._incomingstreams:
1348 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1438 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1349 return 'error', {
1439 return 'error', {
1350 'message': _('received frame on unknown stream '
1440 'message': _('received frame on unknown stream '
1351 'without beginning of stream flag set'),
1441 'without beginning of stream flag set'),
1352 }
1442 }
1353
1443
1354 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1444 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1355
1445
1356 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1446 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1357 raise error.ProgrammingError('support for decoding stream '
1447 raise error.ProgrammingError('support for decoding stream '
1358 'payloads not yet implemneted')
1448 'payloads not yet implemneted')
1359
1449
1360 if frame.streamflags & STREAM_FLAG_END_STREAM:
1450 if frame.streamflags & STREAM_FLAG_END_STREAM:
1361 del self._incomingstreams[frame.streamid]
1451 del self._incomingstreams[frame.streamid]
1362
1452
1363 if frame.requestid not in self._activerequests:
1453 if frame.requestid not in self._activerequests:
1364 return 'error', {
1454 return 'error', {
1365 'message': (_('received frame for inactive request ID: %d') %
1455 'message': (_('received frame for inactive request ID: %d') %
1366 frame.requestid),
1456 frame.requestid),
1367 }
1457 }
1368
1458
1369 request = self._activerequests[frame.requestid]
1459 request = self._activerequests[frame.requestid]
1370 request.state = 'receiving'
1460 request.state = 'receiving'
1371
1461
1372 handlers = {
1462 handlers = {
1373 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1463 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1374 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1464 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1375 }
1465 }
1376
1466
1377 meth = handlers.get(frame.typeid)
1467 meth = handlers.get(frame.typeid)
1378 if not meth:
1468 if not meth:
1379 raise error.ProgrammingError('unhandled frame type: %d' %
1469 raise error.ProgrammingError('unhandled frame type: %d' %
1380 frame.typeid)
1470 frame.typeid)
1381
1471
1382 return meth(request, frame)
1472 return meth(request, frame)
1383
1473
1384 def _oncommandresponseframe(self, request, frame):
1474 def _oncommandresponseframe(self, request, frame):
1385 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1475 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1386 request.state = 'received'
1476 request.state = 'received'
1387 del self._activerequests[request.requestid]
1477 del self._activerequests[request.requestid]
1388
1478
1389 return 'responsedata', {
1479 return 'responsedata', {
1390 'request': request,
1480 'request': request,
1391 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1481 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1392 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1482 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1393 'data': frame.payload,
1483 'data': frame.payload,
1394 }
1484 }
1395
1485
1396 def _onerrorresponseframe(self, request, frame):
1486 def _onerrorresponseframe(self, request, frame):
1397 request.state = 'errored'
1487 request.state = 'errored'
1398 del self._activerequests[request.requestid]
1488 del self._activerequests[request.requestid]
1399
1489
1400 # The payload should be a CBOR map.
1490 # The payload should be a CBOR map.
1401 m = cborutil.decodeall(frame.payload)[0]
1491 m = cborutil.decodeall(frame.payload)[0]
1402
1492
1403 return 'error', {
1493 return 'error', {
1404 'request': request,
1494 'request': request,
1405 'type': m['type'],
1495 'type': m['type'],
1406 'message': m['message'],
1496 'message': m['message'],
1407 }
1497 }
@@ -1,499 +1,602 b''
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 import unittest
3 import unittest
4
4
5 from mercurial.thirdparty import (
5 from mercurial.thirdparty import (
6 cbor,
6 cbor,
7 )
7 )
8 from mercurial import (
8 from mercurial import (
9 util,
9 util,
10 wireprotoframing as framing,
10 wireprotoframing as framing,
11 )
11 )
12 from mercurial.utils import (
13 cborutil,
14 )
12
15
13 ffs = framing.makeframefromhumanstring
16 ffs = framing.makeframefromhumanstring
14
17
15 OK = cbor.dumps({b'status': b'ok'})
18 OK = cbor.dumps({b'status': b'ok'})
16
19
17 def makereactor(deferoutput=False):
20 def makereactor(deferoutput=False):
18 return framing.serverreactor(deferoutput=deferoutput)
21 return framing.serverreactor(deferoutput=deferoutput)
19
22
20 def sendframes(reactor, gen):
23 def sendframes(reactor, gen):
21 """Send a generator of frame bytearray to a reactor.
24 """Send a generator of frame bytearray to a reactor.
22
25
23 Emits a generator of results from ``onframerecv()`` calls.
26 Emits a generator of results from ``onframerecv()`` calls.
24 """
27 """
25 for frame in gen:
28 for frame in gen:
26 header = framing.parseheader(frame)
29 header = framing.parseheader(frame)
27 payload = frame[framing.FRAME_HEADER_SIZE:]
30 payload = frame[framing.FRAME_HEADER_SIZE:]
28 assert len(payload) == header.length
31 assert len(payload) == header.length
29
32
30 yield reactor.onframerecv(framing.frame(header.requestid,
33 yield reactor.onframerecv(framing.frame(header.requestid,
31 header.streamid,
34 header.streamid,
32 header.streamflags,
35 header.streamflags,
33 header.typeid,
36 header.typeid,
34 header.flags,
37 header.flags,
35 payload))
38 payload))
36
39
37 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
40 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
38 """Generate frames to run a command and send them to a reactor."""
41 """Generate frames to run a command and send them to a reactor."""
39 return sendframes(reactor,
42 return sendframes(reactor,
40 framing.createcommandframes(stream, rid, cmd, args,
43 framing.createcommandframes(stream, rid, cmd, args,
41 datafh))
44 datafh))
42
45
43
46
44 class ServerReactorTests(unittest.TestCase):
47 class ServerReactorTests(unittest.TestCase):
45 def _sendsingleframe(self, reactor, f):
48 def _sendsingleframe(self, reactor, f):
46 results = list(sendframes(reactor, [f]))
49 results = list(sendframes(reactor, [f]))
47 self.assertEqual(len(results), 1)
50 self.assertEqual(len(results), 1)
48
51
49 return results[0]
52 return results[0]
50
53
51 def assertaction(self, res, expected):
54 def assertaction(self, res, expected):
52 self.assertIsInstance(res, tuple)
55 self.assertIsInstance(res, tuple)
53 self.assertEqual(len(res), 2)
56 self.assertEqual(len(res), 2)
54 self.assertIsInstance(res[1], dict)
57 self.assertIsInstance(res[1], dict)
55 self.assertEqual(res[0], expected)
58 self.assertEqual(res[0], expected)
56
59
57 def assertframesequal(self, frames, framestrings):
60 def assertframesequal(self, frames, framestrings):
58 expected = [ffs(s) for s in framestrings]
61 expected = [ffs(s) for s in framestrings]
59 self.assertEqual(list(frames), expected)
62 self.assertEqual(list(frames), expected)
60
63
61 def test1framecommand(self):
64 def test1framecommand(self):
62 """Receiving a command in a single frame yields request to run it."""
65 """Receiving a command in a single frame yields request to run it."""
63 reactor = makereactor()
66 reactor = makereactor()
64 stream = framing.stream(1)
67 stream = framing.stream(1)
65 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
68 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
66 self.assertEqual(len(results), 1)
69 self.assertEqual(len(results), 1)
67 self.assertaction(results[0], b'runcommand')
70 self.assertaction(results[0], b'runcommand')
68 self.assertEqual(results[0][1], {
71 self.assertEqual(results[0][1], {
69 b'requestid': 1,
72 b'requestid': 1,
70 b'command': b'mycommand',
73 b'command': b'mycommand',
71 b'args': {},
74 b'args': {},
72 b'redirect': None,
75 b'redirect': None,
73 b'data': None,
76 b'data': None,
74 })
77 })
75
78
76 result = reactor.oninputeof()
79 result = reactor.oninputeof()
77 self.assertaction(result, b'noop')
80 self.assertaction(result, b'noop')
78
81
79 def test1argument(self):
82 def test1argument(self):
80 reactor = makereactor()
83 reactor = makereactor()
81 stream = framing.stream(1)
84 stream = framing.stream(1)
82 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
85 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
83 {b'foo': b'bar'}))
86 {b'foo': b'bar'}))
84 self.assertEqual(len(results), 1)
87 self.assertEqual(len(results), 1)
85 self.assertaction(results[0], b'runcommand')
88 self.assertaction(results[0], b'runcommand')
86 self.assertEqual(results[0][1], {
89 self.assertEqual(results[0][1], {
87 b'requestid': 41,
90 b'requestid': 41,
88 b'command': b'mycommand',
91 b'command': b'mycommand',
89 b'args': {b'foo': b'bar'},
92 b'args': {b'foo': b'bar'},
90 b'redirect': None,
93 b'redirect': None,
91 b'data': None,
94 b'data': None,
92 })
95 })
93
96
94 def testmultiarguments(self):
97 def testmultiarguments(self):
95 reactor = makereactor()
98 reactor = makereactor()
96 stream = framing.stream(1)
99 stream = framing.stream(1)
97 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
100 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
98 {b'foo': b'bar', b'biz': b'baz'}))
101 {b'foo': b'bar', b'biz': b'baz'}))
99 self.assertEqual(len(results), 1)
102 self.assertEqual(len(results), 1)
100 self.assertaction(results[0], b'runcommand')
103 self.assertaction(results[0], b'runcommand')
101 self.assertEqual(results[0][1], {
104 self.assertEqual(results[0][1], {
102 b'requestid': 1,
105 b'requestid': 1,
103 b'command': b'mycommand',
106 b'command': b'mycommand',
104 b'args': {b'foo': b'bar', b'biz': b'baz'},
107 b'args': {b'foo': b'bar', b'biz': b'baz'},
105 b'redirect': None,
108 b'redirect': None,
106 b'data': None,
109 b'data': None,
107 })
110 })
108
111
109 def testsimplecommanddata(self):
112 def testsimplecommanddata(self):
110 reactor = makereactor()
113 reactor = makereactor()
111 stream = framing.stream(1)
114 stream = framing.stream(1)
112 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
115 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
113 util.bytesio(b'data!')))
116 util.bytesio(b'data!')))
114 self.assertEqual(len(results), 2)
117 self.assertEqual(len(results), 2)
115 self.assertaction(results[0], b'wantframe')
118 self.assertaction(results[0], b'wantframe')
116 self.assertaction(results[1], b'runcommand')
119 self.assertaction(results[1], b'runcommand')
117 self.assertEqual(results[1][1], {
120 self.assertEqual(results[1][1], {
118 b'requestid': 1,
121 b'requestid': 1,
119 b'command': b'mycommand',
122 b'command': b'mycommand',
120 b'args': {},
123 b'args': {},
121 b'redirect': None,
124 b'redirect': None,
122 b'data': b'data!',
125 b'data': b'data!',
123 })
126 })
124
127
125 def testmultipledataframes(self):
128 def testmultipledataframes(self):
126 frames = [
129 frames = [
127 ffs(b'1 1 stream-begin command-request new|have-data '
130 ffs(b'1 1 stream-begin command-request new|have-data '
128 b"cbor:{b'name': b'mycommand'}"),
131 b"cbor:{b'name': b'mycommand'}"),
129 ffs(b'1 1 0 command-data continuation data1'),
132 ffs(b'1 1 0 command-data continuation data1'),
130 ffs(b'1 1 0 command-data continuation data2'),
133 ffs(b'1 1 0 command-data continuation data2'),
131 ffs(b'1 1 0 command-data eos data3'),
134 ffs(b'1 1 0 command-data eos data3'),
132 ]
135 ]
133
136
134 reactor = makereactor()
137 reactor = makereactor()
135 results = list(sendframes(reactor, frames))
138 results = list(sendframes(reactor, frames))
136 self.assertEqual(len(results), 4)
139 self.assertEqual(len(results), 4)
137 for i in range(3):
140 for i in range(3):
138 self.assertaction(results[i], b'wantframe')
141 self.assertaction(results[i], b'wantframe')
139 self.assertaction(results[3], b'runcommand')
142 self.assertaction(results[3], b'runcommand')
140 self.assertEqual(results[3][1], {
143 self.assertEqual(results[3][1], {
141 b'requestid': 1,
144 b'requestid': 1,
142 b'command': b'mycommand',
145 b'command': b'mycommand',
143 b'args': {},
146 b'args': {},
144 b'redirect': None,
147 b'redirect': None,
145 b'data': b'data1data2data3',
148 b'data': b'data1data2data3',
146 })
149 })
147
150
148 def testargumentanddata(self):
151 def testargumentanddata(self):
149 frames = [
152 frames = [
150 ffs(b'1 1 stream-begin command-request new|have-data '
153 ffs(b'1 1 stream-begin command-request new|have-data '
151 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
154 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
152 b"b'foo': b'bar'}}"),
155 b"b'foo': b'bar'}}"),
153 ffs(b'1 1 0 command-data continuation value1'),
156 ffs(b'1 1 0 command-data continuation value1'),
154 ffs(b'1 1 0 command-data eos value2'),
157 ffs(b'1 1 0 command-data eos value2'),
155 ]
158 ]
156
159
157 reactor = makereactor()
160 reactor = makereactor()
158 results = list(sendframes(reactor, frames))
161 results = list(sendframes(reactor, frames))
159
162
160 self.assertaction(results[-1], b'runcommand')
163 self.assertaction(results[-1], b'runcommand')
161 self.assertEqual(results[-1][1], {
164 self.assertEqual(results[-1][1], {
162 b'requestid': 1,
165 b'requestid': 1,
163 b'command': b'command',
166 b'command': b'command',
164 b'args': {
167 b'args': {
165 b'key': b'val',
168 b'key': b'val',
166 b'foo': b'bar',
169 b'foo': b'bar',
167 },
170 },
168 b'redirect': None,
171 b'redirect': None,
169 b'data': b'value1value2',
172 b'data': b'value1value2',
170 })
173 })
171
174
172 def testnewandcontinuation(self):
175 def testnewandcontinuation(self):
173 result = self._sendsingleframe(makereactor(),
176 result = self._sendsingleframe(makereactor(),
174 ffs(b'1 1 stream-begin command-request new|continuation '))
177 ffs(b'1 1 stream-begin command-request new|continuation '))
175 self.assertaction(result, b'error')
178 self.assertaction(result, b'error')
176 self.assertEqual(result[1], {
179 self.assertEqual(result[1], {
177 b'message': b'received command request frame with both new and '
180 b'message': b'received command request frame with both new and '
178 b'continuation flags set',
181 b'continuation flags set',
179 })
182 })
180
183
181 def testneithernewnorcontinuation(self):
184 def testneithernewnorcontinuation(self):
182 result = self._sendsingleframe(makereactor(),
185 result = self._sendsingleframe(makereactor(),
183 ffs(b'1 1 stream-begin command-request 0 '))
186 ffs(b'1 1 stream-begin command-request 0 '))
184 self.assertaction(result, b'error')
187 self.assertaction(result, b'error')
185 self.assertEqual(result[1], {
188 self.assertEqual(result[1], {
186 b'message': b'received command request frame with neither new nor '
189 b'message': b'received command request frame with neither new nor '
187 b'continuation flags set',
190 b'continuation flags set',
188 })
191 })
189
192
190 def testunexpectedcommanddata(self):
193 def testunexpectedcommanddata(self):
191 """Command data frame when not running a command is an error."""
194 """Command data frame when not running a command is an error."""
192 result = self._sendsingleframe(makereactor(),
195 result = self._sendsingleframe(makereactor(),
193 ffs(b'1 1 stream-begin command-data 0 ignored'))
196 ffs(b'1 1 stream-begin command-data 0 ignored'))
194 self.assertaction(result, b'error')
197 self.assertaction(result, b'error')
195 self.assertEqual(result[1], {
198 self.assertEqual(result[1], {
196 b'message': b'expected command request frame; got 2',
199 b'message': b'expected sender protocol settings or command request '
200 b'frame; got 2',
197 })
201 })
198
202
199 def testunexpectedcommanddatareceiving(self):
203 def testunexpectedcommanddatareceiving(self):
200 """Same as above except the command is receiving."""
204 """Same as above except the command is receiving."""
201 results = list(sendframes(makereactor(), [
205 results = list(sendframes(makereactor(), [
202 ffs(b'1 1 stream-begin command-request new|more '
206 ffs(b'1 1 stream-begin command-request new|more '
203 b"cbor:{b'name': b'ignored'}"),
207 b"cbor:{b'name': b'ignored'}"),
204 ffs(b'1 1 0 command-data eos ignored'),
208 ffs(b'1 1 0 command-data eos ignored'),
205 ]))
209 ]))
206
210
207 self.assertaction(results[0], b'wantframe')
211 self.assertaction(results[0], b'wantframe')
208 self.assertaction(results[1], b'error')
212 self.assertaction(results[1], b'error')
209 self.assertEqual(results[1][1], {
213 self.assertEqual(results[1][1], {
210 b'message': b'received command data frame for request that is not '
214 b'message': b'received command data frame for request that is not '
211 b'expecting data: 1',
215 b'expecting data: 1',
212 })
216 })
213
217
214 def testconflictingrequestidallowed(self):
218 def testconflictingrequestidallowed(self):
215 """Multiple fully serviced commands with same request ID is allowed."""
219 """Multiple fully serviced commands with same request ID is allowed."""
216 reactor = makereactor()
220 reactor = makereactor()
217 results = []
221 results = []
218 outstream = reactor.makeoutputstream()
222 outstream = reactor.makeoutputstream()
219 results.append(self._sendsingleframe(
223 results.append(self._sendsingleframe(
220 reactor, ffs(b'1 1 stream-begin command-request new '
224 reactor, ffs(b'1 1 stream-begin command-request new '
221 b"cbor:{b'name': b'command'}")))
225 b"cbor:{b'name': b'command'}")))
222 result = reactor.oncommandresponseready(outstream, 1, b'response1')
226 result = reactor.oncommandresponseready(outstream, 1, b'response1')
223 self.assertaction(result, b'sendframes')
227 self.assertaction(result, b'sendframes')
224 list(result[1][b'framegen'])
228 list(result[1][b'framegen'])
225 results.append(self._sendsingleframe(
229 results.append(self._sendsingleframe(
226 reactor, ffs(b'1 1 stream-begin command-request new '
230 reactor, ffs(b'1 1 stream-begin command-request new '
227 b"cbor:{b'name': b'command'}")))
231 b"cbor:{b'name': b'command'}")))
228 result = reactor.oncommandresponseready(outstream, 1, b'response2')
232 result = reactor.oncommandresponseready(outstream, 1, b'response2')
229 self.assertaction(result, b'sendframes')
233 self.assertaction(result, b'sendframes')
230 list(result[1][b'framegen'])
234 list(result[1][b'framegen'])
231 results.append(self._sendsingleframe(
235 results.append(self._sendsingleframe(
232 reactor, ffs(b'1 1 stream-begin command-request new '
236 reactor, ffs(b'1 1 stream-begin command-request new '
233 b"cbor:{b'name': b'command'}")))
237 b"cbor:{b'name': b'command'}")))
234 result = reactor.oncommandresponseready(outstream, 1, b'response3')
238 result = reactor.oncommandresponseready(outstream, 1, b'response3')
235 self.assertaction(result, b'sendframes')
239 self.assertaction(result, b'sendframes')
236 list(result[1][b'framegen'])
240 list(result[1][b'framegen'])
237
241
238 for i in range(3):
242 for i in range(3):
239 self.assertaction(results[i], b'runcommand')
243 self.assertaction(results[i], b'runcommand')
240 self.assertEqual(results[i][1], {
244 self.assertEqual(results[i][1], {
241 b'requestid': 1,
245 b'requestid': 1,
242 b'command': b'command',
246 b'command': b'command',
243 b'args': {},
247 b'args': {},
244 b'redirect': None,
248 b'redirect': None,
245 b'data': None,
249 b'data': None,
246 })
250 })
247
251
248 def testconflictingrequestid(self):
252 def testconflictingrequestid(self):
249 """Request ID for new command matching in-flight command is illegal."""
253 """Request ID for new command matching in-flight command is illegal."""
250 results = list(sendframes(makereactor(), [
254 results = list(sendframes(makereactor(), [
251 ffs(b'1 1 stream-begin command-request new|more '
255 ffs(b'1 1 stream-begin command-request new|more '
252 b"cbor:{b'name': b'command'}"),
256 b"cbor:{b'name': b'command'}"),
253 ffs(b'1 1 0 command-request new '
257 ffs(b'1 1 0 command-request new '
254 b"cbor:{b'name': b'command1'}"),
258 b"cbor:{b'name': b'command1'}"),
255 ]))
259 ]))
256
260
257 self.assertaction(results[0], b'wantframe')
261 self.assertaction(results[0], b'wantframe')
258 self.assertaction(results[1], b'error')
262 self.assertaction(results[1], b'error')
259 self.assertEqual(results[1][1], {
263 self.assertEqual(results[1][1], {
260 b'message': b'request with ID 1 already received',
264 b'message': b'request with ID 1 already received',
261 })
265 })
262
266
263 def testinterleavedcommands(self):
267 def testinterleavedcommands(self):
264 cbor1 = cbor.dumps({
268 cbor1 = cbor.dumps({
265 b'name': b'command1',
269 b'name': b'command1',
266 b'args': {
270 b'args': {
267 b'foo': b'bar',
271 b'foo': b'bar',
268 b'key1': b'val',
272 b'key1': b'val',
269 }
273 }
270 }, canonical=True)
274 }, canonical=True)
271 cbor3 = cbor.dumps({
275 cbor3 = cbor.dumps({
272 b'name': b'command3',
276 b'name': b'command3',
273 b'args': {
277 b'args': {
274 b'biz': b'baz',
278 b'biz': b'baz',
275 b'key': b'val',
279 b'key': b'val',
276 },
280 },
277 }, canonical=True)
281 }, canonical=True)
278
282
279 results = list(sendframes(makereactor(), [
283 results = list(sendframes(makereactor(), [
280 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
284 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
281 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
285 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
282 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
286 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
283 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
287 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
284 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
288 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
285 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
289 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
286 ]))
290 ]))
287
291
288 self.assertEqual([t[0] for t in results], [
292 self.assertEqual([t[0] for t in results], [
289 b'wantframe',
293 b'wantframe',
290 b'wantframe',
294 b'wantframe',
291 b'wantframe',
295 b'wantframe',
292 b'wantframe',
296 b'wantframe',
293 b'runcommand',
297 b'runcommand',
294 b'runcommand',
298 b'runcommand',
295 ])
299 ])
296
300
297 self.assertEqual(results[4][1], {
301 self.assertEqual(results[4][1], {
298 b'requestid': 3,
302 b'requestid': 3,
299 b'command': b'command3',
303 b'command': b'command3',
300 b'args': {b'biz': b'baz', b'key': b'val'},
304 b'args': {b'biz': b'baz', b'key': b'val'},
301 b'redirect': None,
305 b'redirect': None,
302 b'data': None,
306 b'data': None,
303 })
307 })
304 self.assertEqual(results[5][1], {
308 self.assertEqual(results[5][1], {
305 b'requestid': 1,
309 b'requestid': 1,
306 b'command': b'command1',
310 b'command': b'command1',
307 b'args': {b'foo': b'bar', b'key1': b'val'},
311 b'args': {b'foo': b'bar', b'key1': b'val'},
308 b'redirect': None,
312 b'redirect': None,
309 b'data': None,
313 b'data': None,
310 })
314 })
311
315
312 def testmissingcommanddataframe(self):
316 def testmissingcommanddataframe(self):
313 # The reactor doesn't currently handle partially received commands.
317 # The reactor doesn't currently handle partially received commands.
314 # So this test is failing to do anything with request 1.
318 # So this test is failing to do anything with request 1.
315 frames = [
319 frames = [
316 ffs(b'1 1 stream-begin command-request new|have-data '
320 ffs(b'1 1 stream-begin command-request new|have-data '
317 b"cbor:{b'name': b'command1'}"),
321 b"cbor:{b'name': b'command1'}"),
318 ffs(b'3 1 0 command-request new '
322 ffs(b'3 1 0 command-request new '
319 b"cbor:{b'name': b'command2'}"),
323 b"cbor:{b'name': b'command2'}"),
320 ]
324 ]
321 results = list(sendframes(makereactor(), frames))
325 results = list(sendframes(makereactor(), frames))
322 self.assertEqual(len(results), 2)
326 self.assertEqual(len(results), 2)
323 self.assertaction(results[0], b'wantframe')
327 self.assertaction(results[0], b'wantframe')
324 self.assertaction(results[1], b'runcommand')
328 self.assertaction(results[1], b'runcommand')
325
329
326 def testmissingcommanddataframeflags(self):
330 def testmissingcommanddataframeflags(self):
327 frames = [
331 frames = [
328 ffs(b'1 1 stream-begin command-request new|have-data '
332 ffs(b'1 1 stream-begin command-request new|have-data '
329 b"cbor:{b'name': b'command1'}"),
333 b"cbor:{b'name': b'command1'}"),
330 ffs(b'1 1 0 command-data 0 data'),
334 ffs(b'1 1 0 command-data 0 data'),
331 ]
335 ]
332 results = list(sendframes(makereactor(), frames))
336 results = list(sendframes(makereactor(), frames))
333 self.assertEqual(len(results), 2)
337 self.assertEqual(len(results), 2)
334 self.assertaction(results[0], b'wantframe')
338 self.assertaction(results[0], b'wantframe')
335 self.assertaction(results[1], b'error')
339 self.assertaction(results[1], b'error')
336 self.assertEqual(results[1][1], {
340 self.assertEqual(results[1][1], {
337 b'message': b'command data frame without flags',
341 b'message': b'command data frame without flags',
338 })
342 })
339
343
340 def testframefornonreceivingrequest(self):
344 def testframefornonreceivingrequest(self):
341 """Receiving a frame for a command that is not receiving is illegal."""
345 """Receiving a frame for a command that is not receiving is illegal."""
342 results = list(sendframes(makereactor(), [
346 results = list(sendframes(makereactor(), [
343 ffs(b'1 1 stream-begin command-request new '
347 ffs(b'1 1 stream-begin command-request new '
344 b"cbor:{b'name': b'command1'}"),
348 b"cbor:{b'name': b'command1'}"),
345 ffs(b'3 1 0 command-request new|have-data '
349 ffs(b'3 1 0 command-request new|have-data '
346 b"cbor:{b'name': b'command3'}"),
350 b"cbor:{b'name': b'command3'}"),
347 ffs(b'5 1 0 command-data eos ignored'),
351 ffs(b'5 1 0 command-data eos ignored'),
348 ]))
352 ]))
349 self.assertaction(results[2], b'error')
353 self.assertaction(results[2], b'error')
350 self.assertEqual(results[2][1], {
354 self.assertEqual(results[2][1], {
351 b'message': b'received frame for request that is not receiving: 5',
355 b'message': b'received frame for request that is not receiving: 5',
352 })
356 })
353
357
354 def testsimpleresponse(self):
358 def testsimpleresponse(self):
355 """Bytes response to command sends result frames."""
359 """Bytes response to command sends result frames."""
356 reactor = makereactor()
360 reactor = makereactor()
357 instream = framing.stream(1)
361 instream = framing.stream(1)
358 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
362 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
359
363
360 outstream = reactor.makeoutputstream()
364 outstream = reactor.makeoutputstream()
361 result = reactor.oncommandresponseready(outstream, 1, b'response')
365 result = reactor.oncommandresponseready(outstream, 1, b'response')
362 self.assertaction(result, b'sendframes')
366 self.assertaction(result, b'sendframes')
363 self.assertframesequal(result[1][b'framegen'], [
367 self.assertframesequal(result[1][b'framegen'], [
364 b'1 2 stream-begin command-response eos %sresponse' % OK,
368 b'1 2 stream-begin command-response eos %sresponse' % OK,
365 ])
369 ])
366
370
367 def testmultiframeresponse(self):
371 def testmultiframeresponse(self):
368 """Bytes response spanning multiple frames is handled."""
372 """Bytes response spanning multiple frames is handled."""
369 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
373 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
370 second = b'y' * 100
374 second = b'y' * 100
371
375
372 reactor = makereactor()
376 reactor = makereactor()
373 instream = framing.stream(1)
377 instream = framing.stream(1)
374 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
378 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
375
379
376 outstream = reactor.makeoutputstream()
380 outstream = reactor.makeoutputstream()
377 result = reactor.oncommandresponseready(outstream, 1, first + second)
381 result = reactor.oncommandresponseready(outstream, 1, first + second)
378 self.assertaction(result, b'sendframes')
382 self.assertaction(result, b'sendframes')
379 self.assertframesequal(result[1][b'framegen'], [
383 self.assertframesequal(result[1][b'framegen'], [
380 b'1 2 stream-begin command-response continuation %s' % OK,
384 b'1 2 stream-begin command-response continuation %s' % OK,
381 b'1 2 0 command-response continuation %s' % first,
385 b'1 2 0 command-response continuation %s' % first,
382 b'1 2 0 command-response eos %s' % second,
386 b'1 2 0 command-response eos %s' % second,
383 ])
387 ])
384
388
385 def testservererror(self):
389 def testservererror(self):
386 reactor = makereactor()
390 reactor = makereactor()
387 instream = framing.stream(1)
391 instream = framing.stream(1)
388 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
392 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
389
393
390 outstream = reactor.makeoutputstream()
394 outstream = reactor.makeoutputstream()
391 result = reactor.onservererror(outstream, 1, b'some message')
395 result = reactor.onservererror(outstream, 1, b'some message')
392 self.assertaction(result, b'sendframes')
396 self.assertaction(result, b'sendframes')
393 self.assertframesequal(result[1][b'framegen'], [
397 self.assertframesequal(result[1][b'framegen'], [
394 b"1 2 stream-begin error-response 0 "
398 b"1 2 stream-begin error-response 0 "
395 b"cbor:{b'type': b'server', "
399 b"cbor:{b'type': b'server', "
396 b"b'message': [{b'msg': b'some message'}]}",
400 b"b'message': [{b'msg': b'some message'}]}",
397 ])
401 ])
398
402
399 def test1commanddeferresponse(self):
403 def test1commanddeferresponse(self):
400 """Responses when in deferred output mode are delayed until EOF."""
404 """Responses when in deferred output mode are delayed until EOF."""
401 reactor = makereactor(deferoutput=True)
405 reactor = makereactor(deferoutput=True)
402 instream = framing.stream(1)
406 instream = framing.stream(1)
403 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
407 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
404 {}))
408 {}))
405 self.assertEqual(len(results), 1)
409 self.assertEqual(len(results), 1)
406 self.assertaction(results[0], b'runcommand')
410 self.assertaction(results[0], b'runcommand')
407
411
408 outstream = reactor.makeoutputstream()
412 outstream = reactor.makeoutputstream()
409 result = reactor.oncommandresponseready(outstream, 1, b'response')
413 result = reactor.oncommandresponseready(outstream, 1, b'response')
410 self.assertaction(result, b'noop')
414 self.assertaction(result, b'noop')
411 result = reactor.oninputeof()
415 result = reactor.oninputeof()
412 self.assertaction(result, b'sendframes')
416 self.assertaction(result, b'sendframes')
413 self.assertframesequal(result[1][b'framegen'], [
417 self.assertframesequal(result[1][b'framegen'], [
414 b'1 2 stream-begin command-response eos %sresponse' % OK,
418 b'1 2 stream-begin command-response eos %sresponse' % OK,
415 ])
419 ])
416
420
417 def testmultiplecommanddeferresponse(self):
421 def testmultiplecommanddeferresponse(self):
418 reactor = makereactor(deferoutput=True)
422 reactor = makereactor(deferoutput=True)
419 instream = framing.stream(1)
423 instream = framing.stream(1)
420 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
424 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
421 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
425 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
422
426
423 outstream = reactor.makeoutputstream()
427 outstream = reactor.makeoutputstream()
424 result = reactor.oncommandresponseready(outstream, 1, b'response1')
428 result = reactor.oncommandresponseready(outstream, 1, b'response1')
425 self.assertaction(result, b'noop')
429 self.assertaction(result, b'noop')
426 result = reactor.oncommandresponseready(outstream, 3, b'response2')
430 result = reactor.oncommandresponseready(outstream, 3, b'response2')
427 self.assertaction(result, b'noop')
431 self.assertaction(result, b'noop')
428 result = reactor.oninputeof()
432 result = reactor.oninputeof()
429 self.assertaction(result, b'sendframes')
433 self.assertaction(result, b'sendframes')
430 self.assertframesequal(result[1][b'framegen'], [
434 self.assertframesequal(result[1][b'framegen'], [
431 b'1 2 stream-begin command-response eos %sresponse1' % OK,
435 b'1 2 stream-begin command-response eos %sresponse1' % OK,
432 b'3 2 0 command-response eos %sresponse2' % OK,
436 b'3 2 0 command-response eos %sresponse2' % OK,
433 ])
437 ])
434
438
435 def testrequestidtracking(self):
439 def testrequestidtracking(self):
436 reactor = makereactor(deferoutput=True)
440 reactor = makereactor(deferoutput=True)
437 instream = framing.stream(1)
441 instream = framing.stream(1)
438 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
442 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
439 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
443 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
440 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
444 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
441
445
442 # Register results for commands out of order.
446 # Register results for commands out of order.
443 outstream = reactor.makeoutputstream()
447 outstream = reactor.makeoutputstream()
444 reactor.oncommandresponseready(outstream, 3, b'response3')
448 reactor.oncommandresponseready(outstream, 3, b'response3')
445 reactor.oncommandresponseready(outstream, 1, b'response1')
449 reactor.oncommandresponseready(outstream, 1, b'response1')
446 reactor.oncommandresponseready(outstream, 5, b'response5')
450 reactor.oncommandresponseready(outstream, 5, b'response5')
447
451
448 result = reactor.oninputeof()
452 result = reactor.oninputeof()
449 self.assertaction(result, b'sendframes')
453 self.assertaction(result, b'sendframes')
450 self.assertframesequal(result[1][b'framegen'], [
454 self.assertframesequal(result[1][b'framegen'], [
451 b'3 2 stream-begin command-response eos %sresponse3' % OK,
455 b'3 2 stream-begin command-response eos %sresponse3' % OK,
452 b'1 2 0 command-response eos %sresponse1' % OK,
456 b'1 2 0 command-response eos %sresponse1' % OK,
453 b'5 2 0 command-response eos %sresponse5' % OK,
457 b'5 2 0 command-response eos %sresponse5' % OK,
454 ])
458 ])
455
459
456 def testduplicaterequestonactivecommand(self):
460 def testduplicaterequestonactivecommand(self):
457 """Receiving a request ID that matches a request that isn't finished."""
461 """Receiving a request ID that matches a request that isn't finished."""
458 reactor = makereactor()
462 reactor = makereactor()
459 stream = framing.stream(1)
463 stream = framing.stream(1)
460 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
464 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
461 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
465 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
462
466
463 self.assertaction(results[0], b'error')
467 self.assertaction(results[0], b'error')
464 self.assertEqual(results[0][1], {
468 self.assertEqual(results[0][1], {
465 b'message': b'request with ID 1 is already active',
469 b'message': b'request with ID 1 is already active',
466 })
470 })
467
471
468 def testduplicaterequestonactivecommandnosend(self):
472 def testduplicaterequestonactivecommandnosend(self):
469 """Same as above but we've registered a response but haven't sent it."""
473 """Same as above but we've registered a response but haven't sent it."""
470 reactor = makereactor()
474 reactor = makereactor()
471 instream = framing.stream(1)
475 instream = framing.stream(1)
472 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
476 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
473 outstream = reactor.makeoutputstream()
477 outstream = reactor.makeoutputstream()
474 reactor.oncommandresponseready(outstream, 1, b'response')
478 reactor.oncommandresponseready(outstream, 1, b'response')
475
479
476 # We've registered the response but haven't sent it. From the
480 # We've registered the response but haven't sent it. From the
477 # perspective of the reactor, the command is still active.
481 # perspective of the reactor, the command is still active.
478
482
479 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
483 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
480 self.assertaction(results[0], b'error')
484 self.assertaction(results[0], b'error')
481 self.assertEqual(results[0][1], {
485 self.assertEqual(results[0][1], {
482 b'message': b'request with ID 1 is already active',
486 b'message': b'request with ID 1 is already active',
483 })
487 })
484
488
485 def testduplicaterequestaftersend(self):
489 def testduplicaterequestaftersend(self):
486 """We can use a duplicate request ID after we've sent the response."""
490 """We can use a duplicate request ID after we've sent the response."""
487 reactor = makereactor()
491 reactor = makereactor()
488 instream = framing.stream(1)
492 instream = framing.stream(1)
489 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
493 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
490 outstream = reactor.makeoutputstream()
494 outstream = reactor.makeoutputstream()
491 res = reactor.oncommandresponseready(outstream, 1, b'response')
495 res = reactor.oncommandresponseready(outstream, 1, b'response')
492 list(res[1][b'framegen'])
496 list(res[1][b'framegen'])
493
497
494 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
498 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
495 self.assertaction(results[0], b'runcommand')
499 self.assertaction(results[0], b'runcommand')
496
500
501 def testprotocolsettingsnoflags(self):
502 result = self._sendsingleframe(
503 makereactor(),
504 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
505 self.assertaction(result, b'error')
506 self.assertEqual(result[1], {
507 b'message': b'sender protocol settings frame must have '
508 b'continuation or end of stream flag set',
509 })
510
511 def testprotocolsettingsconflictflags(self):
512 result = self._sendsingleframe(
513 makereactor(),
514 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
515 self.assertaction(result, b'error')
516 self.assertEqual(result[1], {
517 b'message': b'sender protocol settings frame cannot have both '
518 b'continuation and end of stream flags set',
519 })
520
521 def testprotocolsettingsemptypayload(self):
522 result = self._sendsingleframe(
523 makereactor(),
524 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
525 self.assertaction(result, b'error')
526 self.assertEqual(result[1], {
527 b'message': b'sender protocol settings frame did not contain CBOR '
528 b'data',
529 })
530
531 def testprotocolsettingsmultipleobjects(self):
532 result = self._sendsingleframe(
533 makereactor(),
534 ffs(b'0 1 stream-begin sender-protocol-settings eos '
535 b'\x46foobar\x43foo'))
536 self.assertaction(result, b'error')
537 self.assertEqual(result[1], {
538 b'message': b'sender protocol settings frame contained multiple '
539 b'CBOR values',
540 })
541
542 def testprotocolsettingscontentencodings(self):
543 reactor = makereactor()
544
545 result = self._sendsingleframe(
546 reactor,
547 ffs(b'0 1 stream-begin sender-protocol-settings eos '
548 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
549 self.assertaction(result, b'wantframe')
550
551 self.assertEqual(reactor._state, b'idle')
552 self.assertEqual(reactor._sendersettings[b'contentencodings'],
553 [b'a', b'b'])
554
555 def testprotocolsettingsmultipleframes(self):
556 reactor = makereactor()
557
558 data = b''.join(cborutil.streamencode({
559 b'contentencodings': [b'value1', b'value2'],
560 }))
561
562 results = list(sendframes(reactor, [
563 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
564 data[0:5]),
565 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
566 ]))
567
568 self.assertEqual(len(results), 2)
569
570 self.assertaction(results[0], b'wantframe')
571 self.assertaction(results[1], b'wantframe')
572
573 self.assertEqual(reactor._state, b'idle')
574 self.assertEqual(reactor._sendersettings[b'contentencodings'],
575 [b'value1', b'value2'])
576
577 def testprotocolsettingsbadcbor(self):
578 result = self._sendsingleframe(
579 makereactor(),
580 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
581 self.assertaction(result, b'error')
582
583 def testprotocolsettingsnoninitial(self):
584 # Cannot have protocol settings frames as non-initial frames.
585 reactor = makereactor()
586
587 stream = framing.stream(1)
588 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
589 self.assertEqual(len(results), 1)
590 self.assertaction(results[0], b'runcommand')
591
592 result = self._sendsingleframe(
593 reactor,
594 ffs(b'0 1 0 sender-protocol-settings eos '))
595 self.assertaction(result, b'error')
596 self.assertEqual(result[1], {
597 b'message': b'expected command request frame; got 8',
598 })
599
497 if __name__ == '__main__':
600 if __name__ == '__main__':
498 import silenttestrunner
601 import silenttestrunner
499 silenttestrunner.main(__name__)
602 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now