##// END OF EJS Templates
wireprotov2: establish dedicated classes for input and output streams...
Gregory Szorc -
r40166:5d44c4d1 default
parent child Browse files
Show More
@@ -1,1602 +1,1609 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54
54
55 FRAME_TYPES = {
55 FRAME_TYPES = {
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 }
64 }
65
65
66 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70
70
71 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
72 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 }
76 }
77
77
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
80
80
81 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
84 }
84 }
85
85
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88
88
89 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 }
92 }
93
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
100 }
101
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
108 }
109
109
110 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
111 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
117 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 }
120 }
121
121
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123
123
124 def humanflags(mapping, value):
124 def humanflags(mapping, value):
125 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
126 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
127 flags = []
127 flags = []
128 val = 1
128 val = 1
129 while value >= val:
129 while value >= val:
130 if value & val:
130 if value & val:
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 val <<= 1
132 val <<= 1
133
133
134 return b'|'.join(flags)
134 return b'|'.join(flags)
135
135
136 @attr.s(slots=True)
136 @attr.s(slots=True)
137 class frameheader(object):
137 class frameheader(object):
138 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
139
139
140 length = attr.ib()
140 length = attr.ib()
141 requestid = attr.ib()
141 requestid = attr.ib()
142 streamid = attr.ib()
142 streamid = attr.ib()
143 streamflags = attr.ib()
143 streamflags = attr.ib()
144 typeid = attr.ib()
144 typeid = attr.ib()
145 flags = attr.ib()
145 flags = attr.ib()
146
146
147 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
148 class frame(object):
148 class frame(object):
149 """Represents a parsed frame."""
149 """Represents a parsed frame."""
150
150
151 requestid = attr.ib()
151 requestid = attr.ib()
152 streamid = attr.ib()
152 streamid = attr.ib()
153 streamflags = attr.ib()
153 streamflags = attr.ib()
154 typeid = attr.ib()
154 typeid = attr.ib()
155 flags = attr.ib()
155 flags = attr.ib()
156 payload = attr.ib()
156 payload = attr.ib()
157
157
158 @encoding.strmethod
158 @encoding.strmethod
159 def __repr__(self):
159 def __repr__(self):
160 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
161 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
162 if value == self.typeid:
162 if value == self.typeid:
163 typename = name
163 typename = name
164 break
164 break
165
165
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
168 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171
171
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
174 # TODO assert size of payload.
174 # TODO assert size of payload.
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176
176
177 # 24 bits length
177 # 24 bits length
178 # 16 bits request id
178 # 16 bits request id
179 # 8 bits stream id
179 # 8 bits stream id
180 # 8 bits stream flags
180 # 8 bits stream flags
181 # 4 bits type
181 # 4 bits type
182 # 4 bits flags
182 # 4 bits flags
183
183
184 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
185 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
188 frame[8:] = payload
188 frame[8:] = payload
189
189
190 return frame
190 return frame
191
191
192 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
193 """Create a frame from a human readable string
193 """Create a frame from a human readable string
194
194
195 Strings have the form:
195 Strings have the form:
196
196
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198
198
199 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
200 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
201
201
202 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
203
203
204 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
205 named constant.
205 named constant.
206
206
207 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
208
208
209 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
210 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 byte string literal.
212 byte string literal.
213 """
213 """
214 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216
216
217 requestid = int(requestid)
217 requestid = int(requestid)
218 streamid = int(streamid)
218 streamid = int(streamid)
219
219
220 finalstreamflags = 0
220 finalstreamflags = 0
221 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
222 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
223 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
224 else:
224 else:
225 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
226
226
227 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
228 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
229 else:
229 else:
230 frametype = int(frametype)
230 frametype = int(frametype)
231
231
232 finalflags = 0
232 finalflags = 0
233 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
234 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
235 if flag in validflags:
235 if flag in validflags:
236 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
237 else:
237 else:
238 finalflags |= int(flag)
238 finalflags |= int(flag)
239
239
240 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
241 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
242 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
243
243
244 else:
244 else:
245 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
246
246
247 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
248 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
249 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
250
250
251 def parseheader(data):
251 def parseheader(data):
252 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
253
253
254 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
255 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
256 """
256 """
257 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
258 # 16 bits request ID
258 # 16 bits request ID
259 # 8 bits stream ID
259 # 8 bits stream ID
260 # 8 bits stream flags
260 # 8 bits stream flags
261 # 4 bits frame type
261 # 4 bits frame type
262 # 4 bits frame flags
262 # 4 bits frame flags
263 # ... payload
263 # ... payload
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 typeflags = data[7]
266 typeflags = data[7]
267
267
268 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
269 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
270
270
271 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
272 frametype, frameflags)
272 frametype, frameflags)
273
273
274 def readframe(fh):
274 def readframe(fh):
275 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
276
276
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
279 seen.
279 seen.
280 """
280 """
281 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
282
282
283 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
284
284
285 if readcount == 0:
285 if readcount == 0:
286 return None
286 return None
287
287
288 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 (readcount, header))
290 (readcount, header))
291
291
292 h = parseheader(header)
292 h = parseheader(header)
293
293
294 payload = fh.read(h.length)
294 payload = fh.read(h.length)
295 if len(payload) != h.length:
295 if len(payload) != h.length:
296 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 (h.length, len(payload)))
297 (h.length, len(payload)))
298
298
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 payload)
300 payload)
301
301
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 redirect=None):
304 redirect=None):
305 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
306
306
307 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
308 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
309 """
309 """
310 data = {b'name': cmd}
310 data = {b'name': cmd}
311 if args:
311 if args:
312 data[b'args'] = args
312 data[b'args'] = args
313
313
314 if redirect:
314 if redirect:
315 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
316
316
317 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
318
318
319 offset = 0
319 offset = 0
320
320
321 while True:
321 while True:
322 flags = 0
322 flags = 0
323
323
324 # Must set new or continuation flag.
324 # Must set new or continuation flag.
325 if not offset:
325 if not offset:
326 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
327 else:
327 else:
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329
329
330 # Data frames is set on all frames.
330 # Data frames is set on all frames.
331 if datafh:
331 if datafh:
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333
333
334 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
335 offset += len(payload)
335 offset += len(payload)
336
336
337 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339
339
340 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 flags=flags,
342 flags=flags,
343 payload=payload)
343 payload=payload)
344
344
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 break
346 break
347
347
348 if datafh:
348 if datafh:
349 while True:
349 while True:
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351
351
352 done = False
352 done = False
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 else:
355 else:
356 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
357 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
358 done = True
358 done = True
359
359
360 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
361 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
362 flags=flags,
362 flags=flags,
363 payload=data)
363 payload=data)
364
364
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
370 """Create a raw frame to send a bytes response from static bytes input.
371
371
372 Returns a generator of bytearrays.
372 Returns a generator of bytearrays.
373 """
373 """
374 # Automatically send the overall CBOR response map.
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
377 raise error.ProgrammingError('not yet implemented')
378
378
379 # Simple case where we can fit the full response in a single frame.
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
384 flags=flags,
385 payload=overall + data)
385 payload=overall + data)
386 return
386 return
387
387
388 # It's easier to send the overall CBOR map in its own frame than to track
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
393 payload=overall)
394
394
395 offset = 0
395 offset = 0
396 while True:
396 while True:
397 chunk = data[offset:offset + maxframesize]
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
398 offset += len(chunk)
399 done = offset == len(data)
399 done = offset == len(data)
400
400
401 if done:
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
405
406 yield stream.makeframe(requestid=requestid,
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
408 flags=flags,
409 payload=chunk)
409 payload=chunk)
410
410
411 if done:
411 if done:
412 break
412 break
413
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
416 """Generator of frames from a generator of byte chunks.
417
417
418 This assumes that another frame will follow whatever this emits. i.e.
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
420 flag.
421 """
421 """
422 cb = util.chunkbuffer(gen)
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
424
425 while True:
425 while True:
426 chunk = cb.read(maxframesize)
426 chunk = cb.read(maxframesize)
427 if not chunk:
427 if not chunk:
428 break
428 break
429
429
430 yield stream.makeframe(requestid=requestid,
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
432 flags=flags,
433 payload=chunk)
433 payload=chunk)
434
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
436
437 def createcommandresponseokframe(stream, requestid):
437 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439
439
440 return stream.makeframe(requestid=requestid,
440 return stream.makeframe(requestid=requestid,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 payload=overall)
443 payload=overall)
444
444
445 def createcommandresponseeosframe(stream, requestid):
445 def createcommandresponseeosframe(stream, requestid):
446 """Create an empty payload frame representing command end-of-stream."""
446 """Create an empty payload frame representing command end-of-stream."""
447 return stream.makeframe(requestid=requestid,
447 return stream.makeframe(requestid=requestid,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 payload=b'')
450 payload=b'')
451
451
452 def createalternatelocationresponseframe(stream, requestid, location):
452 def createalternatelocationresponseframe(stream, requestid, location):
453 data = {
453 data = {
454 b'status': b'redirect',
454 b'status': b'redirect',
455 b'location': {
455 b'location': {
456 b'url': location.url,
456 b'url': location.url,
457 b'mediatype': location.mediatype,
457 b'mediatype': location.mediatype,
458 }
458 }
459 }
459 }
460
460
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 r'servercadercerts'):
462 r'servercadercerts'):
463 value = getattr(location, a)
463 value = getattr(location, a)
464 if value is not None:
464 if value is not None:
465 data[b'location'][pycompat.bytestr(a)] = value
465 data[b'location'][pycompat.bytestr(a)] = value
466
466
467 return stream.makeframe(requestid=requestid,
467 return stream.makeframe(requestid=requestid,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 payload=b''.join(cborutil.streamencode(data)))
470 payload=b''.join(cborutil.streamencode(data)))
471
471
472 def createcommanderrorresponse(stream, requestid, message, args=None):
472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 # formatting works consistently?
474 # formatting works consistently?
475 m = {
475 m = {
476 b'status': b'error',
476 b'status': b'error',
477 b'error': {
477 b'error': {
478 b'message': message,
478 b'message': message,
479 }
479 }
480 }
480 }
481
481
482 if args:
482 if args:
483 m[b'error'][b'args'] = args
483 m[b'error'][b'args'] = args
484
484
485 overall = b''.join(cborutil.streamencode(m))
485 overall = b''.join(cborutil.streamencode(m))
486
486
487 yield stream.makeframe(requestid=requestid,
487 yield stream.makeframe(requestid=requestid,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 payload=overall)
490 payload=overall)
491
491
492 def createerrorframe(stream, requestid, msg, errtype):
492 def createerrorframe(stream, requestid, msg, errtype):
493 # TODO properly handle frame size limits.
493 # TODO properly handle frame size limits.
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495
495
496 payload = b''.join(cborutil.streamencode({
496 payload = b''.join(cborutil.streamencode({
497 b'type': errtype,
497 b'type': errtype,
498 b'message': [{b'msg': msg}],
498 b'message': [{b'msg': msg}],
499 }))
499 }))
500
500
501 yield stream.makeframe(requestid=requestid,
501 yield stream.makeframe(requestid=requestid,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 flags=0,
503 flags=0,
504 payload=payload)
504 payload=payload)
505
505
506 def createtextoutputframe(stream, requestid, atoms,
506 def createtextoutputframe(stream, requestid, atoms,
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 """Create a text output frame to render text to people.
508 """Create a text output frame to render text to people.
509
509
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511
511
512 The formatting string contains ``%s`` tokens to be replaced by the
512 The formatting string contains ``%s`` tokens to be replaced by the
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 formatters to be applied at rendering time. In terms of the ``ui``
514 formatters to be applied at rendering time. In terms of the ``ui``
515 class, each atom corresponds to a ``ui.write()``.
515 class, each atom corresponds to a ``ui.write()``.
516 """
516 """
517 atomdicts = []
517 atomdicts = []
518
518
519 for (formatting, args, labels) in atoms:
519 for (formatting, args, labels) in atoms:
520 # TODO look for localstr, other types here?
520 # TODO look for localstr, other types here?
521
521
522 if not isinstance(formatting, bytes):
522 if not isinstance(formatting, bytes):
523 raise ValueError('must use bytes formatting strings')
523 raise ValueError('must use bytes formatting strings')
524 for arg in args:
524 for arg in args:
525 if not isinstance(arg, bytes):
525 if not isinstance(arg, bytes):
526 raise ValueError('must use bytes for arguments')
526 raise ValueError('must use bytes for arguments')
527 for label in labels:
527 for label in labels:
528 if not isinstance(label, bytes):
528 if not isinstance(label, bytes):
529 raise ValueError('must use bytes for labels')
529 raise ValueError('must use bytes for labels')
530
530
531 # Formatting string must be ASCII.
531 # Formatting string must be ASCII.
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533
533
534 # Arguments must be UTF-8.
534 # Arguments must be UTF-8.
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536
536
537 # Labels must be ASCII.
537 # Labels must be ASCII.
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 for l in labels]
539 for l in labels]
540
540
541 atom = {b'msg': formatting}
541 atom = {b'msg': formatting}
542 if args:
542 if args:
543 atom[b'args'] = args
543 atom[b'args'] = args
544 if labels:
544 if labels:
545 atom[b'labels'] = labels
545 atom[b'labels'] = labels
546
546
547 atomdicts.append(atom)
547 atomdicts.append(atom)
548
548
549 payload = b''.join(cborutil.streamencode(atomdicts))
549 payload = b''.join(cborutil.streamencode(atomdicts))
550
550
551 if len(payload) > maxframesize:
551 if len(payload) > maxframesize:
552 raise ValueError('cannot encode data in a single frame')
552 raise ValueError('cannot encode data in a single frame')
553
553
554 yield stream.makeframe(requestid=requestid,
554 yield stream.makeframe(requestid=requestid,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 flags=0,
556 flags=0,
557 payload=payload)
557 payload=payload)
558
558
559 class bufferingcommandresponseemitter(object):
559 class bufferingcommandresponseemitter(object):
560 """Helper object to emit command response frames intelligently.
560 """Helper object to emit command response frames intelligently.
561
561
562 Raw command response data is likely emitted in chunks much smaller
562 Raw command response data is likely emitted in chunks much smaller
563 than what can fit in a single frame. This class exists to buffer
563 than what can fit in a single frame. This class exists to buffer
564 chunks until enough data is available to fit in a single frame.
564 chunks until enough data is available to fit in a single frame.
565
565
566 TODO we'll need something like this when compression is supported.
566 TODO we'll need something like this when compression is supported.
567 So it might make sense to implement this functionality at the stream
567 So it might make sense to implement this functionality at the stream
568 level.
568 level.
569 """
569 """
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 self._stream = stream
571 self._stream = stream
572 self._requestid = requestid
572 self._requestid = requestid
573 self._maxsize = maxframesize
573 self._maxsize = maxframesize
574 self._chunks = []
574 self._chunks = []
575 self._chunkssize = 0
575 self._chunkssize = 0
576
576
577 def send(self, data):
577 def send(self, data):
578 """Send new data for emission.
578 """Send new data for emission.
579
579
580 Is a generator of new frames that were derived from the new input.
580 Is a generator of new frames that were derived from the new input.
581
581
582 If the special input ``None`` is received, flushes all buffered
582 If the special input ``None`` is received, flushes all buffered
583 data to frames.
583 data to frames.
584 """
584 """
585
585
586 if data is None:
586 if data is None:
587 for frame in self._flush():
587 for frame in self._flush():
588 yield frame
588 yield frame
589 return
589 return
590
590
591 # There is a ton of potential to do more complicated things here.
591 # There is a ton of potential to do more complicated things here.
592 # Our immediate goal is to coalesce small chunks into big frames,
592 # Our immediate goal is to coalesce small chunks into big frames,
593 # not achieve the fewest number of frames possible. So we go with
593 # not achieve the fewest number of frames possible. So we go with
594 # a simple implementation:
594 # a simple implementation:
595 #
595 #
596 # * If a chunk is too large for a frame, we flush and emit frames
596 # * If a chunk is too large for a frame, we flush and emit frames
597 # for the new chunk.
597 # for the new chunk.
598 # * If a chunk can be buffered without total buffered size limits
598 # * If a chunk can be buffered without total buffered size limits
599 # being exceeded, we do that.
599 # being exceeded, we do that.
600 # * If a chunk causes us to go over our buffering limit, we flush
600 # * If a chunk causes us to go over our buffering limit, we flush
601 # and then buffer the new chunk.
601 # and then buffer the new chunk.
602
602
603 if len(data) > self._maxsize:
603 if len(data) > self._maxsize:
604 for frame in self._flush():
604 for frame in self._flush():
605 yield frame
605 yield frame
606
606
607 # Now emit frames for the big chunk.
607 # Now emit frames for the big chunk.
608 offset = 0
608 offset = 0
609 while True:
609 while True:
610 chunk = data[offset:offset + self._maxsize]
610 chunk = data[offset:offset + self._maxsize]
611 offset += len(chunk)
611 offset += len(chunk)
612
612
613 yield self._stream.makeframe(
613 yield self._stream.makeframe(
614 self._requestid,
614 self._requestid,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 payload=chunk)
617 payload=chunk)
618
618
619 if offset == len(data):
619 if offset == len(data):
620 return
620 return
621
621
622 # If we don't have enough to constitute a full frame, buffer and
622 # If we don't have enough to constitute a full frame, buffer and
623 # return.
623 # return.
624 if len(data) + self._chunkssize < self._maxsize:
624 if len(data) + self._chunkssize < self._maxsize:
625 self._chunks.append(data)
625 self._chunks.append(data)
626 self._chunkssize += len(data)
626 self._chunkssize += len(data)
627 return
627 return
628
628
629 # Else flush what we have and buffer the new chunk. We could do
629 # Else flush what we have and buffer the new chunk. We could do
630 # something more intelligent here, like break the chunk. Let's
630 # something more intelligent here, like break the chunk. Let's
631 # keep things simple for now.
631 # keep things simple for now.
632 for frame in self._flush():
632 for frame in self._flush():
633 yield frame
633 yield frame
634
634
635 self._chunks.append(data)
635 self._chunks.append(data)
636 self._chunkssize = len(data)
636 self._chunkssize = len(data)
637
637
638 def _flush(self):
638 def _flush(self):
639 payload = b''.join(self._chunks)
639 payload = b''.join(self._chunks)
640 assert len(payload) <= self._maxsize
640 assert len(payload) <= self._maxsize
641
641
642 self._chunks[:] = []
642 self._chunks[:] = []
643 self._chunkssize = 0
643 self._chunkssize = 0
644
644
645 yield self._stream.makeframe(
645 yield self._stream.makeframe(
646 self._requestid,
646 self._requestid,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 payload=payload)
649 payload=payload)
650
650
651 class stream(object):
651 class stream(object):
652 """Represents a logical unidirectional series of frames."""
652 """Represents a logical unidirectional series of frames."""
653
653
654 def __init__(self, streamid, active=False):
654 def __init__(self, streamid, active=False):
655 self.streamid = streamid
655 self.streamid = streamid
656 self._active = active
656 self._active = active
657
657
658 def makeframe(self, requestid, typeid, flags, payload):
658 def makeframe(self, requestid, typeid, flags, payload):
659 """Create a frame to be sent out over this stream.
659 """Create a frame to be sent out over this stream.
660
660
661 Only returns the frame instance. Does not actually send it.
661 Only returns the frame instance. Does not actually send it.
662 """
662 """
663 streamflags = 0
663 streamflags = 0
664 if not self._active:
664 if not self._active:
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 self._active = True
666 self._active = True
667
667
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 payload)
669 payload)
670
670
671 class inputstream(stream):
672 """Represents a stream used for receiving data."""
673
671 def setdecoder(self, name, extraobjs):
674 def setdecoder(self, name, extraobjs):
672 """Set the decoder for this stream.
675 """Set the decoder for this stream.
673
676
674 Receives the stream profile name and any additional CBOR objects
677 Receives the stream profile name and any additional CBOR objects
675 decoded from the stream encoding settings frame payloads.
678 decoded from the stream encoding settings frame payloads.
676 """
679 """
677
680
681 class outputstream(stream):
682 """Represents a stream used for sending data."""
683
678 def ensureserverstream(stream):
684 def ensureserverstream(stream):
679 if stream.streamid % 2:
685 if stream.streamid % 2:
680 raise error.ProgrammingError('server should only write to even '
686 raise error.ProgrammingError('server should only write to even '
681 'numbered streams; %d is not even' %
687 'numbered streams; %d is not even' %
682 stream.streamid)
688 stream.streamid)
683
689
684 DEFAULT_PROTOCOL_SETTINGS = {
690 DEFAULT_PROTOCOL_SETTINGS = {
685 'contentencodings': [b'identity'],
691 'contentencodings': [b'identity'],
686 }
692 }
687
693
688 class serverreactor(object):
694 class serverreactor(object):
689 """Holds state of a server handling frame-based protocol requests.
695 """Holds state of a server handling frame-based protocol requests.
690
696
691 This class is the "brain" of the unified frame-based protocol server
697 This class is the "brain" of the unified frame-based protocol server
692 component. While the protocol is stateless from the perspective of
698 component. While the protocol is stateless from the perspective of
693 requests/commands, something needs to track which frames have been
699 requests/commands, something needs to track which frames have been
694 received, what frames to expect, etc. This class is that thing.
700 received, what frames to expect, etc. This class is that thing.
695
701
696 Instances are modeled as a state machine of sorts. Instances are also
702 Instances are modeled as a state machine of sorts. Instances are also
697 reactionary to external events. The point of this class is to encapsulate
703 reactionary to external events. The point of this class is to encapsulate
698 the state of the connection and the exchange of frames, not to perform
704 the state of the connection and the exchange of frames, not to perform
699 work. Instead, callers tell this class when something occurs, like a
705 work. Instead, callers tell this class when something occurs, like a
700 frame arriving. If that activity is worthy of a follow-up action (say
706 frame arriving. If that activity is worthy of a follow-up action (say
701 *run a command*), the return value of that handler will say so.
707 *run a command*), the return value of that handler will say so.
702
708
703 I/O and CPU intensive operations are purposefully delegated outside of
709 I/O and CPU intensive operations are purposefully delegated outside of
704 this class.
710 this class.
705
711
706 Consumers are expected to tell instances when events occur. They do so by
712 Consumers are expected to tell instances when events occur. They do so by
707 calling the various ``on*`` methods. These methods return a 2-tuple
713 calling the various ``on*`` methods. These methods return a 2-tuple
708 describing any follow-up action(s) to take. The first element is the
714 describing any follow-up action(s) to take. The first element is the
709 name of an action to perform. The second is a data structure (usually
715 name of an action to perform. The second is a data structure (usually
710 a dict) specific to that action that contains more information. e.g.
716 a dict) specific to that action that contains more information. e.g.
711 if the server wants to send frames back to the client, the data structure
717 if the server wants to send frames back to the client, the data structure
712 will contain a reference to those frames.
718 will contain a reference to those frames.
713
719
714 Valid actions that consumers can be instructed to take are:
720 Valid actions that consumers can be instructed to take are:
715
721
716 sendframes
722 sendframes
717 Indicates that frames should be sent to the client. The ``framegen``
723 Indicates that frames should be sent to the client. The ``framegen``
718 key contains a generator of frames that should be sent. The server
724 key contains a generator of frames that should be sent. The server
719 assumes that all frames are sent to the client.
725 assumes that all frames are sent to the client.
720
726
721 error
727 error
722 Indicates that an error occurred. Consumer should probably abort.
728 Indicates that an error occurred. Consumer should probably abort.
723
729
724 runcommand
730 runcommand
725 Indicates that the consumer should run a wire protocol command. Details
731 Indicates that the consumer should run a wire protocol command. Details
726 of the command to run are given in the data structure.
732 of the command to run are given in the data structure.
727
733
728 wantframe
734 wantframe
729 Indicates that nothing of interest happened and the server is waiting on
735 Indicates that nothing of interest happened and the server is waiting on
730 more frames from the client before anything interesting can be done.
736 more frames from the client before anything interesting can be done.
731
737
732 noop
738 noop
733 Indicates no additional action is required.
739 Indicates no additional action is required.
734
740
735 Known Issues
741 Known Issues
736 ------------
742 ------------
737
743
738 There are no limits to the number of partially received commands or their
744 There are no limits to the number of partially received commands or their
739 size. A malicious client could stream command request data and exhaust the
745 size. A malicious client could stream command request data and exhaust the
740 server's memory.
746 server's memory.
741
747
742 Partially received commands are not acted upon when end of input is
748 Partially received commands are not acted upon when end of input is
743 reached. Should the server error if it receives a partial request?
749 reached. Should the server error if it receives a partial request?
744 Should the client send a message to abort a partially transmitted request
750 Should the client send a message to abort a partially transmitted request
745 to facilitate graceful shutdown?
751 to facilitate graceful shutdown?
746
752
747 Active requests that haven't been responded to aren't tracked. This means
753 Active requests that haven't been responded to aren't tracked. This means
748 that if we receive a command and instruct its dispatch, another command
754 that if we receive a command and instruct its dispatch, another command
749 with its request ID can come in over the wire and there will be a race
755 with its request ID can come in over the wire and there will be a race
750 between who responds to what.
756 between who responds to what.
751 """
757 """
752
758
753 def __init__(self, ui, deferoutput=False):
759 def __init__(self, ui, deferoutput=False):
754 """Construct a new server reactor.
760 """Construct a new server reactor.
755
761
756 ``deferoutput`` can be used to indicate that no output frames should be
762 ``deferoutput`` can be used to indicate that no output frames should be
757 instructed to be sent until input has been exhausted. In this mode,
763 instructed to be sent until input has been exhausted. In this mode,
758 events that would normally generate output frames (such as a command
764 events that would normally generate output frames (such as a command
759 response being ready) will instead defer instructing the consumer to
765 response being ready) will instead defer instructing the consumer to
760 send those frames. This is useful for half-duplex transports where the
766 send those frames. This is useful for half-duplex transports where the
761 sender cannot receive until all data has been transmitted.
767 sender cannot receive until all data has been transmitted.
762 """
768 """
763 self._ui = ui
769 self._ui = ui
764 self._deferoutput = deferoutput
770 self._deferoutput = deferoutput
765 self._state = 'initial'
771 self._state = 'initial'
766 self._nextoutgoingstreamid = 2
772 self._nextoutgoingstreamid = 2
767 self._bufferedframegens = []
773 self._bufferedframegens = []
768 # stream id -> stream instance for all active streams from the client.
774 # stream id -> stream instance for all active streams from the client.
769 self._incomingstreams = {}
775 self._incomingstreams = {}
770 self._outgoingstreams = {}
776 self._outgoingstreams = {}
771 # request id -> dict of commands that are actively being received.
777 # request id -> dict of commands that are actively being received.
772 self._receivingcommands = {}
778 self._receivingcommands = {}
773 # Request IDs that have been received and are actively being processed.
779 # Request IDs that have been received and are actively being processed.
774 # Once all output for a request has been sent, it is removed from this
780 # Once all output for a request has been sent, it is removed from this
775 # set.
781 # set.
776 self._activecommands = set()
782 self._activecommands = set()
777
783
778 self._protocolsettingsdecoder = None
784 self._protocolsettingsdecoder = None
779
785
780 # Sender protocol settings are optional. Set implied default values.
786 # Sender protocol settings are optional. Set implied default values.
781 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
787 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
782
788
783 def onframerecv(self, frame):
789 def onframerecv(self, frame):
784 """Process a frame that has been received off the wire.
790 """Process a frame that has been received off the wire.
785
791
786 Returns a dict with an ``action`` key that details what action,
792 Returns a dict with an ``action`` key that details what action,
787 if any, the consumer should take next.
793 if any, the consumer should take next.
788 """
794 """
789 if not frame.streamid % 2:
795 if not frame.streamid % 2:
790 self._state = 'errored'
796 self._state = 'errored'
791 return self._makeerrorresult(
797 return self._makeerrorresult(
792 _('received frame with even numbered stream ID: %d') %
798 _('received frame with even numbered stream ID: %d') %
793 frame.streamid)
799 frame.streamid)
794
800
795 if frame.streamid not in self._incomingstreams:
801 if frame.streamid not in self._incomingstreams:
796 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
802 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
797 self._state = 'errored'
803 self._state = 'errored'
798 return self._makeerrorresult(
804 return self._makeerrorresult(
799 _('received frame on unknown inactive stream without '
805 _('received frame on unknown inactive stream without '
800 'beginning of stream flag set'))
806 'beginning of stream flag set'))
801
807
802 self._incomingstreams[frame.streamid] = stream(frame.streamid)
808 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
803
809
804 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
810 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
805 # TODO handle decoding frames
811 # TODO handle decoding frames
806 self._state = 'errored'
812 self._state = 'errored'
807 raise error.ProgrammingError('support for decoding stream payloads '
813 raise error.ProgrammingError('support for decoding stream payloads '
808 'not yet implemented')
814 'not yet implemented')
809
815
810 if frame.streamflags & STREAM_FLAG_END_STREAM:
816 if frame.streamflags & STREAM_FLAG_END_STREAM:
811 del self._incomingstreams[frame.streamid]
817 del self._incomingstreams[frame.streamid]
812
818
813 handlers = {
819 handlers = {
814 'initial': self._onframeinitial,
820 'initial': self._onframeinitial,
815 'protocol-settings-receiving': self._onframeprotocolsettings,
821 'protocol-settings-receiving': self._onframeprotocolsettings,
816 'idle': self._onframeidle,
822 'idle': self._onframeidle,
817 'command-receiving': self._onframecommandreceiving,
823 'command-receiving': self._onframecommandreceiving,
818 'errored': self._onframeerrored,
824 'errored': self._onframeerrored,
819 }
825 }
820
826
821 meth = handlers.get(self._state)
827 meth = handlers.get(self._state)
822 if not meth:
828 if not meth:
823 raise error.ProgrammingError('unhandled state: %s' % self._state)
829 raise error.ProgrammingError('unhandled state: %s' % self._state)
824
830
825 return meth(frame)
831 return meth(frame)
826
832
827 def oncommandresponseready(self, stream, requestid, data):
833 def oncommandresponseready(self, stream, requestid, data):
828 """Signal that a bytes response is ready to be sent to the client.
834 """Signal that a bytes response is ready to be sent to the client.
829
835
830 The raw bytes response is passed as an argument.
836 The raw bytes response is passed as an argument.
831 """
837 """
832 ensureserverstream(stream)
838 ensureserverstream(stream)
833
839
834 def sendframes():
840 def sendframes():
835 for frame in createcommandresponseframesfrombytes(stream, requestid,
841 for frame in createcommandresponseframesfrombytes(stream, requestid,
836 data):
842 data):
837 yield frame
843 yield frame
838
844
839 self._activecommands.remove(requestid)
845 self._activecommands.remove(requestid)
840
846
841 result = sendframes()
847 result = sendframes()
842
848
843 if self._deferoutput:
849 if self._deferoutput:
844 self._bufferedframegens.append(result)
850 self._bufferedframegens.append(result)
845 return 'noop', {}
851 return 'noop', {}
846 else:
852 else:
847 return 'sendframes', {
853 return 'sendframes', {
848 'framegen': result,
854 'framegen': result,
849 }
855 }
850
856
851 def oncommandresponsereadyobjects(self, stream, requestid, objs):
857 def oncommandresponsereadyobjects(self, stream, requestid, objs):
852 """Signal that objects are ready to be sent to the client.
858 """Signal that objects are ready to be sent to the client.
853
859
854 ``objs`` is an iterable of objects (typically a generator) that will
860 ``objs`` is an iterable of objects (typically a generator) that will
855 be encoded via CBOR and added to frames, which will be sent to the
861 be encoded via CBOR and added to frames, which will be sent to the
856 client.
862 client.
857 """
863 """
858 ensureserverstream(stream)
864 ensureserverstream(stream)
859
865
860 # We need to take care over exception handling. Uncaught exceptions
866 # We need to take care over exception handling. Uncaught exceptions
861 # when generating frames could lead to premature end of the frame
867 # when generating frames could lead to premature end of the frame
862 # stream and the possibility of the server or client process getting
868 # stream and the possibility of the server or client process getting
863 # in a bad state.
869 # in a bad state.
864 #
870 #
865 # Keep in mind that if ``objs`` is a generator, advancing it could
871 # Keep in mind that if ``objs`` is a generator, advancing it could
866 # raise exceptions that originated in e.g. wire protocol command
872 # raise exceptions that originated in e.g. wire protocol command
867 # functions. That is why we differentiate between exceptions raised
873 # functions. That is why we differentiate between exceptions raised
868 # when iterating versus other exceptions that occur.
874 # when iterating versus other exceptions that occur.
869 #
875 #
870 # In all cases, when the function finishes, the request is fully
876 # In all cases, when the function finishes, the request is fully
871 # handled and no new frames for it should be seen.
877 # handled and no new frames for it should be seen.
872
878
873 def sendframes():
879 def sendframes():
874 emitted = False
880 emitted = False
875 alternatelocationsent = False
881 alternatelocationsent = False
876 emitter = bufferingcommandresponseemitter(stream, requestid)
882 emitter = bufferingcommandresponseemitter(stream, requestid)
877 while True:
883 while True:
878 try:
884 try:
879 o = next(objs)
885 o = next(objs)
880 except StopIteration:
886 except StopIteration:
881 for frame in emitter.send(None):
887 for frame in emitter.send(None):
882 yield frame
888 yield frame
883
889
884 if emitted:
890 if emitted:
885 yield createcommandresponseeosframe(stream, requestid)
891 yield createcommandresponseeosframe(stream, requestid)
886 break
892 break
887
893
888 except error.WireprotoCommandError as e:
894 except error.WireprotoCommandError as e:
889 for frame in createcommanderrorresponse(
895 for frame in createcommanderrorresponse(
890 stream, requestid, e.message, e.messageargs):
896 stream, requestid, e.message, e.messageargs):
891 yield frame
897 yield frame
892 break
898 break
893
899
894 except Exception as e:
900 except Exception as e:
895 for frame in createerrorframe(
901 for frame in createerrorframe(
896 stream, requestid, '%s' % stringutil.forcebytestr(e),
902 stream, requestid, '%s' % stringutil.forcebytestr(e),
897 errtype='server'):
903 errtype='server'):
898
904
899 yield frame
905 yield frame
900
906
901 break
907 break
902
908
903 try:
909 try:
904 # Alternate location responses can only be the first and
910 # Alternate location responses can only be the first and
905 # only object in the output stream.
911 # only object in the output stream.
906 if isinstance(o, wireprototypes.alternatelocationresponse):
912 if isinstance(o, wireprototypes.alternatelocationresponse):
907 if emitted:
913 if emitted:
908 raise error.ProgrammingError(
914 raise error.ProgrammingError(
909 'alternatelocationresponse seen after initial '
915 'alternatelocationresponse seen after initial '
910 'output object')
916 'output object')
911
917
912 yield createalternatelocationresponseframe(
918 yield createalternatelocationresponseframe(
913 stream, requestid, o)
919 stream, requestid, o)
914
920
915 alternatelocationsent = True
921 alternatelocationsent = True
916 emitted = True
922 emitted = True
917 continue
923 continue
918
924
919 if alternatelocationsent:
925 if alternatelocationsent:
920 raise error.ProgrammingError(
926 raise error.ProgrammingError(
921 'object follows alternatelocationresponse')
927 'object follows alternatelocationresponse')
922
928
923 if not emitted:
929 if not emitted:
924 yield createcommandresponseokframe(stream, requestid)
930 yield createcommandresponseokframe(stream, requestid)
925 emitted = True
931 emitted = True
926
932
927 # Objects emitted by command functions can be serializable
933 # Objects emitted by command functions can be serializable
928 # data structures or special types.
934 # data structures or special types.
929 # TODO consider extracting the content normalization to a
935 # TODO consider extracting the content normalization to a
930 # standalone function, as it may be useful for e.g. cachers.
936 # standalone function, as it may be useful for e.g. cachers.
931
937
932 # A pre-encoded object is sent directly to the emitter.
938 # A pre-encoded object is sent directly to the emitter.
933 if isinstance(o, wireprototypes.encodedresponse):
939 if isinstance(o, wireprototypes.encodedresponse):
934 for frame in emitter.send(o.data):
940 for frame in emitter.send(o.data):
935 yield frame
941 yield frame
936
942
937 # A regular object is CBOR encoded.
943 # A regular object is CBOR encoded.
938 else:
944 else:
939 for chunk in cborutil.streamencode(o):
945 for chunk in cborutil.streamencode(o):
940 for frame in emitter.send(chunk):
946 for frame in emitter.send(chunk):
941 yield frame
947 yield frame
942
948
943 except Exception as e:
949 except Exception as e:
944 for frame in createerrorframe(stream, requestid,
950 for frame in createerrorframe(stream, requestid,
945 '%s' % e,
951 '%s' % e,
946 errtype='server'):
952 errtype='server'):
947 yield frame
953 yield frame
948
954
949 break
955 break
950
956
951 self._activecommands.remove(requestid)
957 self._activecommands.remove(requestid)
952
958
953 return self._handlesendframes(sendframes())
959 return self._handlesendframes(sendframes())
954
960
955 def oninputeof(self):
961 def oninputeof(self):
956 """Signals that end of input has been received.
962 """Signals that end of input has been received.
957
963
958 No more frames will be received. All pending activity should be
964 No more frames will be received. All pending activity should be
959 completed.
965 completed.
960 """
966 """
961 # TODO should we do anything about in-flight commands?
967 # TODO should we do anything about in-flight commands?
962
968
963 if not self._deferoutput or not self._bufferedframegens:
969 if not self._deferoutput or not self._bufferedframegens:
964 return 'noop', {}
970 return 'noop', {}
965
971
966 # If we buffered all our responses, emit those.
972 # If we buffered all our responses, emit those.
967 def makegen():
973 def makegen():
968 for gen in self._bufferedframegens:
974 for gen in self._bufferedframegens:
969 for frame in gen:
975 for frame in gen:
970 yield frame
976 yield frame
971
977
972 return 'sendframes', {
978 return 'sendframes', {
973 'framegen': makegen(),
979 'framegen': makegen(),
974 }
980 }
975
981
976 def _handlesendframes(self, framegen):
982 def _handlesendframes(self, framegen):
977 if self._deferoutput:
983 if self._deferoutput:
978 self._bufferedframegens.append(framegen)
984 self._bufferedframegens.append(framegen)
979 return 'noop', {}
985 return 'noop', {}
980 else:
986 else:
981 return 'sendframes', {
987 return 'sendframes', {
982 'framegen': framegen,
988 'framegen': framegen,
983 }
989 }
984
990
985 def onservererror(self, stream, requestid, msg):
991 def onservererror(self, stream, requestid, msg):
986 ensureserverstream(stream)
992 ensureserverstream(stream)
987
993
988 def sendframes():
994 def sendframes():
989 for frame in createerrorframe(stream, requestid, msg,
995 for frame in createerrorframe(stream, requestid, msg,
990 errtype='server'):
996 errtype='server'):
991 yield frame
997 yield frame
992
998
993 self._activecommands.remove(requestid)
999 self._activecommands.remove(requestid)
994
1000
995 return self._handlesendframes(sendframes())
1001 return self._handlesendframes(sendframes())
996
1002
997 def oncommanderror(self, stream, requestid, message, args=None):
1003 def oncommanderror(self, stream, requestid, message, args=None):
998 """Called when a command encountered an error before sending output."""
1004 """Called when a command encountered an error before sending output."""
999 ensureserverstream(stream)
1005 ensureserverstream(stream)
1000
1006
1001 def sendframes():
1007 def sendframes():
1002 for frame in createcommanderrorresponse(stream, requestid, message,
1008 for frame in createcommanderrorresponse(stream, requestid, message,
1003 args):
1009 args):
1004 yield frame
1010 yield frame
1005
1011
1006 self._activecommands.remove(requestid)
1012 self._activecommands.remove(requestid)
1007
1013
1008 return self._handlesendframes(sendframes())
1014 return self._handlesendframes(sendframes())
1009
1015
1010 def makeoutputstream(self):
1016 def makeoutputstream(self):
1011 """Create a stream to be used for sending data to the client."""
1017 """Create a stream to be used for sending data to the client."""
1012 streamid = self._nextoutgoingstreamid
1018 streamid = self._nextoutgoingstreamid
1013 self._nextoutgoingstreamid += 2
1019 self._nextoutgoingstreamid += 2
1014
1020
1015 s = stream(streamid)
1021 s = outputstream(streamid)
1016 self._outgoingstreams[streamid] = s
1022 self._outgoingstreams[streamid] = s
1017
1023
1018 return s
1024 return s
1019
1025
1020 def _makeerrorresult(self, msg):
1026 def _makeerrorresult(self, msg):
1021 return 'error', {
1027 return 'error', {
1022 'message': msg,
1028 'message': msg,
1023 }
1029 }
1024
1030
1025 def _makeruncommandresult(self, requestid):
1031 def _makeruncommandresult(self, requestid):
1026 entry = self._receivingcommands[requestid]
1032 entry = self._receivingcommands[requestid]
1027
1033
1028 if not entry['requestdone']:
1034 if not entry['requestdone']:
1029 self._state = 'errored'
1035 self._state = 'errored'
1030 raise error.ProgrammingError('should not be called without '
1036 raise error.ProgrammingError('should not be called without '
1031 'requestdone set')
1037 'requestdone set')
1032
1038
1033 del self._receivingcommands[requestid]
1039 del self._receivingcommands[requestid]
1034
1040
1035 if self._receivingcommands:
1041 if self._receivingcommands:
1036 self._state = 'command-receiving'
1042 self._state = 'command-receiving'
1037 else:
1043 else:
1038 self._state = 'idle'
1044 self._state = 'idle'
1039
1045
1040 # Decode the payloads as CBOR.
1046 # Decode the payloads as CBOR.
1041 entry['payload'].seek(0)
1047 entry['payload'].seek(0)
1042 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1048 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1043
1049
1044 if b'name' not in request:
1050 if b'name' not in request:
1045 self._state = 'errored'
1051 self._state = 'errored'
1046 return self._makeerrorresult(
1052 return self._makeerrorresult(
1047 _('command request missing "name" field'))
1053 _('command request missing "name" field'))
1048
1054
1049 if b'args' not in request:
1055 if b'args' not in request:
1050 request[b'args'] = {}
1056 request[b'args'] = {}
1051
1057
1052 assert requestid not in self._activecommands
1058 assert requestid not in self._activecommands
1053 self._activecommands.add(requestid)
1059 self._activecommands.add(requestid)
1054
1060
1055 return 'runcommand', {
1061 return 'runcommand', {
1056 'requestid': requestid,
1062 'requestid': requestid,
1057 'command': request[b'name'],
1063 'command': request[b'name'],
1058 'args': request[b'args'],
1064 'args': request[b'args'],
1059 'redirect': request.get(b'redirect'),
1065 'redirect': request.get(b'redirect'),
1060 'data': entry['data'].getvalue() if entry['data'] else None,
1066 'data': entry['data'].getvalue() if entry['data'] else None,
1061 }
1067 }
1062
1068
1063 def _makewantframeresult(self):
1069 def _makewantframeresult(self):
1064 return 'wantframe', {
1070 return 'wantframe', {
1065 'state': self._state,
1071 'state': self._state,
1066 }
1072 }
1067
1073
1068 def _validatecommandrequestframe(self, frame):
1074 def _validatecommandrequestframe(self, frame):
1069 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1075 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1070 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1076 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1071
1077
1072 if new and continuation:
1078 if new and continuation:
1073 self._state = 'errored'
1079 self._state = 'errored'
1074 return self._makeerrorresult(
1080 return self._makeerrorresult(
1075 _('received command request frame with both new and '
1081 _('received command request frame with both new and '
1076 'continuation flags set'))
1082 'continuation flags set'))
1077
1083
1078 if not new and not continuation:
1084 if not new and not continuation:
1079 self._state = 'errored'
1085 self._state = 'errored'
1080 return self._makeerrorresult(
1086 return self._makeerrorresult(
1081 _('received command request frame with neither new nor '
1087 _('received command request frame with neither new nor '
1082 'continuation flags set'))
1088 'continuation flags set'))
1083
1089
1084 def _onframeinitial(self, frame):
1090 def _onframeinitial(self, frame):
1085 # Called when we receive a frame when in the "initial" state.
1091 # Called when we receive a frame when in the "initial" state.
1086 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1092 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1087 self._state = 'protocol-settings-receiving'
1093 self._state = 'protocol-settings-receiving'
1088 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1094 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1089 return self._onframeprotocolsettings(frame)
1095 return self._onframeprotocolsettings(frame)
1090
1096
1091 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1097 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1092 self._state = 'idle'
1098 self._state = 'idle'
1093 return self._onframeidle(frame)
1099 return self._onframeidle(frame)
1094
1100
1095 else:
1101 else:
1096 self._state = 'errored'
1102 self._state = 'errored'
1097 return self._makeerrorresult(
1103 return self._makeerrorresult(
1098 _('expected sender protocol settings or command request '
1104 _('expected sender protocol settings or command request '
1099 'frame; got %d') % frame.typeid)
1105 'frame; got %d') % frame.typeid)
1100
1106
1101 def _onframeprotocolsettings(self, frame):
1107 def _onframeprotocolsettings(self, frame):
1102 assert self._state == 'protocol-settings-receiving'
1108 assert self._state == 'protocol-settings-receiving'
1103 assert self._protocolsettingsdecoder is not None
1109 assert self._protocolsettingsdecoder is not None
1104
1110
1105 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1111 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1106 self._state = 'errored'
1112 self._state = 'errored'
1107 return self._makeerrorresult(
1113 return self._makeerrorresult(
1108 _('expected sender protocol settings frame; got %d') %
1114 _('expected sender protocol settings frame; got %d') %
1109 frame.typeid)
1115 frame.typeid)
1110
1116
1111 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1117 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1112 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1118 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1113
1119
1114 if more and eos:
1120 if more and eos:
1115 self._state = 'errored'
1121 self._state = 'errored'
1116 return self._makeerrorresult(
1122 return self._makeerrorresult(
1117 _('sender protocol settings frame cannot have both '
1123 _('sender protocol settings frame cannot have both '
1118 'continuation and end of stream flags set'))
1124 'continuation and end of stream flags set'))
1119
1125
1120 if not more and not eos:
1126 if not more and not eos:
1121 self._state = 'errored'
1127 self._state = 'errored'
1122 return self._makeerrorresult(
1128 return self._makeerrorresult(
1123 _('sender protocol settings frame must have continuation or '
1129 _('sender protocol settings frame must have continuation or '
1124 'end of stream flag set'))
1130 'end of stream flag set'))
1125
1131
1126 # TODO establish limits for maximum amount of data that can be
1132 # TODO establish limits for maximum amount of data that can be
1127 # buffered.
1133 # buffered.
1128 try:
1134 try:
1129 self._protocolsettingsdecoder.decode(frame.payload)
1135 self._protocolsettingsdecoder.decode(frame.payload)
1130 except Exception as e:
1136 except Exception as e:
1131 self._state = 'errored'
1137 self._state = 'errored'
1132 return self._makeerrorresult(
1138 return self._makeerrorresult(
1133 _('error decoding CBOR from sender protocol settings frame: %s')
1139 _('error decoding CBOR from sender protocol settings frame: %s')
1134 % stringutil.forcebytestr(e))
1140 % stringutil.forcebytestr(e))
1135
1141
1136 if more:
1142 if more:
1137 return self._makewantframeresult()
1143 return self._makewantframeresult()
1138
1144
1139 assert eos
1145 assert eos
1140
1146
1141 decoded = self._protocolsettingsdecoder.getavailable()
1147 decoded = self._protocolsettingsdecoder.getavailable()
1142 self._protocolsettingsdecoder = None
1148 self._protocolsettingsdecoder = None
1143
1149
1144 if not decoded:
1150 if not decoded:
1145 self._state = 'errored'
1151 self._state = 'errored'
1146 return self._makeerrorresult(
1152 return self._makeerrorresult(
1147 _('sender protocol settings frame did not contain CBOR data'))
1153 _('sender protocol settings frame did not contain CBOR data'))
1148 elif len(decoded) > 1:
1154 elif len(decoded) > 1:
1149 self._state = 'errored'
1155 self._state = 'errored'
1150 return self._makeerrorresult(
1156 return self._makeerrorresult(
1151 _('sender protocol settings frame contained multiple CBOR '
1157 _('sender protocol settings frame contained multiple CBOR '
1152 'values'))
1158 'values'))
1153
1159
1154 d = decoded[0]
1160 d = decoded[0]
1155
1161
1156 if b'contentencodings' in d:
1162 if b'contentencodings' in d:
1157 self._sendersettings['contentencodings'] = d[b'contentencodings']
1163 self._sendersettings['contentencodings'] = d[b'contentencodings']
1158
1164
1159 self._state = 'idle'
1165 self._state = 'idle'
1160
1166
1161 return self._makewantframeresult()
1167 return self._makewantframeresult()
1162
1168
1163 def _onframeidle(self, frame):
1169 def _onframeidle(self, frame):
1164 # The only frame type that should be received in this state is a
1170 # The only frame type that should be received in this state is a
1165 # command request.
1171 # command request.
1166 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1172 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1167 self._state = 'errored'
1173 self._state = 'errored'
1168 return self._makeerrorresult(
1174 return self._makeerrorresult(
1169 _('expected command request frame; got %d') % frame.typeid)
1175 _('expected command request frame; got %d') % frame.typeid)
1170
1176
1171 res = self._validatecommandrequestframe(frame)
1177 res = self._validatecommandrequestframe(frame)
1172 if res:
1178 if res:
1173 return res
1179 return res
1174
1180
1175 if frame.requestid in self._receivingcommands:
1181 if frame.requestid in self._receivingcommands:
1176 self._state = 'errored'
1182 self._state = 'errored'
1177 return self._makeerrorresult(
1183 return self._makeerrorresult(
1178 _('request with ID %d already received') % frame.requestid)
1184 _('request with ID %d already received') % frame.requestid)
1179
1185
1180 if frame.requestid in self._activecommands:
1186 if frame.requestid in self._activecommands:
1181 self._state = 'errored'
1187 self._state = 'errored'
1182 return self._makeerrorresult(
1188 return self._makeerrorresult(
1183 _('request with ID %d is already active') % frame.requestid)
1189 _('request with ID %d is already active') % frame.requestid)
1184
1190
1185 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1191 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1186 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1192 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1187 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1193 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1188
1194
1189 if not new:
1195 if not new:
1190 self._state = 'errored'
1196 self._state = 'errored'
1191 return self._makeerrorresult(
1197 return self._makeerrorresult(
1192 _('received command request frame without new flag set'))
1198 _('received command request frame without new flag set'))
1193
1199
1194 payload = util.bytesio()
1200 payload = util.bytesio()
1195 payload.write(frame.payload)
1201 payload.write(frame.payload)
1196
1202
1197 self._receivingcommands[frame.requestid] = {
1203 self._receivingcommands[frame.requestid] = {
1198 'payload': payload,
1204 'payload': payload,
1199 'data': None,
1205 'data': None,
1200 'requestdone': not moreframes,
1206 'requestdone': not moreframes,
1201 'expectingdata': bool(expectingdata),
1207 'expectingdata': bool(expectingdata),
1202 }
1208 }
1203
1209
1204 # This is the final frame for this request. Dispatch it.
1210 # This is the final frame for this request. Dispatch it.
1205 if not moreframes and not expectingdata:
1211 if not moreframes and not expectingdata:
1206 return self._makeruncommandresult(frame.requestid)
1212 return self._makeruncommandresult(frame.requestid)
1207
1213
1208 assert moreframes or expectingdata
1214 assert moreframes or expectingdata
1209 self._state = 'command-receiving'
1215 self._state = 'command-receiving'
1210 return self._makewantframeresult()
1216 return self._makewantframeresult()
1211
1217
1212 def _onframecommandreceiving(self, frame):
1218 def _onframecommandreceiving(self, frame):
1213 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1219 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1214 # Process new command requests as such.
1220 # Process new command requests as such.
1215 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1221 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1216 return self._onframeidle(frame)
1222 return self._onframeidle(frame)
1217
1223
1218 res = self._validatecommandrequestframe(frame)
1224 res = self._validatecommandrequestframe(frame)
1219 if res:
1225 if res:
1220 return res
1226 return res
1221
1227
1222 # All other frames should be related to a command that is currently
1228 # All other frames should be related to a command that is currently
1223 # receiving but is not active.
1229 # receiving but is not active.
1224 if frame.requestid in self._activecommands:
1230 if frame.requestid in self._activecommands:
1225 self._state = 'errored'
1231 self._state = 'errored'
1226 return self._makeerrorresult(
1232 return self._makeerrorresult(
1227 _('received frame for request that is still active: %d') %
1233 _('received frame for request that is still active: %d') %
1228 frame.requestid)
1234 frame.requestid)
1229
1235
1230 if frame.requestid not in self._receivingcommands:
1236 if frame.requestid not in self._receivingcommands:
1231 self._state = 'errored'
1237 self._state = 'errored'
1232 return self._makeerrorresult(
1238 return self._makeerrorresult(
1233 _('received frame for request that is not receiving: %d') %
1239 _('received frame for request that is not receiving: %d') %
1234 frame.requestid)
1240 frame.requestid)
1235
1241
1236 entry = self._receivingcommands[frame.requestid]
1242 entry = self._receivingcommands[frame.requestid]
1237
1243
1238 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1244 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1239 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1245 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1240 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1246 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1241
1247
1242 if entry['requestdone']:
1248 if entry['requestdone']:
1243 self._state = 'errored'
1249 self._state = 'errored'
1244 return self._makeerrorresult(
1250 return self._makeerrorresult(
1245 _('received command request frame when request frames '
1251 _('received command request frame when request frames '
1246 'were supposedly done'))
1252 'were supposedly done'))
1247
1253
1248 if expectingdata != entry['expectingdata']:
1254 if expectingdata != entry['expectingdata']:
1249 self._state = 'errored'
1255 self._state = 'errored'
1250 return self._makeerrorresult(
1256 return self._makeerrorresult(
1251 _('mismatch between expect data flag and previous frame'))
1257 _('mismatch between expect data flag and previous frame'))
1252
1258
1253 entry['payload'].write(frame.payload)
1259 entry['payload'].write(frame.payload)
1254
1260
1255 if not moreframes:
1261 if not moreframes:
1256 entry['requestdone'] = True
1262 entry['requestdone'] = True
1257
1263
1258 if not moreframes and not expectingdata:
1264 if not moreframes and not expectingdata:
1259 return self._makeruncommandresult(frame.requestid)
1265 return self._makeruncommandresult(frame.requestid)
1260
1266
1261 return self._makewantframeresult()
1267 return self._makewantframeresult()
1262
1268
1263 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1269 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1264 if not entry['expectingdata']:
1270 if not entry['expectingdata']:
1265 self._state = 'errored'
1271 self._state = 'errored'
1266 return self._makeerrorresult(_(
1272 return self._makeerrorresult(_(
1267 'received command data frame for request that is not '
1273 'received command data frame for request that is not '
1268 'expecting data: %d') % frame.requestid)
1274 'expecting data: %d') % frame.requestid)
1269
1275
1270 if entry['data'] is None:
1276 if entry['data'] is None:
1271 entry['data'] = util.bytesio()
1277 entry['data'] = util.bytesio()
1272
1278
1273 return self._handlecommanddataframe(frame, entry)
1279 return self._handlecommanddataframe(frame, entry)
1274 else:
1280 else:
1275 self._state = 'errored'
1281 self._state = 'errored'
1276 return self._makeerrorresult(_(
1282 return self._makeerrorresult(_(
1277 'received unexpected frame type: %d') % frame.typeid)
1283 'received unexpected frame type: %d') % frame.typeid)
1278
1284
1279 def _handlecommanddataframe(self, frame, entry):
1285 def _handlecommanddataframe(self, frame, entry):
1280 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1286 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1281
1287
1282 # TODO support streaming data instead of buffering it.
1288 # TODO support streaming data instead of buffering it.
1283 entry['data'].write(frame.payload)
1289 entry['data'].write(frame.payload)
1284
1290
1285 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1291 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1286 return self._makewantframeresult()
1292 return self._makewantframeresult()
1287 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1293 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1288 entry['data'].seek(0)
1294 entry['data'].seek(0)
1289 return self._makeruncommandresult(frame.requestid)
1295 return self._makeruncommandresult(frame.requestid)
1290 else:
1296 else:
1291 self._state = 'errored'
1297 self._state = 'errored'
1292 return self._makeerrorresult(_('command data frame without '
1298 return self._makeerrorresult(_('command data frame without '
1293 'flags'))
1299 'flags'))
1294
1300
1295 def _onframeerrored(self, frame):
1301 def _onframeerrored(self, frame):
1296 return self._makeerrorresult(_('server already errored'))
1302 return self._makeerrorresult(_('server already errored'))
1297
1303
1298 class commandrequest(object):
1304 class commandrequest(object):
1299 """Represents a request to run a command."""
1305 """Represents a request to run a command."""
1300
1306
1301 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1307 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1302 self.requestid = requestid
1308 self.requestid = requestid
1303 self.name = name
1309 self.name = name
1304 self.args = args
1310 self.args = args
1305 self.datafh = datafh
1311 self.datafh = datafh
1306 self.redirect = redirect
1312 self.redirect = redirect
1307 self.state = 'pending'
1313 self.state = 'pending'
1308
1314
1309 class clientreactor(object):
1315 class clientreactor(object):
1310 """Holds state of a client issuing frame-based protocol requests.
1316 """Holds state of a client issuing frame-based protocol requests.
1311
1317
1312 This is like ``serverreactor`` but for client-side state.
1318 This is like ``serverreactor`` but for client-side state.
1313
1319
1314 Each instance is bound to the lifetime of a connection. For persistent
1320 Each instance is bound to the lifetime of a connection. For persistent
1315 connection transports using e.g. TCP sockets and speaking the raw
1321 connection transports using e.g. TCP sockets and speaking the raw
1316 framing protocol, there will be a single instance for the lifetime of
1322 framing protocol, there will be a single instance for the lifetime of
1317 the TCP socket. For transports where there are multiple discrete
1323 the TCP socket. For transports where there are multiple discrete
1318 interactions (say tunneled within in HTTP request), there will be a
1324 interactions (say tunneled within in HTTP request), there will be a
1319 separate instance for each distinct interaction.
1325 separate instance for each distinct interaction.
1320
1326
1321 Consumers are expected to tell instances when events occur by calling
1327 Consumers are expected to tell instances when events occur by calling
1322 various methods. These methods return a 2-tuple describing any follow-up
1328 various methods. These methods return a 2-tuple describing any follow-up
1323 action(s) to take. The first element is the name of an action to
1329 action(s) to take. The first element is the name of an action to
1324 perform. The second is a data structure (usually a dict) specific to
1330 perform. The second is a data structure (usually a dict) specific to
1325 that action that contains more information. e.g. if the reactor wants
1331 that action that contains more information. e.g. if the reactor wants
1326 to send frames to the server, the data structure will contain a reference
1332 to send frames to the server, the data structure will contain a reference
1327 to those frames.
1333 to those frames.
1328
1334
1329 Valid actions that consumers can be instructed to take are:
1335 Valid actions that consumers can be instructed to take are:
1330
1336
1331 noop
1337 noop
1332 Indicates no additional action is required.
1338 Indicates no additional action is required.
1333
1339
1334 sendframes
1340 sendframes
1335 Indicates that frames should be sent to the server. The ``framegen``
1341 Indicates that frames should be sent to the server. The ``framegen``
1336 key contains a generator of frames that should be sent. The reactor
1342 key contains a generator of frames that should be sent. The reactor
1337 assumes that all frames in this generator are sent to the server.
1343 assumes that all frames in this generator are sent to the server.
1338
1344
1339 error
1345 error
1340 Indicates that an error occurred. The ``message`` key contains an
1346 Indicates that an error occurred. The ``message`` key contains an
1341 error message describing the failure.
1347 error message describing the failure.
1342
1348
1343 responsedata
1349 responsedata
1344 Indicates a response to a previously-issued command was received.
1350 Indicates a response to a previously-issued command was received.
1345
1351
1346 The ``request`` key contains the ``commandrequest`` instance that
1352 The ``request`` key contains the ``commandrequest`` instance that
1347 represents the request this data is for.
1353 represents the request this data is for.
1348
1354
1349 The ``data`` key contains the decoded data from the server.
1355 The ``data`` key contains the decoded data from the server.
1350
1356
1351 ``expectmore`` and ``eos`` evaluate to True when more response data
1357 ``expectmore`` and ``eos`` evaluate to True when more response data
1352 is expected to follow or we're at the end of the response stream,
1358 is expected to follow or we're at the end of the response stream,
1353 respectively.
1359 respectively.
1354 """
1360 """
1355 def __init__(self, ui, hasmultiplesend=False, buffersends=True):
1361 def __init__(self, ui, hasmultiplesend=False, buffersends=True):
1356 """Create a new instance.
1362 """Create a new instance.
1357
1363
1358 ``hasmultiplesend`` indicates whether multiple sends are supported
1364 ``hasmultiplesend`` indicates whether multiple sends are supported
1359 by the transport. When True, it is possible to send commands immediately
1365 by the transport. When True, it is possible to send commands immediately
1360 instead of buffering until the caller signals an intent to finish a
1366 instead of buffering until the caller signals an intent to finish a
1361 send operation.
1367 send operation.
1362
1368
1363 ``buffercommands`` indicates whether sends should be buffered until the
1369 ``buffercommands`` indicates whether sends should be buffered until the
1364 last request has been issued.
1370 last request has been issued.
1365 """
1371 """
1366 self._ui = ui
1372 self._ui = ui
1367 self._hasmultiplesend = hasmultiplesend
1373 self._hasmultiplesend = hasmultiplesend
1368 self._buffersends = buffersends
1374 self._buffersends = buffersends
1369
1375
1370 self._canissuecommands = True
1376 self._canissuecommands = True
1371 self._cansend = True
1377 self._cansend = True
1372
1378
1373 self._nextrequestid = 1
1379 self._nextrequestid = 1
1374 # We only support a single outgoing stream for now.
1380 # We only support a single outgoing stream for now.
1375 self._outgoingstream = stream(1)
1381 self._outgoingstream = outputstream(1)
1376 self._pendingrequests = collections.deque()
1382 self._pendingrequests = collections.deque()
1377 self._activerequests = {}
1383 self._activerequests = {}
1378 self._incomingstreams = {}
1384 self._incomingstreams = {}
1379 self._streamsettingsdecoders = {}
1385 self._streamsettingsdecoders = {}
1380
1386
1381 def callcommand(self, name, args, datafh=None, redirect=None):
1387 def callcommand(self, name, args, datafh=None, redirect=None):
1382 """Request that a command be executed.
1388 """Request that a command be executed.
1383
1389
1384 Receives the command name, a dict of arguments to pass to the command,
1390 Receives the command name, a dict of arguments to pass to the command,
1385 and an optional file object containing the raw data for the command.
1391 and an optional file object containing the raw data for the command.
1386
1392
1387 Returns a 3-tuple of (request, action, action data).
1393 Returns a 3-tuple of (request, action, action data).
1388 """
1394 """
1389 if not self._canissuecommands:
1395 if not self._canissuecommands:
1390 raise error.ProgrammingError('cannot issue new commands')
1396 raise error.ProgrammingError('cannot issue new commands')
1391
1397
1392 requestid = self._nextrequestid
1398 requestid = self._nextrequestid
1393 self._nextrequestid += 2
1399 self._nextrequestid += 2
1394
1400
1395 request = commandrequest(requestid, name, args, datafh=datafh,
1401 request = commandrequest(requestid, name, args, datafh=datafh,
1396 redirect=redirect)
1402 redirect=redirect)
1397
1403
1398 if self._buffersends:
1404 if self._buffersends:
1399 self._pendingrequests.append(request)
1405 self._pendingrequests.append(request)
1400 return request, 'noop', {}
1406 return request, 'noop', {}
1401 else:
1407 else:
1402 if not self._cansend:
1408 if not self._cansend:
1403 raise error.ProgrammingError('sends cannot be performed on '
1409 raise error.ProgrammingError('sends cannot be performed on '
1404 'this instance')
1410 'this instance')
1405
1411
1406 if not self._hasmultiplesend:
1412 if not self._hasmultiplesend:
1407 self._cansend = False
1413 self._cansend = False
1408 self._canissuecommands = False
1414 self._canissuecommands = False
1409
1415
1410 return request, 'sendframes', {
1416 return request, 'sendframes', {
1411 'framegen': self._makecommandframes(request),
1417 'framegen': self._makecommandframes(request),
1412 }
1418 }
1413
1419
1414 def flushcommands(self):
1420 def flushcommands(self):
1415 """Request that all queued commands be sent.
1421 """Request that all queued commands be sent.
1416
1422
1417 If any commands are buffered, this will instruct the caller to send
1423 If any commands are buffered, this will instruct the caller to send
1418 them over the wire. If no commands are buffered it instructs the client
1424 them over the wire. If no commands are buffered it instructs the client
1419 to no-op.
1425 to no-op.
1420
1426
1421 If instances aren't configured for multiple sends, no new command
1427 If instances aren't configured for multiple sends, no new command
1422 requests are allowed after this is called.
1428 requests are allowed after this is called.
1423 """
1429 """
1424 if not self._pendingrequests:
1430 if not self._pendingrequests:
1425 return 'noop', {}
1431 return 'noop', {}
1426
1432
1427 if not self._cansend:
1433 if not self._cansend:
1428 raise error.ProgrammingError('sends cannot be performed on this '
1434 raise error.ProgrammingError('sends cannot be performed on this '
1429 'instance')
1435 'instance')
1430
1436
1431 # If the instance only allows sending once, mark that we have fired
1437 # If the instance only allows sending once, mark that we have fired
1432 # our one shot.
1438 # our one shot.
1433 if not self._hasmultiplesend:
1439 if not self._hasmultiplesend:
1434 self._canissuecommands = False
1440 self._canissuecommands = False
1435 self._cansend = False
1441 self._cansend = False
1436
1442
1437 def makeframes():
1443 def makeframes():
1438 while self._pendingrequests:
1444 while self._pendingrequests:
1439 request = self._pendingrequests.popleft()
1445 request = self._pendingrequests.popleft()
1440 for frame in self._makecommandframes(request):
1446 for frame in self._makecommandframes(request):
1441 yield frame
1447 yield frame
1442
1448
1443 return 'sendframes', {
1449 return 'sendframes', {
1444 'framegen': makeframes(),
1450 'framegen': makeframes(),
1445 }
1451 }
1446
1452
1447 def _makecommandframes(self, request):
1453 def _makecommandframes(self, request):
1448 """Emit frames to issue a command request.
1454 """Emit frames to issue a command request.
1449
1455
1450 As a side-effect, update request accounting to reflect its changed
1456 As a side-effect, update request accounting to reflect its changed
1451 state.
1457 state.
1452 """
1458 """
1453 self._activerequests[request.requestid] = request
1459 self._activerequests[request.requestid] = request
1454 request.state = 'sending'
1460 request.state = 'sending'
1455
1461
1456 res = createcommandframes(self._outgoingstream,
1462 res = createcommandframes(self._outgoingstream,
1457 request.requestid,
1463 request.requestid,
1458 request.name,
1464 request.name,
1459 request.args,
1465 request.args,
1460 datafh=request.datafh,
1466 datafh=request.datafh,
1461 redirect=request.redirect)
1467 redirect=request.redirect)
1462
1468
1463 for frame in res:
1469 for frame in res:
1464 yield frame
1470 yield frame
1465
1471
1466 request.state = 'sent'
1472 request.state = 'sent'
1467
1473
1468 def onframerecv(self, frame):
1474 def onframerecv(self, frame):
1469 """Process a frame that has been received off the wire.
1475 """Process a frame that has been received off the wire.
1470
1476
1471 Returns a 2-tuple of (action, meta) describing further action the
1477 Returns a 2-tuple of (action, meta) describing further action the
1472 caller needs to take as a result of receiving this frame.
1478 caller needs to take as a result of receiving this frame.
1473 """
1479 """
1474 if frame.streamid % 2:
1480 if frame.streamid % 2:
1475 return 'error', {
1481 return 'error', {
1476 'message': (
1482 'message': (
1477 _('received frame with odd numbered stream ID: %d') %
1483 _('received frame with odd numbered stream ID: %d') %
1478 frame.streamid),
1484 frame.streamid),
1479 }
1485 }
1480
1486
1481 if frame.streamid not in self._incomingstreams:
1487 if frame.streamid not in self._incomingstreams:
1482 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1488 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1483 return 'error', {
1489 return 'error', {
1484 'message': _('received frame on unknown stream '
1490 'message': _('received frame on unknown stream '
1485 'without beginning of stream flag set'),
1491 'without beginning of stream flag set'),
1486 }
1492 }
1487
1493
1488 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1494 self._incomingstreams[frame.streamid] = inputstream(
1495 frame.streamid)
1489
1496
1490 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1497 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1491 raise error.ProgrammingError('support for decoding stream '
1498 raise error.ProgrammingError('support for decoding stream '
1492 'payloads not yet implemneted')
1499 'payloads not yet implemneted')
1493
1500
1494 if frame.streamflags & STREAM_FLAG_END_STREAM:
1501 if frame.streamflags & STREAM_FLAG_END_STREAM:
1495 del self._incomingstreams[frame.streamid]
1502 del self._incomingstreams[frame.streamid]
1496
1503
1497 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1504 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1498 return self._onstreamsettingsframe(frame)
1505 return self._onstreamsettingsframe(frame)
1499
1506
1500 if frame.requestid not in self._activerequests:
1507 if frame.requestid not in self._activerequests:
1501 return 'error', {
1508 return 'error', {
1502 'message': (_('received frame for inactive request ID: %d') %
1509 'message': (_('received frame for inactive request ID: %d') %
1503 frame.requestid),
1510 frame.requestid),
1504 }
1511 }
1505
1512
1506 request = self._activerequests[frame.requestid]
1513 request = self._activerequests[frame.requestid]
1507 request.state = 'receiving'
1514 request.state = 'receiving'
1508
1515
1509 handlers = {
1516 handlers = {
1510 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1517 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1511 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1518 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1512 }
1519 }
1513
1520
1514 meth = handlers.get(frame.typeid)
1521 meth = handlers.get(frame.typeid)
1515 if not meth:
1522 if not meth:
1516 raise error.ProgrammingError('unhandled frame type: %d' %
1523 raise error.ProgrammingError('unhandled frame type: %d' %
1517 frame.typeid)
1524 frame.typeid)
1518
1525
1519 return meth(request, frame)
1526 return meth(request, frame)
1520
1527
1521 def _onstreamsettingsframe(self, frame):
1528 def _onstreamsettingsframe(self, frame):
1522 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1529 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1523
1530
1524 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1531 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1525 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1532 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1526
1533
1527 if more and eos:
1534 if more and eos:
1528 return 'error', {
1535 return 'error', {
1529 'message': (_('stream encoding settings frame cannot have both '
1536 'message': (_('stream encoding settings frame cannot have both '
1530 'continuation and end of stream flags set')),
1537 'continuation and end of stream flags set')),
1531 }
1538 }
1532
1539
1533 if not more and not eos:
1540 if not more and not eos:
1534 return 'error', {
1541 return 'error', {
1535 'message': _('stream encoding settings frame must have '
1542 'message': _('stream encoding settings frame must have '
1536 'continuation or end of stream flag set'),
1543 'continuation or end of stream flag set'),
1537 }
1544 }
1538
1545
1539 if frame.streamid not in self._streamsettingsdecoders:
1546 if frame.streamid not in self._streamsettingsdecoders:
1540 decoder = cborutil.bufferingdecoder()
1547 decoder = cborutil.bufferingdecoder()
1541 self._streamsettingsdecoders[frame.streamid] = decoder
1548 self._streamsettingsdecoders[frame.streamid] = decoder
1542
1549
1543 decoder = self._streamsettingsdecoders[frame.streamid]
1550 decoder = self._streamsettingsdecoders[frame.streamid]
1544
1551
1545 try:
1552 try:
1546 decoder.decode(frame.payload)
1553 decoder.decode(frame.payload)
1547 except Exception as e:
1554 except Exception as e:
1548 return 'error', {
1555 return 'error', {
1549 'message': (_('error decoding CBOR from stream encoding '
1556 'message': (_('error decoding CBOR from stream encoding '
1550 'settings frame: %s') %
1557 'settings frame: %s') %
1551 stringutil.forcebytestr(e)),
1558 stringutil.forcebytestr(e)),
1552 }
1559 }
1553
1560
1554 if more:
1561 if more:
1555 return 'noop', {}
1562 return 'noop', {}
1556
1563
1557 assert eos
1564 assert eos
1558
1565
1559 decoded = decoder.getavailable()
1566 decoded = decoder.getavailable()
1560 del self._streamsettingsdecoders[frame.streamid]
1567 del self._streamsettingsdecoders[frame.streamid]
1561
1568
1562 if not decoded:
1569 if not decoded:
1563 return 'error', {
1570 return 'error', {
1564 'message': _('stream encoding settings frame did not contain '
1571 'message': _('stream encoding settings frame did not contain '
1565 'CBOR data'),
1572 'CBOR data'),
1566 }
1573 }
1567
1574
1568 try:
1575 try:
1569 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1576 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1570 decoded[1:])
1577 decoded[1:])
1571 except Exception as e:
1578 except Exception as e:
1572 return 'error', {
1579 return 'error', {
1573 'message': (_('error setting stream decoder: %s') %
1580 'message': (_('error setting stream decoder: %s') %
1574 stringutil.forcebytestr(e)),
1581 stringutil.forcebytestr(e)),
1575 }
1582 }
1576
1583
1577 return 'noop', {}
1584 return 'noop', {}
1578
1585
1579 def _oncommandresponseframe(self, request, frame):
1586 def _oncommandresponseframe(self, request, frame):
1580 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1587 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1581 request.state = 'received'
1588 request.state = 'received'
1582 del self._activerequests[request.requestid]
1589 del self._activerequests[request.requestid]
1583
1590
1584 return 'responsedata', {
1591 return 'responsedata', {
1585 'request': request,
1592 'request': request,
1586 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1593 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1587 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1594 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1588 'data': frame.payload,
1595 'data': frame.payload,
1589 }
1596 }
1590
1597
1591 def _onerrorresponseframe(self, request, frame):
1598 def _onerrorresponseframe(self, request, frame):
1592 request.state = 'errored'
1599 request.state = 'errored'
1593 del self._activerequests[request.requestid]
1600 del self._activerequests[request.requestid]
1594
1601
1595 # The payload should be a CBOR map.
1602 # The payload should be a CBOR map.
1596 m = cborutil.decodeall(frame.payload)[0]
1603 m = cborutil.decodeall(frame.payload)[0]
1597
1604
1598 return 'error', {
1605 return 'error', {
1599 'request': request,
1606 'request': request,
1600 'type': m['type'],
1607 'type': m['type'],
1601 'message': m['message'],
1608 'message': m['message'],
1602 }
1609 }
General Comments 0
You need to be logged in to leave comments. Login now