##// END OF EJS Templates
wireprotov2: handle stream encoding settings frames...
Gregory Szorc -
r40164:57782791 default
parent child Browse files
Show More
@@ -1,1531 +1,1600 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54
54
55 FRAME_TYPES = {
55 FRAME_TYPES = {
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 }
64 }
65
65
66 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70
70
71 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
72 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 }
76 }
77
77
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
80
80
81 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
84 }
84 }
85
85
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88
88
89 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 }
92 }
93
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
100 }
101
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
108 }
109
109
110 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
111 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
117 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 }
120 }
121
121
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123
123
124 def humanflags(mapping, value):
124 def humanflags(mapping, value):
125 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
126 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
127 flags = []
127 flags = []
128 val = 1
128 val = 1
129 while value >= val:
129 while value >= val:
130 if value & val:
130 if value & val:
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 val <<= 1
132 val <<= 1
133
133
134 return b'|'.join(flags)
134 return b'|'.join(flags)
135
135
136 @attr.s(slots=True)
136 @attr.s(slots=True)
137 class frameheader(object):
137 class frameheader(object):
138 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
139
139
140 length = attr.ib()
140 length = attr.ib()
141 requestid = attr.ib()
141 requestid = attr.ib()
142 streamid = attr.ib()
142 streamid = attr.ib()
143 streamflags = attr.ib()
143 streamflags = attr.ib()
144 typeid = attr.ib()
144 typeid = attr.ib()
145 flags = attr.ib()
145 flags = attr.ib()
146
146
147 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
148 class frame(object):
148 class frame(object):
149 """Represents a parsed frame."""
149 """Represents a parsed frame."""
150
150
151 requestid = attr.ib()
151 requestid = attr.ib()
152 streamid = attr.ib()
152 streamid = attr.ib()
153 streamflags = attr.ib()
153 streamflags = attr.ib()
154 typeid = attr.ib()
154 typeid = attr.ib()
155 flags = attr.ib()
155 flags = attr.ib()
156 payload = attr.ib()
156 payload = attr.ib()
157
157
158 @encoding.strmethod
158 @encoding.strmethod
159 def __repr__(self):
159 def __repr__(self):
160 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
161 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
162 if value == self.typeid:
162 if value == self.typeid:
163 typename = name
163 typename = name
164 break
164 break
165
165
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
168 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171
171
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
174 # TODO assert size of payload.
174 # TODO assert size of payload.
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176
176
177 # 24 bits length
177 # 24 bits length
178 # 16 bits request id
178 # 16 bits request id
179 # 8 bits stream id
179 # 8 bits stream id
180 # 8 bits stream flags
180 # 8 bits stream flags
181 # 4 bits type
181 # 4 bits type
182 # 4 bits flags
182 # 4 bits flags
183
183
184 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
185 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
188 frame[8:] = payload
188 frame[8:] = payload
189
189
190 return frame
190 return frame
191
191
192 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
193 """Create a frame from a human readable string
193 """Create a frame from a human readable string
194
194
195 Strings have the form:
195 Strings have the form:
196
196
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198
198
199 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
200 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
201
201
202 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
203
203
204 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
205 named constant.
205 named constant.
206
206
207 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
208
208
209 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
210 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 byte string literal.
212 byte string literal.
213 """
213 """
214 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216
216
217 requestid = int(requestid)
217 requestid = int(requestid)
218 streamid = int(streamid)
218 streamid = int(streamid)
219
219
220 finalstreamflags = 0
220 finalstreamflags = 0
221 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
222 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
223 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
224 else:
224 else:
225 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
226
226
227 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
228 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
229 else:
229 else:
230 frametype = int(frametype)
230 frametype = int(frametype)
231
231
232 finalflags = 0
232 finalflags = 0
233 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
234 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
235 if flag in validflags:
235 if flag in validflags:
236 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
237 else:
237 else:
238 finalflags |= int(flag)
238 finalflags |= int(flag)
239
239
240 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
241 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
242 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
243
243
244 else:
244 else:
245 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
246
246
247 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
248 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
249 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
250
250
251 def parseheader(data):
251 def parseheader(data):
252 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
253
253
254 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
255 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
256 """
256 """
257 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
258 # 16 bits request ID
258 # 16 bits request ID
259 # 8 bits stream ID
259 # 8 bits stream ID
260 # 8 bits stream flags
260 # 8 bits stream flags
261 # 4 bits frame type
261 # 4 bits frame type
262 # 4 bits frame flags
262 # 4 bits frame flags
263 # ... payload
263 # ... payload
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 typeflags = data[7]
266 typeflags = data[7]
267
267
268 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
269 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
270
270
271 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
272 frametype, frameflags)
272 frametype, frameflags)
273
273
274 def readframe(fh):
274 def readframe(fh):
275 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
276
276
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
279 seen.
279 seen.
280 """
280 """
281 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
282
282
283 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
284
284
285 if readcount == 0:
285 if readcount == 0:
286 return None
286 return None
287
287
288 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 (readcount, header))
290 (readcount, header))
291
291
292 h = parseheader(header)
292 h = parseheader(header)
293
293
294 payload = fh.read(h.length)
294 payload = fh.read(h.length)
295 if len(payload) != h.length:
295 if len(payload) != h.length:
296 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 (h.length, len(payload)))
297 (h.length, len(payload)))
298
298
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 payload)
300 payload)
301
301
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 redirect=None):
304 redirect=None):
305 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
306
306
307 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
308 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
309 """
309 """
310 data = {b'name': cmd}
310 data = {b'name': cmd}
311 if args:
311 if args:
312 data[b'args'] = args
312 data[b'args'] = args
313
313
314 if redirect:
314 if redirect:
315 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
316
316
317 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
318
318
319 offset = 0
319 offset = 0
320
320
321 while True:
321 while True:
322 flags = 0
322 flags = 0
323
323
324 # Must set new or continuation flag.
324 # Must set new or continuation flag.
325 if not offset:
325 if not offset:
326 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
327 else:
327 else:
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329
329
330 # Data frames is set on all frames.
330 # Data frames is set on all frames.
331 if datafh:
331 if datafh:
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333
333
334 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
335 offset += len(payload)
335 offset += len(payload)
336
336
337 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339
339
340 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 flags=flags,
342 flags=flags,
343 payload=payload)
343 payload=payload)
344
344
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 break
346 break
347
347
348 if datafh:
348 if datafh:
349 while True:
349 while True:
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351
351
352 done = False
352 done = False
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 else:
355 else:
356 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
357 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
358 done = True
358 done = True
359
359
360 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
361 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
362 flags=flags,
362 flags=flags,
363 payload=data)
363 payload=data)
364
364
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
370 """Create a raw frame to send a bytes response from static bytes input.
371
371
372 Returns a generator of bytearrays.
372 Returns a generator of bytearrays.
373 """
373 """
374 # Automatically send the overall CBOR response map.
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
377 raise error.ProgrammingError('not yet implemented')
378
378
379 # Simple case where we can fit the full response in a single frame.
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
384 flags=flags,
385 payload=overall + data)
385 payload=overall + data)
386 return
386 return
387
387
388 # It's easier to send the overall CBOR map in its own frame than to track
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
393 payload=overall)
394
394
395 offset = 0
395 offset = 0
396 while True:
396 while True:
397 chunk = data[offset:offset + maxframesize]
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
398 offset += len(chunk)
399 done = offset == len(data)
399 done = offset == len(data)
400
400
401 if done:
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
405
406 yield stream.makeframe(requestid=requestid,
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
408 flags=flags,
409 payload=chunk)
409 payload=chunk)
410
410
411 if done:
411 if done:
412 break
412 break
413
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
416 """Generator of frames from a generator of byte chunks.
417
417
418 This assumes that another frame will follow whatever this emits. i.e.
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
420 flag.
421 """
421 """
422 cb = util.chunkbuffer(gen)
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
424
425 while True:
425 while True:
426 chunk = cb.read(maxframesize)
426 chunk = cb.read(maxframesize)
427 if not chunk:
427 if not chunk:
428 break
428 break
429
429
430 yield stream.makeframe(requestid=requestid,
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
432 flags=flags,
433 payload=chunk)
433 payload=chunk)
434
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
436
437 def createcommandresponseokframe(stream, requestid):
437 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439
439
440 return stream.makeframe(requestid=requestid,
440 return stream.makeframe(requestid=requestid,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 payload=overall)
443 payload=overall)
444
444
445 def createcommandresponseeosframe(stream, requestid):
445 def createcommandresponseeosframe(stream, requestid):
446 """Create an empty payload frame representing command end-of-stream."""
446 """Create an empty payload frame representing command end-of-stream."""
447 return stream.makeframe(requestid=requestid,
447 return stream.makeframe(requestid=requestid,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 payload=b'')
450 payload=b'')
451
451
452 def createalternatelocationresponseframe(stream, requestid, location):
452 def createalternatelocationresponseframe(stream, requestid, location):
453 data = {
453 data = {
454 b'status': b'redirect',
454 b'status': b'redirect',
455 b'location': {
455 b'location': {
456 b'url': location.url,
456 b'url': location.url,
457 b'mediatype': location.mediatype,
457 b'mediatype': location.mediatype,
458 }
458 }
459 }
459 }
460
460
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 r'servercadercerts'):
462 r'servercadercerts'):
463 value = getattr(location, a)
463 value = getattr(location, a)
464 if value is not None:
464 if value is not None:
465 data[b'location'][pycompat.bytestr(a)] = value
465 data[b'location'][pycompat.bytestr(a)] = value
466
466
467 return stream.makeframe(requestid=requestid,
467 return stream.makeframe(requestid=requestid,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 payload=b''.join(cborutil.streamencode(data)))
470 payload=b''.join(cborutil.streamencode(data)))
471
471
472 def createcommanderrorresponse(stream, requestid, message, args=None):
472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 # formatting works consistently?
474 # formatting works consistently?
475 m = {
475 m = {
476 b'status': b'error',
476 b'status': b'error',
477 b'error': {
477 b'error': {
478 b'message': message,
478 b'message': message,
479 }
479 }
480 }
480 }
481
481
482 if args:
482 if args:
483 m[b'error'][b'args'] = args
483 m[b'error'][b'args'] = args
484
484
485 overall = b''.join(cborutil.streamencode(m))
485 overall = b''.join(cborutil.streamencode(m))
486
486
487 yield stream.makeframe(requestid=requestid,
487 yield stream.makeframe(requestid=requestid,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 payload=overall)
490 payload=overall)
491
491
492 def createerrorframe(stream, requestid, msg, errtype):
492 def createerrorframe(stream, requestid, msg, errtype):
493 # TODO properly handle frame size limits.
493 # TODO properly handle frame size limits.
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495
495
496 payload = b''.join(cborutil.streamencode({
496 payload = b''.join(cborutil.streamencode({
497 b'type': errtype,
497 b'type': errtype,
498 b'message': [{b'msg': msg}],
498 b'message': [{b'msg': msg}],
499 }))
499 }))
500
500
501 yield stream.makeframe(requestid=requestid,
501 yield stream.makeframe(requestid=requestid,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 flags=0,
503 flags=0,
504 payload=payload)
504 payload=payload)
505
505
506 def createtextoutputframe(stream, requestid, atoms,
506 def createtextoutputframe(stream, requestid, atoms,
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 """Create a text output frame to render text to people.
508 """Create a text output frame to render text to people.
509
509
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511
511
512 The formatting string contains ``%s`` tokens to be replaced by the
512 The formatting string contains ``%s`` tokens to be replaced by the
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 formatters to be applied at rendering time. In terms of the ``ui``
514 formatters to be applied at rendering time. In terms of the ``ui``
515 class, each atom corresponds to a ``ui.write()``.
515 class, each atom corresponds to a ``ui.write()``.
516 """
516 """
517 atomdicts = []
517 atomdicts = []
518
518
519 for (formatting, args, labels) in atoms:
519 for (formatting, args, labels) in atoms:
520 # TODO look for localstr, other types here?
520 # TODO look for localstr, other types here?
521
521
522 if not isinstance(formatting, bytes):
522 if not isinstance(formatting, bytes):
523 raise ValueError('must use bytes formatting strings')
523 raise ValueError('must use bytes formatting strings')
524 for arg in args:
524 for arg in args:
525 if not isinstance(arg, bytes):
525 if not isinstance(arg, bytes):
526 raise ValueError('must use bytes for arguments')
526 raise ValueError('must use bytes for arguments')
527 for label in labels:
527 for label in labels:
528 if not isinstance(label, bytes):
528 if not isinstance(label, bytes):
529 raise ValueError('must use bytes for labels')
529 raise ValueError('must use bytes for labels')
530
530
531 # Formatting string must be ASCII.
531 # Formatting string must be ASCII.
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533
533
534 # Arguments must be UTF-8.
534 # Arguments must be UTF-8.
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536
536
537 # Labels must be ASCII.
537 # Labels must be ASCII.
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 for l in labels]
539 for l in labels]
540
540
541 atom = {b'msg': formatting}
541 atom = {b'msg': formatting}
542 if args:
542 if args:
543 atom[b'args'] = args
543 atom[b'args'] = args
544 if labels:
544 if labels:
545 atom[b'labels'] = labels
545 atom[b'labels'] = labels
546
546
547 atomdicts.append(atom)
547 atomdicts.append(atom)
548
548
549 payload = b''.join(cborutil.streamencode(atomdicts))
549 payload = b''.join(cborutil.streamencode(atomdicts))
550
550
551 if len(payload) > maxframesize:
551 if len(payload) > maxframesize:
552 raise ValueError('cannot encode data in a single frame')
552 raise ValueError('cannot encode data in a single frame')
553
553
554 yield stream.makeframe(requestid=requestid,
554 yield stream.makeframe(requestid=requestid,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 flags=0,
556 flags=0,
557 payload=payload)
557 payload=payload)
558
558
559 class bufferingcommandresponseemitter(object):
559 class bufferingcommandresponseemitter(object):
560 """Helper object to emit command response frames intelligently.
560 """Helper object to emit command response frames intelligently.
561
561
562 Raw command response data is likely emitted in chunks much smaller
562 Raw command response data is likely emitted in chunks much smaller
563 than what can fit in a single frame. This class exists to buffer
563 than what can fit in a single frame. This class exists to buffer
564 chunks until enough data is available to fit in a single frame.
564 chunks until enough data is available to fit in a single frame.
565
565
566 TODO we'll need something like this when compression is supported.
566 TODO we'll need something like this when compression is supported.
567 So it might make sense to implement this functionality at the stream
567 So it might make sense to implement this functionality at the stream
568 level.
568 level.
569 """
569 """
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 self._stream = stream
571 self._stream = stream
572 self._requestid = requestid
572 self._requestid = requestid
573 self._maxsize = maxframesize
573 self._maxsize = maxframesize
574 self._chunks = []
574 self._chunks = []
575 self._chunkssize = 0
575 self._chunkssize = 0
576
576
577 def send(self, data):
577 def send(self, data):
578 """Send new data for emission.
578 """Send new data for emission.
579
579
580 Is a generator of new frames that were derived from the new input.
580 Is a generator of new frames that were derived from the new input.
581
581
582 If the special input ``None`` is received, flushes all buffered
582 If the special input ``None`` is received, flushes all buffered
583 data to frames.
583 data to frames.
584 """
584 """
585
585
586 if data is None:
586 if data is None:
587 for frame in self._flush():
587 for frame in self._flush():
588 yield frame
588 yield frame
589 return
589 return
590
590
591 # There is a ton of potential to do more complicated things here.
591 # There is a ton of potential to do more complicated things here.
592 # Our immediate goal is to coalesce small chunks into big frames,
592 # Our immediate goal is to coalesce small chunks into big frames,
593 # not achieve the fewest number of frames possible. So we go with
593 # not achieve the fewest number of frames possible. So we go with
594 # a simple implementation:
594 # a simple implementation:
595 #
595 #
596 # * If a chunk is too large for a frame, we flush and emit frames
596 # * If a chunk is too large for a frame, we flush and emit frames
597 # for the new chunk.
597 # for the new chunk.
598 # * If a chunk can be buffered without total buffered size limits
598 # * If a chunk can be buffered without total buffered size limits
599 # being exceeded, we do that.
599 # being exceeded, we do that.
600 # * If a chunk causes us to go over our buffering limit, we flush
600 # * If a chunk causes us to go over our buffering limit, we flush
601 # and then buffer the new chunk.
601 # and then buffer the new chunk.
602
602
603 if len(data) > self._maxsize:
603 if len(data) > self._maxsize:
604 for frame in self._flush():
604 for frame in self._flush():
605 yield frame
605 yield frame
606
606
607 # Now emit frames for the big chunk.
607 # Now emit frames for the big chunk.
608 offset = 0
608 offset = 0
609 while True:
609 while True:
610 chunk = data[offset:offset + self._maxsize]
610 chunk = data[offset:offset + self._maxsize]
611 offset += len(chunk)
611 offset += len(chunk)
612
612
613 yield self._stream.makeframe(
613 yield self._stream.makeframe(
614 self._requestid,
614 self._requestid,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 payload=chunk)
617 payload=chunk)
618
618
619 if offset == len(data):
619 if offset == len(data):
620 return
620 return
621
621
622 # If we don't have enough to constitute a full frame, buffer and
622 # If we don't have enough to constitute a full frame, buffer and
623 # return.
623 # return.
624 if len(data) + self._chunkssize < self._maxsize:
624 if len(data) + self._chunkssize < self._maxsize:
625 self._chunks.append(data)
625 self._chunks.append(data)
626 self._chunkssize += len(data)
626 self._chunkssize += len(data)
627 return
627 return
628
628
629 # Else flush what we have and buffer the new chunk. We could do
629 # Else flush what we have and buffer the new chunk. We could do
630 # something more intelligent here, like break the chunk. Let's
630 # something more intelligent here, like break the chunk. Let's
631 # keep things simple for now.
631 # keep things simple for now.
632 for frame in self._flush():
632 for frame in self._flush():
633 yield frame
633 yield frame
634
634
635 self._chunks.append(data)
635 self._chunks.append(data)
636 self._chunkssize = len(data)
636 self._chunkssize = len(data)
637
637
638 def _flush(self):
638 def _flush(self):
639 payload = b''.join(self._chunks)
639 payload = b''.join(self._chunks)
640 assert len(payload) <= self._maxsize
640 assert len(payload) <= self._maxsize
641
641
642 self._chunks[:] = []
642 self._chunks[:] = []
643 self._chunkssize = 0
643 self._chunkssize = 0
644
644
645 yield self._stream.makeframe(
645 yield self._stream.makeframe(
646 self._requestid,
646 self._requestid,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 payload=payload)
649 payload=payload)
650
650
651 class stream(object):
651 class stream(object):
652 """Represents a logical unidirectional series of frames."""
652 """Represents a logical unidirectional series of frames."""
653
653
654 def __init__(self, streamid, active=False):
654 def __init__(self, streamid, active=False):
655 self.streamid = streamid
655 self.streamid = streamid
656 self._active = active
656 self._active = active
657
657
658 def makeframe(self, requestid, typeid, flags, payload):
658 def makeframe(self, requestid, typeid, flags, payload):
659 """Create a frame to be sent out over this stream.
659 """Create a frame to be sent out over this stream.
660
660
661 Only returns the frame instance. Does not actually send it.
661 Only returns the frame instance. Does not actually send it.
662 """
662 """
663 streamflags = 0
663 streamflags = 0
664 if not self._active:
664 if not self._active:
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 self._active = True
666 self._active = True
667
667
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 payload)
669 payload)
670
670
671 def setdecoder(self, name, extraobjs):
672 """Set the decoder for this stream.
673
674 Receives the stream profile name and any additional CBOR objects
675 decoded from the stream encoding settings frame payloads.
676 """
677
671 def ensureserverstream(stream):
678 def ensureserverstream(stream):
672 if stream.streamid % 2:
679 if stream.streamid % 2:
673 raise error.ProgrammingError('server should only write to even '
680 raise error.ProgrammingError('server should only write to even '
674 'numbered streams; %d is not even' %
681 'numbered streams; %d is not even' %
675 stream.streamid)
682 stream.streamid)
676
683
677 DEFAULT_PROTOCOL_SETTINGS = {
684 DEFAULT_PROTOCOL_SETTINGS = {
678 'contentencodings': [b'identity'],
685 'contentencodings': [b'identity'],
679 }
686 }
680
687
681 class serverreactor(object):
688 class serverreactor(object):
682 """Holds state of a server handling frame-based protocol requests.
689 """Holds state of a server handling frame-based protocol requests.
683
690
684 This class is the "brain" of the unified frame-based protocol server
691 This class is the "brain" of the unified frame-based protocol server
685 component. While the protocol is stateless from the perspective of
692 component. While the protocol is stateless from the perspective of
686 requests/commands, something needs to track which frames have been
693 requests/commands, something needs to track which frames have been
687 received, what frames to expect, etc. This class is that thing.
694 received, what frames to expect, etc. This class is that thing.
688
695
689 Instances are modeled as a state machine of sorts. Instances are also
696 Instances are modeled as a state machine of sorts. Instances are also
690 reactionary to external events. The point of this class is to encapsulate
697 reactionary to external events. The point of this class is to encapsulate
691 the state of the connection and the exchange of frames, not to perform
698 the state of the connection and the exchange of frames, not to perform
692 work. Instead, callers tell this class when something occurs, like a
699 work. Instead, callers tell this class when something occurs, like a
693 frame arriving. If that activity is worthy of a follow-up action (say
700 frame arriving. If that activity is worthy of a follow-up action (say
694 *run a command*), the return value of that handler will say so.
701 *run a command*), the return value of that handler will say so.
695
702
696 I/O and CPU intensive operations are purposefully delegated outside of
703 I/O and CPU intensive operations are purposefully delegated outside of
697 this class.
704 this class.
698
705
699 Consumers are expected to tell instances when events occur. They do so by
706 Consumers are expected to tell instances when events occur. They do so by
700 calling the various ``on*`` methods. These methods return a 2-tuple
707 calling the various ``on*`` methods. These methods return a 2-tuple
701 describing any follow-up action(s) to take. The first element is the
708 describing any follow-up action(s) to take. The first element is the
702 name of an action to perform. The second is a data structure (usually
709 name of an action to perform. The second is a data structure (usually
703 a dict) specific to that action that contains more information. e.g.
710 a dict) specific to that action that contains more information. e.g.
704 if the server wants to send frames back to the client, the data structure
711 if the server wants to send frames back to the client, the data structure
705 will contain a reference to those frames.
712 will contain a reference to those frames.
706
713
707 Valid actions that consumers can be instructed to take are:
714 Valid actions that consumers can be instructed to take are:
708
715
709 sendframes
716 sendframes
710 Indicates that frames should be sent to the client. The ``framegen``
717 Indicates that frames should be sent to the client. The ``framegen``
711 key contains a generator of frames that should be sent. The server
718 key contains a generator of frames that should be sent. The server
712 assumes that all frames are sent to the client.
719 assumes that all frames are sent to the client.
713
720
714 error
721 error
715 Indicates that an error occurred. Consumer should probably abort.
722 Indicates that an error occurred. Consumer should probably abort.
716
723
717 runcommand
724 runcommand
718 Indicates that the consumer should run a wire protocol command. Details
725 Indicates that the consumer should run a wire protocol command. Details
719 of the command to run are given in the data structure.
726 of the command to run are given in the data structure.
720
727
721 wantframe
728 wantframe
722 Indicates that nothing of interest happened and the server is waiting on
729 Indicates that nothing of interest happened and the server is waiting on
723 more frames from the client before anything interesting can be done.
730 more frames from the client before anything interesting can be done.
724
731
725 noop
732 noop
726 Indicates no additional action is required.
733 Indicates no additional action is required.
727
734
728 Known Issues
735 Known Issues
729 ------------
736 ------------
730
737
731 There are no limits to the number of partially received commands or their
738 There are no limits to the number of partially received commands or their
732 size. A malicious client could stream command request data and exhaust the
739 size. A malicious client could stream command request data and exhaust the
733 server's memory.
740 server's memory.
734
741
735 Partially received commands are not acted upon when end of input is
742 Partially received commands are not acted upon when end of input is
736 reached. Should the server error if it receives a partial request?
743 reached. Should the server error if it receives a partial request?
737 Should the client send a message to abort a partially transmitted request
744 Should the client send a message to abort a partially transmitted request
738 to facilitate graceful shutdown?
745 to facilitate graceful shutdown?
739
746
740 Active requests that haven't been responded to aren't tracked. This means
747 Active requests that haven't been responded to aren't tracked. This means
741 that if we receive a command and instruct its dispatch, another command
748 that if we receive a command and instruct its dispatch, another command
742 with its request ID can come in over the wire and there will be a race
749 with its request ID can come in over the wire and there will be a race
743 between who responds to what.
750 between who responds to what.
744 """
751 """
745
752
746 def __init__(self, deferoutput=False):
753 def __init__(self, deferoutput=False):
747 """Construct a new server reactor.
754 """Construct a new server reactor.
748
755
749 ``deferoutput`` can be used to indicate that no output frames should be
756 ``deferoutput`` can be used to indicate that no output frames should be
750 instructed to be sent until input has been exhausted. In this mode,
757 instructed to be sent until input has been exhausted. In this mode,
751 events that would normally generate output frames (such as a command
758 events that would normally generate output frames (such as a command
752 response being ready) will instead defer instructing the consumer to
759 response being ready) will instead defer instructing the consumer to
753 send those frames. This is useful for half-duplex transports where the
760 send those frames. This is useful for half-duplex transports where the
754 sender cannot receive until all data has been transmitted.
761 sender cannot receive until all data has been transmitted.
755 """
762 """
756 self._deferoutput = deferoutput
763 self._deferoutput = deferoutput
757 self._state = 'initial'
764 self._state = 'initial'
758 self._nextoutgoingstreamid = 2
765 self._nextoutgoingstreamid = 2
759 self._bufferedframegens = []
766 self._bufferedframegens = []
760 # stream id -> stream instance for all active streams from the client.
767 # stream id -> stream instance for all active streams from the client.
761 self._incomingstreams = {}
768 self._incomingstreams = {}
762 self._outgoingstreams = {}
769 self._outgoingstreams = {}
763 # request id -> dict of commands that are actively being received.
770 # request id -> dict of commands that are actively being received.
764 self._receivingcommands = {}
771 self._receivingcommands = {}
765 # Request IDs that have been received and are actively being processed.
772 # Request IDs that have been received and are actively being processed.
766 # Once all output for a request has been sent, it is removed from this
773 # Once all output for a request has been sent, it is removed from this
767 # set.
774 # set.
768 self._activecommands = set()
775 self._activecommands = set()
769
776
770 self._protocolsettingsdecoder = None
777 self._protocolsettingsdecoder = None
771
778
772 # Sender protocol settings are optional. Set implied default values.
779 # Sender protocol settings are optional. Set implied default values.
773 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
780 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
774
781
775 def onframerecv(self, frame):
782 def onframerecv(self, frame):
776 """Process a frame that has been received off the wire.
783 """Process a frame that has been received off the wire.
777
784
778 Returns a dict with an ``action`` key that details what action,
785 Returns a dict with an ``action`` key that details what action,
779 if any, the consumer should take next.
786 if any, the consumer should take next.
780 """
787 """
781 if not frame.streamid % 2:
788 if not frame.streamid % 2:
782 self._state = 'errored'
789 self._state = 'errored'
783 return self._makeerrorresult(
790 return self._makeerrorresult(
784 _('received frame with even numbered stream ID: %d') %
791 _('received frame with even numbered stream ID: %d') %
785 frame.streamid)
792 frame.streamid)
786
793
787 if frame.streamid not in self._incomingstreams:
794 if frame.streamid not in self._incomingstreams:
788 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
795 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
789 self._state = 'errored'
796 self._state = 'errored'
790 return self._makeerrorresult(
797 return self._makeerrorresult(
791 _('received frame on unknown inactive stream without '
798 _('received frame on unknown inactive stream without '
792 'beginning of stream flag set'))
799 'beginning of stream flag set'))
793
800
794 self._incomingstreams[frame.streamid] = stream(frame.streamid)
801 self._incomingstreams[frame.streamid] = stream(frame.streamid)
795
802
796 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
803 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
797 # TODO handle decoding frames
804 # TODO handle decoding frames
798 self._state = 'errored'
805 self._state = 'errored'
799 raise error.ProgrammingError('support for decoding stream payloads '
806 raise error.ProgrammingError('support for decoding stream payloads '
800 'not yet implemented')
807 'not yet implemented')
801
808
802 if frame.streamflags & STREAM_FLAG_END_STREAM:
809 if frame.streamflags & STREAM_FLAG_END_STREAM:
803 del self._incomingstreams[frame.streamid]
810 del self._incomingstreams[frame.streamid]
804
811
805 handlers = {
812 handlers = {
806 'initial': self._onframeinitial,
813 'initial': self._onframeinitial,
807 'protocol-settings-receiving': self._onframeprotocolsettings,
814 'protocol-settings-receiving': self._onframeprotocolsettings,
808 'idle': self._onframeidle,
815 'idle': self._onframeidle,
809 'command-receiving': self._onframecommandreceiving,
816 'command-receiving': self._onframecommandreceiving,
810 'errored': self._onframeerrored,
817 'errored': self._onframeerrored,
811 }
818 }
812
819
813 meth = handlers.get(self._state)
820 meth = handlers.get(self._state)
814 if not meth:
821 if not meth:
815 raise error.ProgrammingError('unhandled state: %s' % self._state)
822 raise error.ProgrammingError('unhandled state: %s' % self._state)
816
823
817 return meth(frame)
824 return meth(frame)
818
825
819 def oncommandresponseready(self, stream, requestid, data):
826 def oncommandresponseready(self, stream, requestid, data):
820 """Signal that a bytes response is ready to be sent to the client.
827 """Signal that a bytes response is ready to be sent to the client.
821
828
822 The raw bytes response is passed as an argument.
829 The raw bytes response is passed as an argument.
823 """
830 """
824 ensureserverstream(stream)
831 ensureserverstream(stream)
825
832
826 def sendframes():
833 def sendframes():
827 for frame in createcommandresponseframesfrombytes(stream, requestid,
834 for frame in createcommandresponseframesfrombytes(stream, requestid,
828 data):
835 data):
829 yield frame
836 yield frame
830
837
831 self._activecommands.remove(requestid)
838 self._activecommands.remove(requestid)
832
839
833 result = sendframes()
840 result = sendframes()
834
841
835 if self._deferoutput:
842 if self._deferoutput:
836 self._bufferedframegens.append(result)
843 self._bufferedframegens.append(result)
837 return 'noop', {}
844 return 'noop', {}
838 else:
845 else:
839 return 'sendframes', {
846 return 'sendframes', {
840 'framegen': result,
847 'framegen': result,
841 }
848 }
842
849
843 def oncommandresponsereadyobjects(self, stream, requestid, objs):
850 def oncommandresponsereadyobjects(self, stream, requestid, objs):
844 """Signal that objects are ready to be sent to the client.
851 """Signal that objects are ready to be sent to the client.
845
852
846 ``objs`` is an iterable of objects (typically a generator) that will
853 ``objs`` is an iterable of objects (typically a generator) that will
847 be encoded via CBOR and added to frames, which will be sent to the
854 be encoded via CBOR and added to frames, which will be sent to the
848 client.
855 client.
849 """
856 """
850 ensureserverstream(stream)
857 ensureserverstream(stream)
851
858
852 # We need to take care over exception handling. Uncaught exceptions
859 # We need to take care over exception handling. Uncaught exceptions
853 # when generating frames could lead to premature end of the frame
860 # when generating frames could lead to premature end of the frame
854 # stream and the possibility of the server or client process getting
861 # stream and the possibility of the server or client process getting
855 # in a bad state.
862 # in a bad state.
856 #
863 #
857 # Keep in mind that if ``objs`` is a generator, advancing it could
864 # Keep in mind that if ``objs`` is a generator, advancing it could
858 # raise exceptions that originated in e.g. wire protocol command
865 # raise exceptions that originated in e.g. wire protocol command
859 # functions. That is why we differentiate between exceptions raised
866 # functions. That is why we differentiate between exceptions raised
860 # when iterating versus other exceptions that occur.
867 # when iterating versus other exceptions that occur.
861 #
868 #
862 # In all cases, when the function finishes, the request is fully
869 # In all cases, when the function finishes, the request is fully
863 # handled and no new frames for it should be seen.
870 # handled and no new frames for it should be seen.
864
871
865 def sendframes():
872 def sendframes():
866 emitted = False
873 emitted = False
867 alternatelocationsent = False
874 alternatelocationsent = False
868 emitter = bufferingcommandresponseemitter(stream, requestid)
875 emitter = bufferingcommandresponseemitter(stream, requestid)
869 while True:
876 while True:
870 try:
877 try:
871 o = next(objs)
878 o = next(objs)
872 except StopIteration:
879 except StopIteration:
873 for frame in emitter.send(None):
880 for frame in emitter.send(None):
874 yield frame
881 yield frame
875
882
876 if emitted:
883 if emitted:
877 yield createcommandresponseeosframe(stream, requestid)
884 yield createcommandresponseeosframe(stream, requestid)
878 break
885 break
879
886
880 except error.WireprotoCommandError as e:
887 except error.WireprotoCommandError as e:
881 for frame in createcommanderrorresponse(
888 for frame in createcommanderrorresponse(
882 stream, requestid, e.message, e.messageargs):
889 stream, requestid, e.message, e.messageargs):
883 yield frame
890 yield frame
884 break
891 break
885
892
886 except Exception as e:
893 except Exception as e:
887 for frame in createerrorframe(
894 for frame in createerrorframe(
888 stream, requestid, '%s' % stringutil.forcebytestr(e),
895 stream, requestid, '%s' % stringutil.forcebytestr(e),
889 errtype='server'):
896 errtype='server'):
890
897
891 yield frame
898 yield frame
892
899
893 break
900 break
894
901
895 try:
902 try:
896 # Alternate location responses can only be the first and
903 # Alternate location responses can only be the first and
897 # only object in the output stream.
904 # only object in the output stream.
898 if isinstance(o, wireprototypes.alternatelocationresponse):
905 if isinstance(o, wireprototypes.alternatelocationresponse):
899 if emitted:
906 if emitted:
900 raise error.ProgrammingError(
907 raise error.ProgrammingError(
901 'alternatelocationresponse seen after initial '
908 'alternatelocationresponse seen after initial '
902 'output object')
909 'output object')
903
910
904 yield createalternatelocationresponseframe(
911 yield createalternatelocationresponseframe(
905 stream, requestid, o)
912 stream, requestid, o)
906
913
907 alternatelocationsent = True
914 alternatelocationsent = True
908 emitted = True
915 emitted = True
909 continue
916 continue
910
917
911 if alternatelocationsent:
918 if alternatelocationsent:
912 raise error.ProgrammingError(
919 raise error.ProgrammingError(
913 'object follows alternatelocationresponse')
920 'object follows alternatelocationresponse')
914
921
915 if not emitted:
922 if not emitted:
916 yield createcommandresponseokframe(stream, requestid)
923 yield createcommandresponseokframe(stream, requestid)
917 emitted = True
924 emitted = True
918
925
919 # Objects emitted by command functions can be serializable
926 # Objects emitted by command functions can be serializable
920 # data structures or special types.
927 # data structures or special types.
921 # TODO consider extracting the content normalization to a
928 # TODO consider extracting the content normalization to a
922 # standalone function, as it may be useful for e.g. cachers.
929 # standalone function, as it may be useful for e.g. cachers.
923
930
924 # A pre-encoded object is sent directly to the emitter.
931 # A pre-encoded object is sent directly to the emitter.
925 if isinstance(o, wireprototypes.encodedresponse):
932 if isinstance(o, wireprototypes.encodedresponse):
926 for frame in emitter.send(o.data):
933 for frame in emitter.send(o.data):
927 yield frame
934 yield frame
928
935
929 # A regular object is CBOR encoded.
936 # A regular object is CBOR encoded.
930 else:
937 else:
931 for chunk in cborutil.streamencode(o):
938 for chunk in cborutil.streamencode(o):
932 for frame in emitter.send(chunk):
939 for frame in emitter.send(chunk):
933 yield frame
940 yield frame
934
941
935 except Exception as e:
942 except Exception as e:
936 for frame in createerrorframe(stream, requestid,
943 for frame in createerrorframe(stream, requestid,
937 '%s' % e,
944 '%s' % e,
938 errtype='server'):
945 errtype='server'):
939 yield frame
946 yield frame
940
947
941 break
948 break
942
949
943 self._activecommands.remove(requestid)
950 self._activecommands.remove(requestid)
944
951
945 return self._handlesendframes(sendframes())
952 return self._handlesendframes(sendframes())
946
953
947 def oninputeof(self):
954 def oninputeof(self):
948 """Signals that end of input has been received.
955 """Signals that end of input has been received.
949
956
950 No more frames will be received. All pending activity should be
957 No more frames will be received. All pending activity should be
951 completed.
958 completed.
952 """
959 """
953 # TODO should we do anything about in-flight commands?
960 # TODO should we do anything about in-flight commands?
954
961
955 if not self._deferoutput or not self._bufferedframegens:
962 if not self._deferoutput or not self._bufferedframegens:
956 return 'noop', {}
963 return 'noop', {}
957
964
958 # If we buffered all our responses, emit those.
965 # If we buffered all our responses, emit those.
959 def makegen():
966 def makegen():
960 for gen in self._bufferedframegens:
967 for gen in self._bufferedframegens:
961 for frame in gen:
968 for frame in gen:
962 yield frame
969 yield frame
963
970
964 return 'sendframes', {
971 return 'sendframes', {
965 'framegen': makegen(),
972 'framegen': makegen(),
966 }
973 }
967
974
968 def _handlesendframes(self, framegen):
975 def _handlesendframes(self, framegen):
969 if self._deferoutput:
976 if self._deferoutput:
970 self._bufferedframegens.append(framegen)
977 self._bufferedframegens.append(framegen)
971 return 'noop', {}
978 return 'noop', {}
972 else:
979 else:
973 return 'sendframes', {
980 return 'sendframes', {
974 'framegen': framegen,
981 'framegen': framegen,
975 }
982 }
976
983
977 def onservererror(self, stream, requestid, msg):
984 def onservererror(self, stream, requestid, msg):
978 ensureserverstream(stream)
985 ensureserverstream(stream)
979
986
980 def sendframes():
987 def sendframes():
981 for frame in createerrorframe(stream, requestid, msg,
988 for frame in createerrorframe(stream, requestid, msg,
982 errtype='server'):
989 errtype='server'):
983 yield frame
990 yield frame
984
991
985 self._activecommands.remove(requestid)
992 self._activecommands.remove(requestid)
986
993
987 return self._handlesendframes(sendframes())
994 return self._handlesendframes(sendframes())
988
995
989 def oncommanderror(self, stream, requestid, message, args=None):
996 def oncommanderror(self, stream, requestid, message, args=None):
990 """Called when a command encountered an error before sending output."""
997 """Called when a command encountered an error before sending output."""
991 ensureserverstream(stream)
998 ensureserverstream(stream)
992
999
993 def sendframes():
1000 def sendframes():
994 for frame in createcommanderrorresponse(stream, requestid, message,
1001 for frame in createcommanderrorresponse(stream, requestid, message,
995 args):
1002 args):
996 yield frame
1003 yield frame
997
1004
998 self._activecommands.remove(requestid)
1005 self._activecommands.remove(requestid)
999
1006
1000 return self._handlesendframes(sendframes())
1007 return self._handlesendframes(sendframes())
1001
1008
1002 def makeoutputstream(self):
1009 def makeoutputstream(self):
1003 """Create a stream to be used for sending data to the client."""
1010 """Create a stream to be used for sending data to the client."""
1004 streamid = self._nextoutgoingstreamid
1011 streamid = self._nextoutgoingstreamid
1005 self._nextoutgoingstreamid += 2
1012 self._nextoutgoingstreamid += 2
1006
1013
1007 s = stream(streamid)
1014 s = stream(streamid)
1008 self._outgoingstreams[streamid] = s
1015 self._outgoingstreams[streamid] = s
1009
1016
1010 return s
1017 return s
1011
1018
1012 def _makeerrorresult(self, msg):
1019 def _makeerrorresult(self, msg):
1013 return 'error', {
1020 return 'error', {
1014 'message': msg,
1021 'message': msg,
1015 }
1022 }
1016
1023
1017 def _makeruncommandresult(self, requestid):
1024 def _makeruncommandresult(self, requestid):
1018 entry = self._receivingcommands[requestid]
1025 entry = self._receivingcommands[requestid]
1019
1026
1020 if not entry['requestdone']:
1027 if not entry['requestdone']:
1021 self._state = 'errored'
1028 self._state = 'errored'
1022 raise error.ProgrammingError('should not be called without '
1029 raise error.ProgrammingError('should not be called without '
1023 'requestdone set')
1030 'requestdone set')
1024
1031
1025 del self._receivingcommands[requestid]
1032 del self._receivingcommands[requestid]
1026
1033
1027 if self._receivingcommands:
1034 if self._receivingcommands:
1028 self._state = 'command-receiving'
1035 self._state = 'command-receiving'
1029 else:
1036 else:
1030 self._state = 'idle'
1037 self._state = 'idle'
1031
1038
1032 # Decode the payloads as CBOR.
1039 # Decode the payloads as CBOR.
1033 entry['payload'].seek(0)
1040 entry['payload'].seek(0)
1034 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1041 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1035
1042
1036 if b'name' not in request:
1043 if b'name' not in request:
1037 self._state = 'errored'
1044 self._state = 'errored'
1038 return self._makeerrorresult(
1045 return self._makeerrorresult(
1039 _('command request missing "name" field'))
1046 _('command request missing "name" field'))
1040
1047
1041 if b'args' not in request:
1048 if b'args' not in request:
1042 request[b'args'] = {}
1049 request[b'args'] = {}
1043
1050
1044 assert requestid not in self._activecommands
1051 assert requestid not in self._activecommands
1045 self._activecommands.add(requestid)
1052 self._activecommands.add(requestid)
1046
1053
1047 return 'runcommand', {
1054 return 'runcommand', {
1048 'requestid': requestid,
1055 'requestid': requestid,
1049 'command': request[b'name'],
1056 'command': request[b'name'],
1050 'args': request[b'args'],
1057 'args': request[b'args'],
1051 'redirect': request.get(b'redirect'),
1058 'redirect': request.get(b'redirect'),
1052 'data': entry['data'].getvalue() if entry['data'] else None,
1059 'data': entry['data'].getvalue() if entry['data'] else None,
1053 }
1060 }
1054
1061
1055 def _makewantframeresult(self):
1062 def _makewantframeresult(self):
1056 return 'wantframe', {
1063 return 'wantframe', {
1057 'state': self._state,
1064 'state': self._state,
1058 }
1065 }
1059
1066
1060 def _validatecommandrequestframe(self, frame):
1067 def _validatecommandrequestframe(self, frame):
1061 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1068 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1062 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1069 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1063
1070
1064 if new and continuation:
1071 if new and continuation:
1065 self._state = 'errored'
1072 self._state = 'errored'
1066 return self._makeerrorresult(
1073 return self._makeerrorresult(
1067 _('received command request frame with both new and '
1074 _('received command request frame with both new and '
1068 'continuation flags set'))
1075 'continuation flags set'))
1069
1076
1070 if not new and not continuation:
1077 if not new and not continuation:
1071 self._state = 'errored'
1078 self._state = 'errored'
1072 return self._makeerrorresult(
1079 return self._makeerrorresult(
1073 _('received command request frame with neither new nor '
1080 _('received command request frame with neither new nor '
1074 'continuation flags set'))
1081 'continuation flags set'))
1075
1082
1076 def _onframeinitial(self, frame):
1083 def _onframeinitial(self, frame):
1077 # Called when we receive a frame when in the "initial" state.
1084 # Called when we receive a frame when in the "initial" state.
1078 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1085 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1079 self._state = 'protocol-settings-receiving'
1086 self._state = 'protocol-settings-receiving'
1080 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1087 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1081 return self._onframeprotocolsettings(frame)
1088 return self._onframeprotocolsettings(frame)
1082
1089
1083 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1090 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1084 self._state = 'idle'
1091 self._state = 'idle'
1085 return self._onframeidle(frame)
1092 return self._onframeidle(frame)
1086
1093
1087 else:
1094 else:
1088 self._state = 'errored'
1095 self._state = 'errored'
1089 return self._makeerrorresult(
1096 return self._makeerrorresult(
1090 _('expected sender protocol settings or command request '
1097 _('expected sender protocol settings or command request '
1091 'frame; got %d') % frame.typeid)
1098 'frame; got %d') % frame.typeid)
1092
1099
1093 def _onframeprotocolsettings(self, frame):
1100 def _onframeprotocolsettings(self, frame):
1094 assert self._state == 'protocol-settings-receiving'
1101 assert self._state == 'protocol-settings-receiving'
1095 assert self._protocolsettingsdecoder is not None
1102 assert self._protocolsettingsdecoder is not None
1096
1103
1097 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1104 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1098 self._state = 'errored'
1105 self._state = 'errored'
1099 return self._makeerrorresult(
1106 return self._makeerrorresult(
1100 _('expected sender protocol settings frame; got %d') %
1107 _('expected sender protocol settings frame; got %d') %
1101 frame.typeid)
1108 frame.typeid)
1102
1109
1103 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1110 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1104 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1111 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1105
1112
1106 if more and eos:
1113 if more and eos:
1107 self._state = 'errored'
1114 self._state = 'errored'
1108 return self._makeerrorresult(
1115 return self._makeerrorresult(
1109 _('sender protocol settings frame cannot have both '
1116 _('sender protocol settings frame cannot have both '
1110 'continuation and end of stream flags set'))
1117 'continuation and end of stream flags set'))
1111
1118
1112 if not more and not eos:
1119 if not more and not eos:
1113 self._state = 'errored'
1120 self._state = 'errored'
1114 return self._makeerrorresult(
1121 return self._makeerrorresult(
1115 _('sender protocol settings frame must have continuation or '
1122 _('sender protocol settings frame must have continuation or '
1116 'end of stream flag set'))
1123 'end of stream flag set'))
1117
1124
1118 # TODO establish limits for maximum amount of data that can be
1125 # TODO establish limits for maximum amount of data that can be
1119 # buffered.
1126 # buffered.
1120 try:
1127 try:
1121 self._protocolsettingsdecoder.decode(frame.payload)
1128 self._protocolsettingsdecoder.decode(frame.payload)
1122 except Exception as e:
1129 except Exception as e:
1123 self._state = 'errored'
1130 self._state = 'errored'
1124 return self._makeerrorresult(
1131 return self._makeerrorresult(
1125 _('error decoding CBOR from sender protocol settings frame: %s')
1132 _('error decoding CBOR from sender protocol settings frame: %s')
1126 % stringutil.forcebytestr(e))
1133 % stringutil.forcebytestr(e))
1127
1134
1128 if more:
1135 if more:
1129 return self._makewantframeresult()
1136 return self._makewantframeresult()
1130
1137
1131 assert eos
1138 assert eos
1132
1139
1133 decoded = self._protocolsettingsdecoder.getavailable()
1140 decoded = self._protocolsettingsdecoder.getavailable()
1134 self._protocolsettingsdecoder = None
1141 self._protocolsettingsdecoder = None
1135
1142
1136 if not decoded:
1143 if not decoded:
1137 self._state = 'errored'
1144 self._state = 'errored'
1138 return self._makeerrorresult(
1145 return self._makeerrorresult(
1139 _('sender protocol settings frame did not contain CBOR data'))
1146 _('sender protocol settings frame did not contain CBOR data'))
1140 elif len(decoded) > 1:
1147 elif len(decoded) > 1:
1141 self._state = 'errored'
1148 self._state = 'errored'
1142 return self._makeerrorresult(
1149 return self._makeerrorresult(
1143 _('sender protocol settings frame contained multiple CBOR '
1150 _('sender protocol settings frame contained multiple CBOR '
1144 'values'))
1151 'values'))
1145
1152
1146 d = decoded[0]
1153 d = decoded[0]
1147
1154
1148 if b'contentencodings' in d:
1155 if b'contentencodings' in d:
1149 self._sendersettings['contentencodings'] = d[b'contentencodings']
1156 self._sendersettings['contentencodings'] = d[b'contentencodings']
1150
1157
1151 self._state = 'idle'
1158 self._state = 'idle'
1152
1159
1153 return self._makewantframeresult()
1160 return self._makewantframeresult()
1154
1161
1155 def _onframeidle(self, frame):
1162 def _onframeidle(self, frame):
1156 # The only frame type that should be received in this state is a
1163 # The only frame type that should be received in this state is a
1157 # command request.
1164 # command request.
1158 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1165 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1159 self._state = 'errored'
1166 self._state = 'errored'
1160 return self._makeerrorresult(
1167 return self._makeerrorresult(
1161 _('expected command request frame; got %d') % frame.typeid)
1168 _('expected command request frame; got %d') % frame.typeid)
1162
1169
1163 res = self._validatecommandrequestframe(frame)
1170 res = self._validatecommandrequestframe(frame)
1164 if res:
1171 if res:
1165 return res
1172 return res
1166
1173
1167 if frame.requestid in self._receivingcommands:
1174 if frame.requestid in self._receivingcommands:
1168 self._state = 'errored'
1175 self._state = 'errored'
1169 return self._makeerrorresult(
1176 return self._makeerrorresult(
1170 _('request with ID %d already received') % frame.requestid)
1177 _('request with ID %d already received') % frame.requestid)
1171
1178
1172 if frame.requestid in self._activecommands:
1179 if frame.requestid in self._activecommands:
1173 self._state = 'errored'
1180 self._state = 'errored'
1174 return self._makeerrorresult(
1181 return self._makeerrorresult(
1175 _('request with ID %d is already active') % frame.requestid)
1182 _('request with ID %d is already active') % frame.requestid)
1176
1183
1177 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1184 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1178 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1185 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1179 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1186 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1180
1187
1181 if not new:
1188 if not new:
1182 self._state = 'errored'
1189 self._state = 'errored'
1183 return self._makeerrorresult(
1190 return self._makeerrorresult(
1184 _('received command request frame without new flag set'))
1191 _('received command request frame without new flag set'))
1185
1192
1186 payload = util.bytesio()
1193 payload = util.bytesio()
1187 payload.write(frame.payload)
1194 payload.write(frame.payload)
1188
1195
1189 self._receivingcommands[frame.requestid] = {
1196 self._receivingcommands[frame.requestid] = {
1190 'payload': payload,
1197 'payload': payload,
1191 'data': None,
1198 'data': None,
1192 'requestdone': not moreframes,
1199 'requestdone': not moreframes,
1193 'expectingdata': bool(expectingdata),
1200 'expectingdata': bool(expectingdata),
1194 }
1201 }
1195
1202
1196 # This is the final frame for this request. Dispatch it.
1203 # This is the final frame for this request. Dispatch it.
1197 if not moreframes and not expectingdata:
1204 if not moreframes and not expectingdata:
1198 return self._makeruncommandresult(frame.requestid)
1205 return self._makeruncommandresult(frame.requestid)
1199
1206
1200 assert moreframes or expectingdata
1207 assert moreframes or expectingdata
1201 self._state = 'command-receiving'
1208 self._state = 'command-receiving'
1202 return self._makewantframeresult()
1209 return self._makewantframeresult()
1203
1210
1204 def _onframecommandreceiving(self, frame):
1211 def _onframecommandreceiving(self, frame):
1205 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1212 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1206 # Process new command requests as such.
1213 # Process new command requests as such.
1207 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1214 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1208 return self._onframeidle(frame)
1215 return self._onframeidle(frame)
1209
1216
1210 res = self._validatecommandrequestframe(frame)
1217 res = self._validatecommandrequestframe(frame)
1211 if res:
1218 if res:
1212 return res
1219 return res
1213
1220
1214 # All other frames should be related to a command that is currently
1221 # All other frames should be related to a command that is currently
1215 # receiving but is not active.
1222 # receiving but is not active.
1216 if frame.requestid in self._activecommands:
1223 if frame.requestid in self._activecommands:
1217 self._state = 'errored'
1224 self._state = 'errored'
1218 return self._makeerrorresult(
1225 return self._makeerrorresult(
1219 _('received frame for request that is still active: %d') %
1226 _('received frame for request that is still active: %d') %
1220 frame.requestid)
1227 frame.requestid)
1221
1228
1222 if frame.requestid not in self._receivingcommands:
1229 if frame.requestid not in self._receivingcommands:
1223 self._state = 'errored'
1230 self._state = 'errored'
1224 return self._makeerrorresult(
1231 return self._makeerrorresult(
1225 _('received frame for request that is not receiving: %d') %
1232 _('received frame for request that is not receiving: %d') %
1226 frame.requestid)
1233 frame.requestid)
1227
1234
1228 entry = self._receivingcommands[frame.requestid]
1235 entry = self._receivingcommands[frame.requestid]
1229
1236
1230 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1237 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1231 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1238 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1232 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1239 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1233
1240
1234 if entry['requestdone']:
1241 if entry['requestdone']:
1235 self._state = 'errored'
1242 self._state = 'errored'
1236 return self._makeerrorresult(
1243 return self._makeerrorresult(
1237 _('received command request frame when request frames '
1244 _('received command request frame when request frames '
1238 'were supposedly done'))
1245 'were supposedly done'))
1239
1246
1240 if expectingdata != entry['expectingdata']:
1247 if expectingdata != entry['expectingdata']:
1241 self._state = 'errored'
1248 self._state = 'errored'
1242 return self._makeerrorresult(
1249 return self._makeerrorresult(
1243 _('mismatch between expect data flag and previous frame'))
1250 _('mismatch between expect data flag and previous frame'))
1244
1251
1245 entry['payload'].write(frame.payload)
1252 entry['payload'].write(frame.payload)
1246
1253
1247 if not moreframes:
1254 if not moreframes:
1248 entry['requestdone'] = True
1255 entry['requestdone'] = True
1249
1256
1250 if not moreframes and not expectingdata:
1257 if not moreframes and not expectingdata:
1251 return self._makeruncommandresult(frame.requestid)
1258 return self._makeruncommandresult(frame.requestid)
1252
1259
1253 return self._makewantframeresult()
1260 return self._makewantframeresult()
1254
1261
1255 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1262 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1256 if not entry['expectingdata']:
1263 if not entry['expectingdata']:
1257 self._state = 'errored'
1264 self._state = 'errored'
1258 return self._makeerrorresult(_(
1265 return self._makeerrorresult(_(
1259 'received command data frame for request that is not '
1266 'received command data frame for request that is not '
1260 'expecting data: %d') % frame.requestid)
1267 'expecting data: %d') % frame.requestid)
1261
1268
1262 if entry['data'] is None:
1269 if entry['data'] is None:
1263 entry['data'] = util.bytesio()
1270 entry['data'] = util.bytesio()
1264
1271
1265 return self._handlecommanddataframe(frame, entry)
1272 return self._handlecommanddataframe(frame, entry)
1266 else:
1273 else:
1267 self._state = 'errored'
1274 self._state = 'errored'
1268 return self._makeerrorresult(_(
1275 return self._makeerrorresult(_(
1269 'received unexpected frame type: %d') % frame.typeid)
1276 'received unexpected frame type: %d') % frame.typeid)
1270
1277
1271 def _handlecommanddataframe(self, frame, entry):
1278 def _handlecommanddataframe(self, frame, entry):
1272 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1279 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1273
1280
1274 # TODO support streaming data instead of buffering it.
1281 # TODO support streaming data instead of buffering it.
1275 entry['data'].write(frame.payload)
1282 entry['data'].write(frame.payload)
1276
1283
1277 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1284 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1278 return self._makewantframeresult()
1285 return self._makewantframeresult()
1279 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1286 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1280 entry['data'].seek(0)
1287 entry['data'].seek(0)
1281 return self._makeruncommandresult(frame.requestid)
1288 return self._makeruncommandresult(frame.requestid)
1282 else:
1289 else:
1283 self._state = 'errored'
1290 self._state = 'errored'
1284 return self._makeerrorresult(_('command data frame without '
1291 return self._makeerrorresult(_('command data frame without '
1285 'flags'))
1292 'flags'))
1286
1293
1287 def _onframeerrored(self, frame):
1294 def _onframeerrored(self, frame):
1288 return self._makeerrorresult(_('server already errored'))
1295 return self._makeerrorresult(_('server already errored'))
1289
1296
1290 class commandrequest(object):
1297 class commandrequest(object):
1291 """Represents a request to run a command."""
1298 """Represents a request to run a command."""
1292
1299
1293 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1300 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1294 self.requestid = requestid
1301 self.requestid = requestid
1295 self.name = name
1302 self.name = name
1296 self.args = args
1303 self.args = args
1297 self.datafh = datafh
1304 self.datafh = datafh
1298 self.redirect = redirect
1305 self.redirect = redirect
1299 self.state = 'pending'
1306 self.state = 'pending'
1300
1307
1301 class clientreactor(object):
1308 class clientreactor(object):
1302 """Holds state of a client issuing frame-based protocol requests.
1309 """Holds state of a client issuing frame-based protocol requests.
1303
1310
1304 This is like ``serverreactor`` but for client-side state.
1311 This is like ``serverreactor`` but for client-side state.
1305
1312
1306 Each instance is bound to the lifetime of a connection. For persistent
1313 Each instance is bound to the lifetime of a connection. For persistent
1307 connection transports using e.g. TCP sockets and speaking the raw
1314 connection transports using e.g. TCP sockets and speaking the raw
1308 framing protocol, there will be a single instance for the lifetime of
1315 framing protocol, there will be a single instance for the lifetime of
1309 the TCP socket. For transports where there are multiple discrete
1316 the TCP socket. For transports where there are multiple discrete
1310 interactions (say tunneled within in HTTP request), there will be a
1317 interactions (say tunneled within in HTTP request), there will be a
1311 separate instance for each distinct interaction.
1318 separate instance for each distinct interaction.
1312
1319
1313 Consumers are expected to tell instances when events occur by calling
1320 Consumers are expected to tell instances when events occur by calling
1314 various methods. These methods return a 2-tuple describing any follow-up
1321 various methods. These methods return a 2-tuple describing any follow-up
1315 action(s) to take. The first element is the name of an action to
1322 action(s) to take. The first element is the name of an action to
1316 perform. The second is a data structure (usually a dict) specific to
1323 perform. The second is a data structure (usually a dict) specific to
1317 that action that contains more information. e.g. if the reactor wants
1324 that action that contains more information. e.g. if the reactor wants
1318 to send frames to the server, the data structure will contain a reference
1325 to send frames to the server, the data structure will contain a reference
1319 to those frames.
1326 to those frames.
1320
1327
1321 Valid actions that consumers can be instructed to take are:
1328 Valid actions that consumers can be instructed to take are:
1322
1329
1323 noop
1330 noop
1324 Indicates no additional action is required.
1331 Indicates no additional action is required.
1325
1332
1326 sendframes
1333 sendframes
1327 Indicates that frames should be sent to the server. The ``framegen``
1334 Indicates that frames should be sent to the server. The ``framegen``
1328 key contains a generator of frames that should be sent. The reactor
1335 key contains a generator of frames that should be sent. The reactor
1329 assumes that all frames in this generator are sent to the server.
1336 assumes that all frames in this generator are sent to the server.
1330
1337
1331 error
1338 error
1332 Indicates that an error occurred. The ``message`` key contains an
1339 Indicates that an error occurred. The ``message`` key contains an
1333 error message describing the failure.
1340 error message describing the failure.
1334
1341
1335 responsedata
1342 responsedata
1336 Indicates a response to a previously-issued command was received.
1343 Indicates a response to a previously-issued command was received.
1337
1344
1338 The ``request`` key contains the ``commandrequest`` instance that
1345 The ``request`` key contains the ``commandrequest`` instance that
1339 represents the request this data is for.
1346 represents the request this data is for.
1340
1347
1341 The ``data`` key contains the decoded data from the server.
1348 The ``data`` key contains the decoded data from the server.
1342
1349
1343 ``expectmore`` and ``eos`` evaluate to True when more response data
1350 ``expectmore`` and ``eos`` evaluate to True when more response data
1344 is expected to follow or we're at the end of the response stream,
1351 is expected to follow or we're at the end of the response stream,
1345 respectively.
1352 respectively.
1346 """
1353 """
1347 def __init__(self, hasmultiplesend=False, buffersends=True):
1354 def __init__(self, hasmultiplesend=False, buffersends=True):
1348 """Create a new instance.
1355 """Create a new instance.
1349
1356
1350 ``hasmultiplesend`` indicates whether multiple sends are supported
1357 ``hasmultiplesend`` indicates whether multiple sends are supported
1351 by the transport. When True, it is possible to send commands immediately
1358 by the transport. When True, it is possible to send commands immediately
1352 instead of buffering until the caller signals an intent to finish a
1359 instead of buffering until the caller signals an intent to finish a
1353 send operation.
1360 send operation.
1354
1361
1355 ``buffercommands`` indicates whether sends should be buffered until the
1362 ``buffercommands`` indicates whether sends should be buffered until the
1356 last request has been issued.
1363 last request has been issued.
1357 """
1364 """
1358 self._hasmultiplesend = hasmultiplesend
1365 self._hasmultiplesend = hasmultiplesend
1359 self._buffersends = buffersends
1366 self._buffersends = buffersends
1360
1367
1361 self._canissuecommands = True
1368 self._canissuecommands = True
1362 self._cansend = True
1369 self._cansend = True
1363
1370
1364 self._nextrequestid = 1
1371 self._nextrequestid = 1
1365 # We only support a single outgoing stream for now.
1372 # We only support a single outgoing stream for now.
1366 self._outgoingstream = stream(1)
1373 self._outgoingstream = stream(1)
1367 self._pendingrequests = collections.deque()
1374 self._pendingrequests = collections.deque()
1368 self._activerequests = {}
1375 self._activerequests = {}
1369 self._incomingstreams = {}
1376 self._incomingstreams = {}
1377 self._streamsettingsdecoders = {}
1370
1378
1371 def callcommand(self, name, args, datafh=None, redirect=None):
1379 def callcommand(self, name, args, datafh=None, redirect=None):
1372 """Request that a command be executed.
1380 """Request that a command be executed.
1373
1381
1374 Receives the command name, a dict of arguments to pass to the command,
1382 Receives the command name, a dict of arguments to pass to the command,
1375 and an optional file object containing the raw data for the command.
1383 and an optional file object containing the raw data for the command.
1376
1384
1377 Returns a 3-tuple of (request, action, action data).
1385 Returns a 3-tuple of (request, action, action data).
1378 """
1386 """
1379 if not self._canissuecommands:
1387 if not self._canissuecommands:
1380 raise error.ProgrammingError('cannot issue new commands')
1388 raise error.ProgrammingError('cannot issue new commands')
1381
1389
1382 requestid = self._nextrequestid
1390 requestid = self._nextrequestid
1383 self._nextrequestid += 2
1391 self._nextrequestid += 2
1384
1392
1385 request = commandrequest(requestid, name, args, datafh=datafh,
1393 request = commandrequest(requestid, name, args, datafh=datafh,
1386 redirect=redirect)
1394 redirect=redirect)
1387
1395
1388 if self._buffersends:
1396 if self._buffersends:
1389 self._pendingrequests.append(request)
1397 self._pendingrequests.append(request)
1390 return request, 'noop', {}
1398 return request, 'noop', {}
1391 else:
1399 else:
1392 if not self._cansend:
1400 if not self._cansend:
1393 raise error.ProgrammingError('sends cannot be performed on '
1401 raise error.ProgrammingError('sends cannot be performed on '
1394 'this instance')
1402 'this instance')
1395
1403
1396 if not self._hasmultiplesend:
1404 if not self._hasmultiplesend:
1397 self._cansend = False
1405 self._cansend = False
1398 self._canissuecommands = False
1406 self._canissuecommands = False
1399
1407
1400 return request, 'sendframes', {
1408 return request, 'sendframes', {
1401 'framegen': self._makecommandframes(request),
1409 'framegen': self._makecommandframes(request),
1402 }
1410 }
1403
1411
1404 def flushcommands(self):
1412 def flushcommands(self):
1405 """Request that all queued commands be sent.
1413 """Request that all queued commands be sent.
1406
1414
1407 If any commands are buffered, this will instruct the caller to send
1415 If any commands are buffered, this will instruct the caller to send
1408 them over the wire. If no commands are buffered it instructs the client
1416 them over the wire. If no commands are buffered it instructs the client
1409 to no-op.
1417 to no-op.
1410
1418
1411 If instances aren't configured for multiple sends, no new command
1419 If instances aren't configured for multiple sends, no new command
1412 requests are allowed after this is called.
1420 requests are allowed after this is called.
1413 """
1421 """
1414 if not self._pendingrequests:
1422 if not self._pendingrequests:
1415 return 'noop', {}
1423 return 'noop', {}
1416
1424
1417 if not self._cansend:
1425 if not self._cansend:
1418 raise error.ProgrammingError('sends cannot be performed on this '
1426 raise error.ProgrammingError('sends cannot be performed on this '
1419 'instance')
1427 'instance')
1420
1428
1421 # If the instance only allows sending once, mark that we have fired
1429 # If the instance only allows sending once, mark that we have fired
1422 # our one shot.
1430 # our one shot.
1423 if not self._hasmultiplesend:
1431 if not self._hasmultiplesend:
1424 self._canissuecommands = False
1432 self._canissuecommands = False
1425 self._cansend = False
1433 self._cansend = False
1426
1434
1427 def makeframes():
1435 def makeframes():
1428 while self._pendingrequests:
1436 while self._pendingrequests:
1429 request = self._pendingrequests.popleft()
1437 request = self._pendingrequests.popleft()
1430 for frame in self._makecommandframes(request):
1438 for frame in self._makecommandframes(request):
1431 yield frame
1439 yield frame
1432
1440
1433 return 'sendframes', {
1441 return 'sendframes', {
1434 'framegen': makeframes(),
1442 'framegen': makeframes(),
1435 }
1443 }
1436
1444
1437 def _makecommandframes(self, request):
1445 def _makecommandframes(self, request):
1438 """Emit frames to issue a command request.
1446 """Emit frames to issue a command request.
1439
1447
1440 As a side-effect, update request accounting to reflect its changed
1448 As a side-effect, update request accounting to reflect its changed
1441 state.
1449 state.
1442 """
1450 """
1443 self._activerequests[request.requestid] = request
1451 self._activerequests[request.requestid] = request
1444 request.state = 'sending'
1452 request.state = 'sending'
1445
1453
1446 res = createcommandframes(self._outgoingstream,
1454 res = createcommandframes(self._outgoingstream,
1447 request.requestid,
1455 request.requestid,
1448 request.name,
1456 request.name,
1449 request.args,
1457 request.args,
1450 datafh=request.datafh,
1458 datafh=request.datafh,
1451 redirect=request.redirect)
1459 redirect=request.redirect)
1452
1460
1453 for frame in res:
1461 for frame in res:
1454 yield frame
1462 yield frame
1455
1463
1456 request.state = 'sent'
1464 request.state = 'sent'
1457
1465
1458 def onframerecv(self, frame):
1466 def onframerecv(self, frame):
1459 """Process a frame that has been received off the wire.
1467 """Process a frame that has been received off the wire.
1460
1468
1461 Returns a 2-tuple of (action, meta) describing further action the
1469 Returns a 2-tuple of (action, meta) describing further action the
1462 caller needs to take as a result of receiving this frame.
1470 caller needs to take as a result of receiving this frame.
1463 """
1471 """
1464 if frame.streamid % 2:
1472 if frame.streamid % 2:
1465 return 'error', {
1473 return 'error', {
1466 'message': (
1474 'message': (
1467 _('received frame with odd numbered stream ID: %d') %
1475 _('received frame with odd numbered stream ID: %d') %
1468 frame.streamid),
1476 frame.streamid),
1469 }
1477 }
1470
1478
1471 if frame.streamid not in self._incomingstreams:
1479 if frame.streamid not in self._incomingstreams:
1472 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1480 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1473 return 'error', {
1481 return 'error', {
1474 'message': _('received frame on unknown stream '
1482 'message': _('received frame on unknown stream '
1475 'without beginning of stream flag set'),
1483 'without beginning of stream flag set'),
1476 }
1484 }
1477
1485
1478 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1486 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1479
1487
1480 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1488 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1481 raise error.ProgrammingError('support for decoding stream '
1489 raise error.ProgrammingError('support for decoding stream '
1482 'payloads not yet implemneted')
1490 'payloads not yet implemneted')
1483
1491
1484 if frame.streamflags & STREAM_FLAG_END_STREAM:
1492 if frame.streamflags & STREAM_FLAG_END_STREAM:
1485 del self._incomingstreams[frame.streamid]
1493 del self._incomingstreams[frame.streamid]
1486
1494
1495 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1496 return self._onstreamsettingsframe(frame)
1497
1487 if frame.requestid not in self._activerequests:
1498 if frame.requestid not in self._activerequests:
1488 return 'error', {
1499 return 'error', {
1489 'message': (_('received frame for inactive request ID: %d') %
1500 'message': (_('received frame for inactive request ID: %d') %
1490 frame.requestid),
1501 frame.requestid),
1491 }
1502 }
1492
1503
1493 request = self._activerequests[frame.requestid]
1504 request = self._activerequests[frame.requestid]
1494 request.state = 'receiving'
1505 request.state = 'receiving'
1495
1506
1496 handlers = {
1507 handlers = {
1497 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1508 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1498 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1509 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1499 }
1510 }
1500
1511
1501 meth = handlers.get(frame.typeid)
1512 meth = handlers.get(frame.typeid)
1502 if not meth:
1513 if not meth:
1503 raise error.ProgrammingError('unhandled frame type: %d' %
1514 raise error.ProgrammingError('unhandled frame type: %d' %
1504 frame.typeid)
1515 frame.typeid)
1505
1516
1506 return meth(request, frame)
1517 return meth(request, frame)
1507
1518
1519 def _onstreamsettingsframe(self, frame):
1520 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1521
1522 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1523 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1524
1525 if more and eos:
1526 return 'error', {
1527 'message': (_('stream encoding settings frame cannot have both '
1528 'continuation and end of stream flags set')),
1529 }
1530
1531 if not more and not eos:
1532 return 'error', {
1533 'message': _('stream encoding settings frame must have '
1534 'continuation or end of stream flag set'),
1535 }
1536
1537 if frame.streamid not in self._streamsettingsdecoders:
1538 decoder = cborutil.bufferingdecoder()
1539 self._streamsettingsdecoders[frame.streamid] = decoder
1540
1541 decoder = self._streamsettingsdecoders[frame.streamid]
1542
1543 try:
1544 decoder.decode(frame.payload)
1545 except Exception as e:
1546 return 'error', {
1547 'message': (_('error decoding CBOR from stream encoding '
1548 'settings frame: %s') %
1549 stringutil.forcebytestr(e)),
1550 }
1551
1552 if more:
1553 return 'noop', {}
1554
1555 assert eos
1556
1557 decoded = decoder.getavailable()
1558 del self._streamsettingsdecoders[frame.streamid]
1559
1560 if not decoded:
1561 return 'error', {
1562 'message': _('stream encoding settings frame did not contain '
1563 'CBOR data'),
1564 }
1565
1566 try:
1567 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1568 decoded[1:])
1569 except Exception as e:
1570 return 'error', {
1571 'message': (_('error setting stream decoder: %s') %
1572 stringutil.forcebytestr(e)),
1573 }
1574
1575 return 'noop', {}
1576
1508 def _oncommandresponseframe(self, request, frame):
1577 def _oncommandresponseframe(self, request, frame):
1509 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1578 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1510 request.state = 'received'
1579 request.state = 'received'
1511 del self._activerequests[request.requestid]
1580 del self._activerequests[request.requestid]
1512
1581
1513 return 'responsedata', {
1582 return 'responsedata', {
1514 'request': request,
1583 'request': request,
1515 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1584 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1516 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1585 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1517 'data': frame.payload,
1586 'data': frame.payload,
1518 }
1587 }
1519
1588
1520 def _onerrorresponseframe(self, request, frame):
1589 def _onerrorresponseframe(self, request, frame):
1521 request.state = 'errored'
1590 request.state = 'errored'
1522 del self._activerequests[request.requestid]
1591 del self._activerequests[request.requestid]
1523
1592
1524 # The payload should be a CBOR map.
1593 # The payload should be a CBOR map.
1525 m = cborutil.decodeall(frame.payload)[0]
1594 m = cborutil.decodeall(frame.payload)[0]
1526
1595
1527 return 'error', {
1596 return 'error', {
1528 'request': request,
1597 'request': request,
1529 'type': m['type'],
1598 'type': m['type'],
1530 'message': m['message'],
1599 'message': m['message'],
1531 }
1600 }
@@ -1,167 +1,284 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import unittest
3 import unittest
4
4
5 from mercurial import (
5 from mercurial import (
6 error,
6 error,
7 wireprotoframing as framing,
7 wireprotoframing as framing,
8 )
8 )
9 from mercurial.utils import (
10 cborutil,
11 )
9
12
10 ffs = framing.makeframefromhumanstring
13 ffs = framing.makeframefromhumanstring
11
14
12 def sendframe(reactor, frame):
15 def sendframe(reactor, frame):
13 """Send a frame bytearray to a reactor."""
16 """Send a frame bytearray to a reactor."""
14 header = framing.parseheader(frame)
17 header = framing.parseheader(frame)
15 payload = frame[framing.FRAME_HEADER_SIZE:]
18 payload = frame[framing.FRAME_HEADER_SIZE:]
16 assert len(payload) == header.length
19 assert len(payload) == header.length
17
20
18 return reactor.onframerecv(framing.frame(header.requestid,
21 return reactor.onframerecv(framing.frame(header.requestid,
19 header.streamid,
22 header.streamid,
20 header.streamflags,
23 header.streamflags,
21 header.typeid,
24 header.typeid,
22 header.flags,
25 header.flags,
23 payload))
26 payload))
24
27
25 class SingleSendTests(unittest.TestCase):
28 class SingleSendTests(unittest.TestCase):
26 """A reactor that can only send once rejects subsequent sends."""
29 """A reactor that can only send once rejects subsequent sends."""
27
30
28 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
31 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
29 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
32 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
30 # the regex version.
33 # the regex version.
31 assertRaisesRegex = (# camelcase-required
34 assertRaisesRegex = (# camelcase-required
32 unittest.TestCase.assertRaisesRegexp)
35 unittest.TestCase.assertRaisesRegexp)
33
36
34 def testbasic(self):
37 def testbasic(self):
35 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
38 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
36
39
37 request, action, meta = reactor.callcommand(b'foo', {})
40 request, action, meta = reactor.callcommand(b'foo', {})
38 self.assertEqual(request.state, b'pending')
41 self.assertEqual(request.state, b'pending')
39 self.assertEqual(action, b'noop')
42 self.assertEqual(action, b'noop')
40
43
41 action, meta = reactor.flushcommands()
44 action, meta = reactor.flushcommands()
42 self.assertEqual(action, b'sendframes')
45 self.assertEqual(action, b'sendframes')
43
46
44 for frame in meta[b'framegen']:
47 for frame in meta[b'framegen']:
45 self.assertEqual(request.state, b'sending')
48 self.assertEqual(request.state, b'sending')
46
49
47 self.assertEqual(request.state, b'sent')
50 self.assertEqual(request.state, b'sent')
48
51
49 with self.assertRaisesRegex(error.ProgrammingError,
52 with self.assertRaisesRegex(error.ProgrammingError,
50 'cannot issue new commands'):
53 'cannot issue new commands'):
51 reactor.callcommand(b'foo', {})
54 reactor.callcommand(b'foo', {})
52
55
53 with self.assertRaisesRegex(error.ProgrammingError,
56 with self.assertRaisesRegex(error.ProgrammingError,
54 'cannot issue new commands'):
57 'cannot issue new commands'):
55 reactor.callcommand(b'foo', {})
58 reactor.callcommand(b'foo', {})
56
59
57 class NoBufferTests(unittest.TestCase):
60 class NoBufferTests(unittest.TestCase):
58 """A reactor without send buffering sends requests immediately."""
61 """A reactor without send buffering sends requests immediately."""
59 def testbasic(self):
62 def testbasic(self):
60 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
63 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
61
64
62 request, action, meta = reactor.callcommand(b'command1', {})
65 request, action, meta = reactor.callcommand(b'command1', {})
63 self.assertEqual(request.requestid, 1)
66 self.assertEqual(request.requestid, 1)
64 self.assertEqual(action, b'sendframes')
67 self.assertEqual(action, b'sendframes')
65
68
66 self.assertEqual(request.state, b'pending')
69 self.assertEqual(request.state, b'pending')
67
70
68 for frame in meta[b'framegen']:
71 for frame in meta[b'framegen']:
69 self.assertEqual(request.state, b'sending')
72 self.assertEqual(request.state, b'sending')
70
73
71 self.assertEqual(request.state, b'sent')
74 self.assertEqual(request.state, b'sent')
72
75
73 action, meta = reactor.flushcommands()
76 action, meta = reactor.flushcommands()
74 self.assertEqual(action, b'noop')
77 self.assertEqual(action, b'noop')
75
78
76 # And we can send another command.
79 # And we can send another command.
77 request, action, meta = reactor.callcommand(b'command2', {})
80 request, action, meta = reactor.callcommand(b'command2', {})
78 self.assertEqual(request.requestid, 3)
81 self.assertEqual(request.requestid, 3)
79 self.assertEqual(action, b'sendframes')
82 self.assertEqual(action, b'sendframes')
80
83
81 for frame in meta[b'framegen']:
84 for frame in meta[b'framegen']:
82 self.assertEqual(request.state, b'sending')
85 self.assertEqual(request.state, b'sending')
83
86
84 self.assertEqual(request.state, b'sent')
87 self.assertEqual(request.state, b'sent')
85
88
86 class BadFrameRecvTests(unittest.TestCase):
89 class BadFrameRecvTests(unittest.TestCase):
87 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
90 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
88 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
91 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
89 # the regex version.
92 # the regex version.
90 assertRaisesRegex = (# camelcase-required
93 assertRaisesRegex = (# camelcase-required
91 unittest.TestCase.assertRaisesRegexp)
94 unittest.TestCase.assertRaisesRegexp)
92
95
93 def testoddstream(self):
96 def testoddstream(self):
94 reactor = framing.clientreactor()
97 reactor = framing.clientreactor()
95
98
96 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
99 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
97 self.assertEqual(action, b'error')
100 self.assertEqual(action, b'error')
98 self.assertEqual(meta[b'message'],
101 self.assertEqual(meta[b'message'],
99 b'received frame with odd numbered stream ID: 1')
102 b'received frame with odd numbered stream ID: 1')
100
103
101 def testunknownstream(self):
104 def testunknownstream(self):
102 reactor = framing.clientreactor()
105 reactor = framing.clientreactor()
103
106
104 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
107 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
105 self.assertEqual(action, b'error')
108 self.assertEqual(action, b'error')
106 self.assertEqual(meta[b'message'],
109 self.assertEqual(meta[b'message'],
107 b'received frame on unknown stream without beginning '
110 b'received frame on unknown stream without beginning '
108 b'of stream flag set')
111 b'of stream flag set')
109
112
110 def testunhandledframetype(self):
113 def testunhandledframetype(self):
111 reactor = framing.clientreactor(buffersends=False)
114 reactor = framing.clientreactor(buffersends=False)
112
115
113 request, action, meta = reactor.callcommand(b'foo', {})
116 request, action, meta = reactor.callcommand(b'foo', {})
114 for frame in meta[b'framegen']:
117 for frame in meta[b'framegen']:
115 pass
118 pass
116
119
117 with self.assertRaisesRegex(error.ProgrammingError,
120 with self.assertRaisesRegex(error.ProgrammingError,
118 'unhandled frame type'):
121 'unhandled frame type'):
119 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
122 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
120
123
121 class StreamTests(unittest.TestCase):
124 class StreamTests(unittest.TestCase):
122 def testmultipleresponseframes(self):
125 def testmultipleresponseframes(self):
123 reactor = framing.clientreactor(buffersends=False)
126 reactor = framing.clientreactor(buffersends=False)
124
127
125 request, action, meta = reactor.callcommand(b'foo', {})
128 request, action, meta = reactor.callcommand(b'foo', {})
126
129
127 self.assertEqual(action, b'sendframes')
130 self.assertEqual(action, b'sendframes')
128 for f in meta[b'framegen']:
131 for f in meta[b'framegen']:
129 pass
132 pass
130
133
131 action, meta = sendframe(
134 action, meta = sendframe(
132 reactor,
135 reactor,
133 ffs(b'%d 0 stream-begin command-response 0 foo' %
136 ffs(b'%d 0 stream-begin command-response 0 foo' %
134 request.requestid))
137 request.requestid))
135 self.assertEqual(action, b'responsedata')
138 self.assertEqual(action, b'responsedata')
136
139
137 action, meta = sendframe(
140 action, meta = sendframe(
138 reactor,
141 reactor,
139 ffs(b'%d 0 0 command-response eos bar' % request.requestid))
142 ffs(b'%d 0 0 command-response eos bar' % request.requestid))
140 self.assertEqual(action, b'responsedata')
143 self.assertEqual(action, b'responsedata')
141
144
142 class RedirectTests(unittest.TestCase):
145 class RedirectTests(unittest.TestCase):
143 def testredirect(self):
146 def testredirect(self):
144 reactor = framing.clientreactor(buffersends=False)
147 reactor = framing.clientreactor(buffersends=False)
145
148
146 redirect = {
149 redirect = {
147 b'targets': [b'a', b'b'],
150 b'targets': [b'a', b'b'],
148 b'hashes': [b'sha256'],
151 b'hashes': [b'sha256'],
149 }
152 }
150
153
151 request, action, meta = reactor.callcommand(
154 request, action, meta = reactor.callcommand(
152 b'foo', {}, redirect=redirect)
155 b'foo', {}, redirect=redirect)
153
156
154 self.assertEqual(action, b'sendframes')
157 self.assertEqual(action, b'sendframes')
155
158
156 frames = list(meta[b'framegen'])
159 frames = list(meta[b'framegen'])
157 self.assertEqual(len(frames), 1)
160 self.assertEqual(len(frames), 1)
158
161
159 self.assertEqual(frames[0],
162 self.assertEqual(frames[0],
160 ffs(b'1 1 stream-begin command-request new '
163 ffs(b'1 1 stream-begin command-request new '
161 b"cbor:{b'name': b'foo', "
164 b"cbor:{b'name': b'foo', "
162 b"b'redirect': {b'targets': [b'a', b'b'], "
165 b"b'redirect': {b'targets': [b'a', b'b'], "
163 b"b'hashes': [b'sha256']}}"))
166 b"b'hashes': [b'sha256']}}"))
164
167
168 class StreamSettingsTests(unittest.TestCase):
169 def testnoflags(self):
170 reactor = framing.clientreactor(buffersends=False)
171
172 request, action, meta = reactor.callcommand(b'foo', {})
173 for f in meta[b'framegen']:
174 pass
175
176 action, meta = sendframe(reactor,
177 ffs(b'1 2 stream-begin stream-settings 0 '))
178
179 self.assertEqual(action, b'error')
180 self.assertEqual(meta, {
181 b'message': b'stream encoding settings frame must have '
182 b'continuation or end of stream flag set',
183 })
184
185 def testconflictflags(self):
186 reactor = framing.clientreactor(buffersends=False)
187
188 request, action, meta = reactor.callcommand(b'foo', {})
189 for f in meta[b'framegen']:
190 pass
191
192 action, meta = sendframe(reactor,
193 ffs(b'1 2 stream-begin stream-settings continuation|eos '))
194
195 self.assertEqual(action, b'error')
196 self.assertEqual(meta, {
197 b'message': b'stream encoding settings frame cannot have both '
198 b'continuation and end of stream flags set',
199 })
200
201 def testemptypayload(self):
202 reactor = framing.clientreactor(buffersends=False)
203
204 request, action, meta = reactor.callcommand(b'foo', {})
205 for f in meta[b'framegen']:
206 pass
207
208 action, meta = sendframe(reactor,
209 ffs(b'1 2 stream-begin stream-settings eos '))
210
211 self.assertEqual(action, b'error')
212 self.assertEqual(meta, {
213 b'message': b'stream encoding settings frame did not contain '
214 b'CBOR data'
215 })
216
217 def testbadcbor(self):
218 reactor = framing.clientreactor(buffersends=False)
219
220 request, action, meta = reactor.callcommand(b'foo', {})
221 for f in meta[b'framegen']:
222 pass
223
224 action, meta = sendframe(reactor,
225 ffs(b'1 2 stream-begin stream-settings eos badvalue'))
226
227 self.assertEqual(action, b'error')
228
229 def testsingleobject(self):
230 reactor = framing.clientreactor(buffersends=False)
231
232 request, action, meta = reactor.callcommand(b'foo', {})
233 for f in meta[b'framegen']:
234 pass
235
236 action, meta = sendframe(reactor,
237 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))
238
239 self.assertEqual(action, b'noop')
240 self.assertEqual(meta, {})
241
242 def testmultipleobjects(self):
243 reactor = framing.clientreactor(buffersends=False)
244
245 request, action, meta = reactor.callcommand(b'foo', {})
246 for f in meta[b'framegen']:
247 pass
248
249 data = b''.join([
250 b''.join(cborutil.streamencode(b'identity')),
251 b''.join(cborutil.streamencode({b'foo', b'bar'})),
252 ])
253
254 action, meta = sendframe(reactor,
255 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
256
257 self.assertEqual(action, b'noop')
258 self.assertEqual(meta, {})
259
260 def testmultipleframes(self):
261 reactor = framing.clientreactor(buffersends=False)
262
263 request, action, meta = reactor.callcommand(b'foo', {})
264 for f in meta[b'framegen']:
265 pass
266
267 data = b''.join(cborutil.streamencode(b'identity'))
268
269 action, meta = sendframe(reactor,
270 ffs(b'1 2 stream-begin stream-settings continuation %s' %
271 data[0:3]))
272
273 self.assertEqual(action, b'noop')
274 self.assertEqual(meta, {})
275
276 action, meta = sendframe(reactor,
277 ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
278
279 self.assertEqual(action, b'noop')
280 self.assertEqual(meta, {})
281
165 if __name__ == '__main__':
282 if __name__ == '__main__':
166 import silenttestrunner
283 import silenttestrunner
167 silenttestrunner.main(__name__)
284 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now