##// END OF EJS Templates
typing: suppress bogus pytype errors in `mercurial/wireprotoframing.py`...
Matt Harbison -
r53021:0c260e71 default
parent child Browse files
Show More
@@ -1,2094 +1,2096
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import annotations
12 from __future__ import annotations
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16 import typing
16 import typing
17
17
18 from .i18n import _
18 from .i18n import _
19 from .thirdparty import attr
19 from .thirdparty import attr
20
20
21 # Force pytype to use the non-vendored package
21 # Force pytype to use the non-vendored package
22 if typing.TYPE_CHECKING:
22 if typing.TYPE_CHECKING:
23 # noinspection PyPackageRequirements
23 # noinspection PyPackageRequirements
24 import attr
24 import attr
25
25
26 from . import (
26 from . import (
27 encoding,
27 encoding,
28 error,
28 error,
29 pycompat,
29 pycompat,
30 util,
30 util,
31 wireprototypes,
31 wireprototypes,
32 )
32 )
33 from .utils import (
33 from .utils import (
34 cborutil,
34 cborutil,
35 stringutil,
35 stringutil,
36 )
36 )
37
37
38 FRAME_HEADER_SIZE = 8
38 FRAME_HEADER_SIZE = 8
39 DEFAULT_MAX_FRAME_SIZE = 32768
39 DEFAULT_MAX_FRAME_SIZE = 32768
40
40
41 STREAM_FLAG_BEGIN_STREAM = 0x01
41 STREAM_FLAG_BEGIN_STREAM = 0x01
42 STREAM_FLAG_END_STREAM = 0x02
42 STREAM_FLAG_END_STREAM = 0x02
43 STREAM_FLAG_ENCODING_APPLIED = 0x04
43 STREAM_FLAG_ENCODING_APPLIED = 0x04
44
44
45 STREAM_FLAGS = {
45 STREAM_FLAGS = {
46 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
46 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
47 b'stream-end': STREAM_FLAG_END_STREAM,
47 b'stream-end': STREAM_FLAG_END_STREAM,
48 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
48 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
49 }
49 }
50
50
51 FRAME_TYPE_COMMAND_REQUEST = 0x01
51 FRAME_TYPE_COMMAND_REQUEST = 0x01
52 FRAME_TYPE_COMMAND_DATA = 0x02
52 FRAME_TYPE_COMMAND_DATA = 0x02
53 FRAME_TYPE_COMMAND_RESPONSE = 0x03
53 FRAME_TYPE_COMMAND_RESPONSE = 0x03
54 FRAME_TYPE_ERROR_RESPONSE = 0x05
54 FRAME_TYPE_ERROR_RESPONSE = 0x05
55 FRAME_TYPE_TEXT_OUTPUT = 0x06
55 FRAME_TYPE_TEXT_OUTPUT = 0x06
56 FRAME_TYPE_PROGRESS = 0x07
56 FRAME_TYPE_PROGRESS = 0x07
57 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
57 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
58 FRAME_TYPE_STREAM_SETTINGS = 0x09
58 FRAME_TYPE_STREAM_SETTINGS = 0x09
59
59
60 FRAME_TYPES = {
60 FRAME_TYPES = {
61 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
61 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
62 b'command-data': FRAME_TYPE_COMMAND_DATA,
62 b'command-data': FRAME_TYPE_COMMAND_DATA,
63 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
63 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
64 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
64 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
65 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
65 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
66 b'progress': FRAME_TYPE_PROGRESS,
66 b'progress': FRAME_TYPE_PROGRESS,
67 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
67 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
68 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
68 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
69 }
69 }
70
70
71 FLAG_COMMAND_REQUEST_NEW = 0x01
71 FLAG_COMMAND_REQUEST_NEW = 0x01
72 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
72 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
73 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
73 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
74 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
74 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
75
75
76 FLAGS_COMMAND_REQUEST = {
76 FLAGS_COMMAND_REQUEST = {
77 b'new': FLAG_COMMAND_REQUEST_NEW,
77 b'new': FLAG_COMMAND_REQUEST_NEW,
78 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
78 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
79 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
79 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
80 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
80 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
81 }
81 }
82
82
83 FLAG_COMMAND_DATA_CONTINUATION = 0x01
83 FLAG_COMMAND_DATA_CONTINUATION = 0x01
84 FLAG_COMMAND_DATA_EOS = 0x02
84 FLAG_COMMAND_DATA_EOS = 0x02
85
85
86 FLAGS_COMMAND_DATA = {
86 FLAGS_COMMAND_DATA = {
87 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
87 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
88 b'eos': FLAG_COMMAND_DATA_EOS,
88 b'eos': FLAG_COMMAND_DATA_EOS,
89 }
89 }
90
90
91 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
91 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
92 FLAG_COMMAND_RESPONSE_EOS = 0x02
92 FLAG_COMMAND_RESPONSE_EOS = 0x02
93
93
94 FLAGS_COMMAND_RESPONSE = {
94 FLAGS_COMMAND_RESPONSE = {
95 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
95 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
96 b'eos': FLAG_COMMAND_RESPONSE_EOS,
96 b'eos': FLAG_COMMAND_RESPONSE_EOS,
97 }
97 }
98
98
99 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
99 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
100 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
100 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
101
101
102 FLAGS_SENDER_PROTOCOL_SETTINGS = {
102 FLAGS_SENDER_PROTOCOL_SETTINGS = {
103 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
103 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
104 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
104 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
105 }
105 }
106
106
107 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
107 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
108 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
108 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
109
109
110 FLAGS_STREAM_ENCODING_SETTINGS = {
110 FLAGS_STREAM_ENCODING_SETTINGS = {
111 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
111 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
112 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
112 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
113 }
113 }
114
114
115 # Maps frame types to their available flags.
115 # Maps frame types to their available flags.
116 FRAME_TYPE_FLAGS = {
116 FRAME_TYPE_FLAGS = {
117 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
117 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
118 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
118 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
119 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
119 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
120 FRAME_TYPE_ERROR_RESPONSE: {},
120 FRAME_TYPE_ERROR_RESPONSE: {},
121 FRAME_TYPE_TEXT_OUTPUT: {},
121 FRAME_TYPE_TEXT_OUTPUT: {},
122 FRAME_TYPE_PROGRESS: {},
122 FRAME_TYPE_PROGRESS: {},
123 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
123 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
124 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
124 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
125 }
125 }
126
126
127 ARGUMENT_RECORD_HEADER = struct.Struct('<HH')
127 ARGUMENT_RECORD_HEADER = struct.Struct('<HH')
128
128
129
129
130 def humanflags(mapping, value):
130 def humanflags(mapping, value):
131 """Convert a numeric flags value to a human value, using a mapping table."""
131 """Convert a numeric flags value to a human value, using a mapping table."""
132 namemap = {v: k for k, v in mapping.items()}
132 namemap = {v: k for k, v in mapping.items()}
133 flags = []
133 flags = []
134 val = 1
134 val = 1
135 while value >= val:
135 while value >= val:
136 if value & val:
136 if value & val:
137 flags.append(namemap.get(val, b'<unknown 0x%02x>' % val))
137 flags.append(namemap.get(val, b'<unknown 0x%02x>' % val))
138 val <<= 1
138 val <<= 1
139
139
140 return b'|'.join(flags)
140 return b'|'.join(flags)
141
141
142
142
143 @attr.s(slots=True)
143 @attr.s(slots=True)
144 class frameheader:
144 class frameheader:
145 """Represents the data in a frame header."""
145 """Represents the data in a frame header."""
146
146
147 length = attr.ib()
147 length = attr.ib()
148 requestid = attr.ib()
148 requestid = attr.ib()
149 streamid = attr.ib()
149 streamid = attr.ib()
150 streamflags = attr.ib()
150 streamflags = attr.ib()
151 typeid = attr.ib()
151 typeid = attr.ib()
152 flags = attr.ib()
152 flags = attr.ib()
153
153
154
154
155 @attr.s(slots=True, repr=False)
155 @attr.s(slots=True, repr=False)
156 class frame:
156 class frame:
157 """Represents a parsed frame."""
157 """Represents a parsed frame."""
158
158
159 requestid = attr.ib()
159 requestid = attr.ib()
160 streamid = attr.ib()
160 streamid = attr.ib()
161 streamflags = attr.ib()
161 streamflags = attr.ib()
162 typeid = attr.ib()
162 typeid = attr.ib()
163 flags = attr.ib()
163 flags = attr.ib()
164 payload = attr.ib()
164 payload = attr.ib()
165
165
166 @encoding.strmethod
166 @encoding.strmethod
167 def __repr__(self):
167 def __repr__(self):
168 typename = b'<unknown 0x%02x>' % self.typeid
168 typename = b'<unknown 0x%02x>' % self.typeid
169 for name, value in FRAME_TYPES.items():
169 for name, value in FRAME_TYPES.items():
170 if value == self.typeid:
170 if value == self.typeid:
171 typename = name
171 typename = name
172 break
172 break
173
173
174 return (
174 return (
175 b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
175 b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
176 b'type=%s; flags=%s)'
176 b'type=%s; flags=%s)'
177 % (
177 % (
178 len(self.payload),
178 len(self.payload),
179 self.requestid,
179 self.requestid,
180 self.streamid,
180 self.streamid,
181 humanflags(STREAM_FLAGS, self.streamflags),
181 humanflags(STREAM_FLAGS, self.streamflags),
182 typename,
182 typename,
183 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags),
183 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags),
184 )
184 )
185 )
185 )
186
186
187
187
188 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
188 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
189 """Assemble a frame into a byte array."""
189 """Assemble a frame into a byte array."""
190 # TODO assert size of payload.
190 # TODO assert size of payload.
191 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
191 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
192
192
193 # 24 bits length
193 # 24 bits length
194 # 16 bits request id
194 # 16 bits request id
195 # 8 bits stream id
195 # 8 bits stream id
196 # 8 bits stream flags
196 # 8 bits stream flags
197 # 4 bits type
197 # 4 bits type
198 # 4 bits flags
198 # 4 bits flags
199
199
200 l = struct.pack('<I', len(payload))
200 l = struct.pack('<I', len(payload))
201 frame[0:3] = l[0:3]
201 frame[0:3] = l[0:3]
202 struct.pack_into('<HBB', frame, 3, requestid, streamid, streamflags)
202 struct.pack_into('<HBB', frame, 3, requestid, streamid, streamflags)
203 frame[7] = (typeid << 4) | flags
203 frame[7] = (typeid << 4) | flags
204 frame[8:] = payload
204 frame[8:] = payload
205
205
206 return frame
206 return frame
207
207
208
208
209 def makeframefromhumanstring(s):
209 def makeframefromhumanstring(s):
210 """Create a frame from a human readable string
210 """Create a frame from a human readable string
211
211
212 Strings have the form:
212 Strings have the form:
213
213
214 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
214 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
215
215
216 This can be used by user-facing applications and tests for creating
216 This can be used by user-facing applications and tests for creating
217 frames easily without having to type out a bunch of constants.
217 frames easily without having to type out a bunch of constants.
218
218
219 Request ID and stream IDs are integers.
219 Request ID and stream IDs are integers.
220
220
221 Stream flags, frame type, and flags can be specified by integer or
221 Stream flags, frame type, and flags can be specified by integer or
222 named constant.
222 named constant.
223
223
224 Flags can be delimited by `|` to bitwise OR them together.
224 Flags can be delimited by `|` to bitwise OR them together.
225
225
226 If the payload begins with ``cbor:``, the following string will be
226 If the payload begins with ``cbor:``, the following string will be
227 evaluated as Python literal and the resulting object will be fed into
227 evaluated as Python literal and the resulting object will be fed into
228 a CBOR encoder. Otherwise, the payload is interpreted as a Python
228 a CBOR encoder. Otherwise, the payload is interpreted as a Python
229 byte string literal.
229 byte string literal.
230 """
230 """
231 fields = s.split(b' ', 5)
231 fields = s.split(b' ', 5)
232 requestid, streamid, streamflags, frametype, frameflags, payload = fields
232 requestid, streamid, streamflags, frametype, frameflags, payload = fields
233
233
234 requestid = int(requestid)
234 requestid = int(requestid)
235 streamid = int(streamid)
235 streamid = int(streamid)
236
236
237 finalstreamflags = 0
237 finalstreamflags = 0
238 for flag in streamflags.split(b'|'):
238 for flag in streamflags.split(b'|'):
239 if flag in STREAM_FLAGS:
239 if flag in STREAM_FLAGS:
240 finalstreamflags |= STREAM_FLAGS[flag]
240 finalstreamflags |= STREAM_FLAGS[flag]
241 else:
241 else:
242 finalstreamflags |= int(flag)
242 finalstreamflags |= int(flag)
243
243
244 if frametype in FRAME_TYPES:
244 if frametype in FRAME_TYPES:
245 frametype = FRAME_TYPES[frametype]
245 frametype = FRAME_TYPES[frametype]
246 else:
246 else:
247 frametype = int(frametype)
247 frametype = int(frametype)
248
248
249 finalflags = 0
249 finalflags = 0
250 validflags = FRAME_TYPE_FLAGS[frametype]
250 validflags = FRAME_TYPE_FLAGS[frametype]
251 for flag in frameflags.split(b'|'):
251 for flag in frameflags.split(b'|'):
252 if flag in validflags:
252 if flag in validflags:
253 finalflags |= validflags[flag]
253 finalflags |= validflags[flag]
254 else:
254 else:
255 finalflags |= int(flag)
255 finalflags |= int(flag)
256
256
257 if payload.startswith(b'cbor:'):
257 if payload.startswith(b'cbor:'):
258 payload = b''.join(
258 payload = b''.join(
259 cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
259 cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
260 )
260 )
261
261
262 else:
262 else:
263 payload = stringutil.unescapestr(payload)
263 payload = stringutil.unescapestr(payload)
264
264
265 return makeframe(
265 return makeframe(
266 requestid=requestid,
266 requestid=requestid,
267 streamid=streamid,
267 streamid=streamid,
268 streamflags=finalstreamflags,
268 streamflags=finalstreamflags,
269 typeid=frametype,
269 typeid=frametype,
270 flags=finalflags,
270 flags=finalflags,
271 payload=payload,
271 payload=payload,
272 )
272 )
273
273
274
274
275 def parseheader(data):
275 def parseheader(data):
276 """Parse a unified framing protocol frame header from a buffer.
276 """Parse a unified framing protocol frame header from a buffer.
277
277
278 The header is expected to be in the buffer at offset 0 and the
278 The header is expected to be in the buffer at offset 0 and the
279 buffer is expected to be large enough to hold a full header.
279 buffer is expected to be large enough to hold a full header.
280 """
280 """
281 # 24 bits payload length (little endian)
281 # 24 bits payload length (little endian)
282 # 16 bits request ID
282 # 16 bits request ID
283 # 8 bits stream ID
283 # 8 bits stream ID
284 # 8 bits stream flags
284 # 8 bits stream flags
285 # 4 bits frame type
285 # 4 bits frame type
286 # 4 bits frame flags
286 # 4 bits frame flags
287 # ... payload
287 # ... payload
288 framelength = data[0] + 256 * data[1] + 16384 * data[2]
288 framelength = data[0] + 256 * data[1] + 16384 * data[2]
289 requestid, streamid, streamflags = struct.unpack_from('<HBB', data, 3)
289 requestid, streamid, streamflags = struct.unpack_from('<HBB', data, 3)
290 typeflags = data[7]
290 typeflags = data[7]
291
291
292 frametype = (typeflags & 0xF0) >> 4
292 frametype = (typeflags & 0xF0) >> 4
293 frameflags = typeflags & 0x0F
293 frameflags = typeflags & 0x0F
294
294
295 return frameheader(
295 return frameheader(
296 framelength, requestid, streamid, streamflags, frametype, frameflags
296 framelength, requestid, streamid, streamflags, frametype, frameflags
297 )
297 )
298
298
299
299
300 def readframe(fh):
300 def readframe(fh):
301 """Read a unified framing protocol frame from a file object.
301 """Read a unified framing protocol frame from a file object.
302
302
303 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
303 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
304 None if no frame is available. May raise if a malformed frame is
304 None if no frame is available. May raise if a malformed frame is
305 seen.
305 seen.
306 """
306 """
307 header = bytearray(FRAME_HEADER_SIZE)
307 header = bytearray(FRAME_HEADER_SIZE)
308
308
309 readcount = fh.readinto(header)
309 readcount = fh.readinto(header)
310
310
311 if readcount == 0:
311 if readcount == 0:
312 return None
312 return None
313
313
314 if readcount != FRAME_HEADER_SIZE:
314 if readcount != FRAME_HEADER_SIZE:
315 raise error.Abort(
315 raise error.Abort(
316 _(b'received incomplete frame: got %d bytes: %s')
316 _(b'received incomplete frame: got %d bytes: %s')
317 % (readcount, header)
317 % (readcount, header)
318 )
318 )
319
319
320 h = parseheader(header)
320 h = parseheader(header)
321
321
322 payload = fh.read(h.length)
322 payload = fh.read(h.length)
323 if len(payload) != h.length:
323 if len(payload) != h.length:
324 raise error.Abort(
324 raise error.Abort(
325 _(b'frame length error: expected %d; got %d')
325 _(b'frame length error: expected %d; got %d')
326 % (h.length, len(payload))
326 % (h.length, len(payload))
327 )
327 )
328
328
329 return frame(
329 return frame(
330 h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
330 h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
331 )
331 )
332
332
333
333
334 def createcommandframes(
334 def createcommandframes(
335 stream,
335 stream,
336 requestid,
336 requestid,
337 cmd,
337 cmd,
338 args,
338 args,
339 datafh=None,
339 datafh=None,
340 maxframesize=DEFAULT_MAX_FRAME_SIZE,
340 maxframesize=DEFAULT_MAX_FRAME_SIZE,
341 redirect=None,
341 redirect=None,
342 ):
342 ):
343 """Create frames necessary to transmit a request to run a command.
343 """Create frames necessary to transmit a request to run a command.
344
344
345 This is a generator of bytearrays. Each item represents a frame
345 This is a generator of bytearrays. Each item represents a frame
346 ready to be sent over the wire to a peer.
346 ready to be sent over the wire to a peer.
347 """
347 """
348 data = {b'name': cmd}
348 data = {b'name': cmd}
349 if args:
349 if args:
350 data[b'args'] = args
350 data[b'args'] = args
351
351
352 if redirect:
352 if redirect:
353 data[b'redirect'] = redirect
353 data[b'redirect'] = redirect
354
354
355 data = b''.join(cborutil.streamencode(data))
355 data = b''.join(cborutil.streamencode(data))
356
356
357 offset = 0
357 offset = 0
358
358
359 while True:
359 while True:
360 flags = 0
360 flags = 0
361
361
362 # Must set new or continuation flag.
362 # Must set new or continuation flag.
363 if not offset:
363 if not offset:
364 flags |= FLAG_COMMAND_REQUEST_NEW
364 flags |= FLAG_COMMAND_REQUEST_NEW
365 else:
365 else:
366 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
366 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
367
367
368 # Data frames is set on all frames.
368 # Data frames is set on all frames.
369 if datafh:
369 if datafh:
370 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
370 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
371
371
372 payload = data[offset : offset + maxframesize]
372 payload = data[offset : offset + maxframesize]
373 offset += len(payload)
373 offset += len(payload)
374
374
375 if len(payload) == maxframesize and offset < len(data):
375 if len(payload) == maxframesize and offset < len(data):
376 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
376 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
377
377
378 yield stream.makeframe(
378 yield stream.makeframe(
379 requestid=requestid,
379 requestid=requestid,
380 typeid=FRAME_TYPE_COMMAND_REQUEST,
380 typeid=FRAME_TYPE_COMMAND_REQUEST,
381 flags=flags,
381 flags=flags,
382 payload=payload,
382 payload=payload,
383 )
383 )
384
384
385 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
385 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
386 break
386 break
387
387
388 if datafh:
388 if datafh:
389 while True:
389 while True:
390 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
390 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
391
391
392 done = False
392 done = False
393 if len(data) == DEFAULT_MAX_FRAME_SIZE:
393 if len(data) == DEFAULT_MAX_FRAME_SIZE:
394 flags = FLAG_COMMAND_DATA_CONTINUATION
394 flags = FLAG_COMMAND_DATA_CONTINUATION
395 else:
395 else:
396 flags = FLAG_COMMAND_DATA_EOS
396 flags = FLAG_COMMAND_DATA_EOS
397 assert datafh.read(1) == b''
397 assert datafh.read(1) == b''
398 done = True
398 done = True
399
399
400 yield stream.makeframe(
400 yield stream.makeframe(
401 requestid=requestid,
401 requestid=requestid,
402 typeid=FRAME_TYPE_COMMAND_DATA,
402 typeid=FRAME_TYPE_COMMAND_DATA,
403 flags=flags,
403 flags=flags,
404 payload=data,
404 payload=data,
405 )
405 )
406
406
407 if done:
407 if done:
408 break
408 break
409
409
410
410
411 def createcommandresponseokframe(stream, requestid):
411 def createcommandresponseokframe(stream, requestid):
412 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
412 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
413
413
414 if stream.streamsettingssent:
414 if stream.streamsettingssent:
415 overall = stream.encode(overall)
415 overall = stream.encode(overall)
416 encoded = True
416 encoded = True
417
417
418 if not overall:
418 if not overall:
419 return None
419 return None
420 else:
420 else:
421 encoded = False
421 encoded = False
422
422
423 return stream.makeframe(
423 return stream.makeframe(
424 requestid=requestid,
424 requestid=requestid,
425 typeid=FRAME_TYPE_COMMAND_RESPONSE,
425 typeid=FRAME_TYPE_COMMAND_RESPONSE,
426 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
426 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
427 payload=overall,
427 payload=overall,
428 encoded=encoded,
428 encoded=encoded,
429 )
429 )
430
430
431
431
432 def createcommandresponseeosframes(
432 def createcommandresponseeosframes(
433 stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
433 stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
434 ):
434 ):
435 """Create an empty payload frame representing command end-of-stream."""
435 """Create an empty payload frame representing command end-of-stream."""
436 payload = stream.flush()
436 payload = stream.flush()
437
437
438 offset = 0
438 offset = 0
439 while True:
439 while True:
440 chunk = payload[offset : offset + maxframesize]
440 chunk = payload[offset : offset + maxframesize]
441 offset += len(chunk)
441 offset += len(chunk)
442
442
443 done = offset == len(payload)
443 done = offset == len(payload)
444
444
445 if done:
445 if done:
446 flags = FLAG_COMMAND_RESPONSE_EOS
446 flags = FLAG_COMMAND_RESPONSE_EOS
447 else:
447 else:
448 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
448 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
449
449
450 yield stream.makeframe(
450 yield stream.makeframe(
451 requestid=requestid,
451 requestid=requestid,
452 typeid=FRAME_TYPE_COMMAND_RESPONSE,
452 typeid=FRAME_TYPE_COMMAND_RESPONSE,
453 flags=flags,
453 flags=flags,
454 payload=chunk,
454 payload=chunk,
455 encoded=payload != b'',
455 encoded=payload != b'',
456 )
456 )
457
457
458 if done:
458 if done:
459 break
459 break
460
460
461
461
462 def createalternatelocationresponseframe(stream, requestid, location):
462 def createalternatelocationresponseframe(stream, requestid, location):
463 data = {
463 data = {
464 b'status': b'redirect',
464 b'status': b'redirect',
465 b'location': {
465 b'location': {
466 b'url': location.url,
466 b'url': location.url,
467 b'mediatype': location.mediatype,
467 b'mediatype': location.mediatype,
468 },
468 },
469 }
469 }
470
470
471 for a in (
471 for a in (
472 'size',
472 'size',
473 'fullhashes',
473 'fullhashes',
474 'fullhashseed',
474 'fullhashseed',
475 'serverdercerts',
475 'serverdercerts',
476 'servercadercerts',
476 'servercadercerts',
477 ):
477 ):
478 value = getattr(location, a)
478 value = getattr(location, a)
479 if value is not None:
479 if value is not None:
480 # pytype: disable=unsupported-operands
480 data[b'location'][pycompat.bytestr(a)] = value
481 data[b'location'][pycompat.bytestr(a)] = value
482 # pytype: enable=unsupported-operands
481
483
482 payload = b''.join(cborutil.streamencode(data))
484 payload = b''.join(cborutil.streamencode(data))
483
485
484 if stream.streamsettingssent:
486 if stream.streamsettingssent:
485 payload = stream.encode(payload)
487 payload = stream.encode(payload)
486 encoded = True
488 encoded = True
487 else:
489 else:
488 encoded = False
490 encoded = False
489
491
490 return stream.makeframe(
492 return stream.makeframe(
491 requestid=requestid,
493 requestid=requestid,
492 typeid=FRAME_TYPE_COMMAND_RESPONSE,
494 typeid=FRAME_TYPE_COMMAND_RESPONSE,
493 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
495 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
494 payload=payload,
496 payload=payload,
495 encoded=encoded,
497 encoded=encoded,
496 )
498 )
497
499
498
500
499 def createcommanderrorresponse(stream, requestid, message, args=None):
501 def createcommanderrorresponse(stream, requestid, message, args=None):
500 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
502 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
501 # formatting works consistently?
503 # formatting works consistently?
502 m = {
504 m = {
503 b'status': b'error',
505 b'status': b'error',
504 b'error': {
506 b'error': {
505 b'message': message,
507 b'message': message,
506 },
508 },
507 }
509 }
508
510
509 if args:
511 if args:
510 m[b'error'][b'args'] = args
512 m[b'error'][b'args'] = args # pytype: disable=unsupported-operands
511
513
512 overall = b''.join(cborutil.streamencode(m))
514 overall = b''.join(cborutil.streamencode(m))
513
515
514 yield stream.makeframe(
516 yield stream.makeframe(
515 requestid=requestid,
517 requestid=requestid,
516 typeid=FRAME_TYPE_COMMAND_RESPONSE,
518 typeid=FRAME_TYPE_COMMAND_RESPONSE,
517 flags=FLAG_COMMAND_RESPONSE_EOS,
519 flags=FLAG_COMMAND_RESPONSE_EOS,
518 payload=overall,
520 payload=overall,
519 )
521 )
520
522
521
523
522 def createerrorframe(stream, requestid, msg, errtype):
524 def createerrorframe(stream, requestid, msg, errtype):
523 # TODO properly handle frame size limits.
525 # TODO properly handle frame size limits.
524 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
526 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
525
527
526 payload = b''.join(
528 payload = b''.join(
527 cborutil.streamencode(
529 cborutil.streamencode(
528 {
530 {
529 b'type': errtype,
531 b'type': errtype,
530 b'message': [{b'msg': msg}],
532 b'message': [{b'msg': msg}],
531 }
533 }
532 )
534 )
533 )
535 )
534
536
535 yield stream.makeframe(
537 yield stream.makeframe(
536 requestid=requestid,
538 requestid=requestid,
537 typeid=FRAME_TYPE_ERROR_RESPONSE,
539 typeid=FRAME_TYPE_ERROR_RESPONSE,
538 flags=0,
540 flags=0,
539 payload=payload,
541 payload=payload,
540 )
542 )
541
543
542
544
543 def createtextoutputframe(
545 def createtextoutputframe(
544 stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
546 stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
545 ):
547 ):
546 """Create a text output frame to render text to people.
548 """Create a text output frame to render text to people.
547
549
548 ``atoms`` is a 3-tuple of (formatting string, args, labels).
550 ``atoms`` is a 3-tuple of (formatting string, args, labels).
549
551
550 The formatting string contains ``%s`` tokens to be replaced by the
552 The formatting string contains ``%s`` tokens to be replaced by the
551 corresponding indexed entry in ``args``. ``labels`` is an iterable of
553 corresponding indexed entry in ``args``. ``labels`` is an iterable of
552 formatters to be applied at rendering time. In terms of the ``ui``
554 formatters to be applied at rendering time. In terms of the ``ui``
553 class, each atom corresponds to a ``ui.write()``.
555 class, each atom corresponds to a ``ui.write()``.
554 """
556 """
555 atomdicts = []
557 atomdicts = []
556
558
557 for formatting, args, labels in atoms:
559 for formatting, args, labels in atoms:
558 # TODO look for localstr, other types here?
560 # TODO look for localstr, other types here?
559
561
560 if not isinstance(formatting, bytes):
562 if not isinstance(formatting, bytes):
561 raise ValueError(b'must use bytes formatting strings')
563 raise ValueError(b'must use bytes formatting strings')
562 for arg in args:
564 for arg in args:
563 if not isinstance(arg, bytes):
565 if not isinstance(arg, bytes):
564 raise ValueError(b'must use bytes for arguments')
566 raise ValueError(b'must use bytes for arguments')
565 for label in labels:
567 for label in labels:
566 if not isinstance(label, bytes):
568 if not isinstance(label, bytes):
567 raise ValueError(b'must use bytes for labels')
569 raise ValueError(b'must use bytes for labels')
568
570
569 # Formatting string must be ASCII.
571 # Formatting string must be ASCII.
570 formatting = formatting.decode('ascii', 'replace').encode('ascii')
572 formatting = formatting.decode('ascii', 'replace').encode('ascii')
571
573
572 # Arguments must be UTF-8.
574 # Arguments must be UTF-8.
573 args = [a.decode('utf-8', 'replace').encode('utf-8') for a in args]
575 args = [a.decode('utf-8', 'replace').encode('utf-8') for a in args]
574
576
575 # Labels must be ASCII.
577 # Labels must be ASCII.
576 labels = [l.decode('ascii', 'strict').encode('ascii') for l in labels]
578 labels = [l.decode('ascii', 'strict').encode('ascii') for l in labels]
577
579
578 atom = {b'msg': formatting}
580 atom = {b'msg': formatting}
579 if args:
581 if args:
580 atom[b'args'] = args
582 atom[b'args'] = args
581 if labels:
583 if labels:
582 atom[b'labels'] = labels
584 atom[b'labels'] = labels
583
585
584 atomdicts.append(atom)
586 atomdicts.append(atom)
585
587
586 payload = b''.join(cborutil.streamencode(atomdicts))
588 payload = b''.join(cborutil.streamencode(atomdicts))
587
589
588 if len(payload) > maxframesize:
590 if len(payload) > maxframesize:
589 raise ValueError(b'cannot encode data in a single frame')
591 raise ValueError(b'cannot encode data in a single frame')
590
592
591 yield stream.makeframe(
593 yield stream.makeframe(
592 requestid=requestid,
594 requestid=requestid,
593 typeid=FRAME_TYPE_TEXT_OUTPUT,
595 typeid=FRAME_TYPE_TEXT_OUTPUT,
594 flags=0,
596 flags=0,
595 payload=payload,
597 payload=payload,
596 )
598 )
597
599
598
600
599 class bufferingcommandresponseemitter:
601 class bufferingcommandresponseemitter:
600 """Helper object to emit command response frames intelligently.
602 """Helper object to emit command response frames intelligently.
601
603
602 Raw command response data is likely emitted in chunks much smaller
604 Raw command response data is likely emitted in chunks much smaller
603 than what can fit in a single frame. This class exists to buffer
605 than what can fit in a single frame. This class exists to buffer
604 chunks until enough data is available to fit in a single frame.
606 chunks until enough data is available to fit in a single frame.
605
607
606 TODO we'll need something like this when compression is supported.
608 TODO we'll need something like this when compression is supported.
607 So it might make sense to implement this functionality at the stream
609 So it might make sense to implement this functionality at the stream
608 level.
610 level.
609 """
611 """
610
612
611 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
613 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
612 self._stream = stream
614 self._stream = stream
613 self._requestid = requestid
615 self._requestid = requestid
614 self._maxsize = maxframesize
616 self._maxsize = maxframesize
615 self._chunks = []
617 self._chunks = []
616 self._chunkssize = 0
618 self._chunkssize = 0
617
619
618 def send(self, data):
620 def send(self, data):
619 """Send new data for emission.
621 """Send new data for emission.
620
622
621 Is a generator of new frames that were derived from the new input.
623 Is a generator of new frames that were derived from the new input.
622
624
623 If the special input ``None`` is received, flushes all buffered
625 If the special input ``None`` is received, flushes all buffered
624 data to frames.
626 data to frames.
625 """
627 """
626
628
627 if data is None:
629 if data is None:
628 for frame in self._flush():
630 for frame in self._flush():
629 yield frame
631 yield frame
630 return
632 return
631
633
632 data = self._stream.encode(data)
634 data = self._stream.encode(data)
633
635
634 # There is a ton of potential to do more complicated things here.
636 # There is a ton of potential to do more complicated things here.
635 # Our immediate goal is to coalesce small chunks into big frames,
637 # Our immediate goal is to coalesce small chunks into big frames,
636 # not achieve the fewest number of frames possible. So we go with
638 # not achieve the fewest number of frames possible. So we go with
637 # a simple implementation:
639 # a simple implementation:
638 #
640 #
639 # * If a chunk is too large for a frame, we flush and emit frames
641 # * If a chunk is too large for a frame, we flush and emit frames
640 # for the new chunk.
642 # for the new chunk.
641 # * If a chunk can be buffered without total buffered size limits
643 # * If a chunk can be buffered without total buffered size limits
642 # being exceeded, we do that.
644 # being exceeded, we do that.
643 # * If a chunk causes us to go over our buffering limit, we flush
645 # * If a chunk causes us to go over our buffering limit, we flush
644 # and then buffer the new chunk.
646 # and then buffer the new chunk.
645
647
646 if not data:
648 if not data:
647 return
649 return
648
650
649 if len(data) > self._maxsize:
651 if len(data) > self._maxsize:
650 for frame in self._flush():
652 for frame in self._flush():
651 yield frame
653 yield frame
652
654
653 # Now emit frames for the big chunk.
655 # Now emit frames for the big chunk.
654 offset = 0
656 offset = 0
655 while True:
657 while True:
656 chunk = data[offset : offset + self._maxsize]
658 chunk = data[offset : offset + self._maxsize]
657 offset += len(chunk)
659 offset += len(chunk)
658
660
659 yield self._stream.makeframe(
661 yield self._stream.makeframe(
660 self._requestid,
662 self._requestid,
661 typeid=FRAME_TYPE_COMMAND_RESPONSE,
663 typeid=FRAME_TYPE_COMMAND_RESPONSE,
662 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
664 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
663 payload=chunk,
665 payload=chunk,
664 encoded=True,
666 encoded=True,
665 )
667 )
666
668
667 if offset == len(data):
669 if offset == len(data):
668 return
670 return
669
671
670 # If we don't have enough to constitute a full frame, buffer and
672 # If we don't have enough to constitute a full frame, buffer and
671 # return.
673 # return.
672 if len(data) + self._chunkssize < self._maxsize:
674 if len(data) + self._chunkssize < self._maxsize:
673 self._chunks.append(data)
675 self._chunks.append(data)
674 self._chunkssize += len(data)
676 self._chunkssize += len(data)
675 return
677 return
676
678
677 # Else flush what we have and buffer the new chunk. We could do
679 # Else flush what we have and buffer the new chunk. We could do
678 # something more intelligent here, like break the chunk. Let's
680 # something more intelligent here, like break the chunk. Let's
679 # keep things simple for now.
681 # keep things simple for now.
680 for frame in self._flush():
682 for frame in self._flush():
681 yield frame
683 yield frame
682
684
683 self._chunks.append(data)
685 self._chunks.append(data)
684 self._chunkssize = len(data)
686 self._chunkssize = len(data)
685
687
686 def _flush(self):
688 def _flush(self):
687 payload = b''.join(self._chunks)
689 payload = b''.join(self._chunks)
688 assert len(payload) <= self._maxsize
690 assert len(payload) <= self._maxsize
689
691
690 self._chunks[:] = []
692 self._chunks[:] = []
691 self._chunkssize = 0
693 self._chunkssize = 0
692
694
693 if not payload:
695 if not payload:
694 return
696 return
695
697
696 yield self._stream.makeframe(
698 yield self._stream.makeframe(
697 self._requestid,
699 self._requestid,
698 typeid=FRAME_TYPE_COMMAND_RESPONSE,
700 typeid=FRAME_TYPE_COMMAND_RESPONSE,
699 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
701 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
700 payload=payload,
702 payload=payload,
701 encoded=True,
703 encoded=True,
702 )
704 )
703
705
704
706
705 # TODO consider defining encoders/decoders using the util.compressionengine
707 # TODO consider defining encoders/decoders using the util.compressionengine
706 # mechanism.
708 # mechanism.
707
709
708
710
709 class identityencoder:
711 class identityencoder:
710 """Encoder for the "identity" stream encoding profile."""
712 """Encoder for the "identity" stream encoding profile."""
711
713
712 def __init__(self, ui):
714 def __init__(self, ui):
713 pass
715 pass
714
716
715 def encode(self, data):
717 def encode(self, data):
716 return data
718 return data
717
719
718 def flush(self):
720 def flush(self):
719 return b''
721 return b''
720
722
721 def finish(self):
723 def finish(self):
722 return b''
724 return b''
723
725
724
726
725 class identitydecoder:
727 class identitydecoder:
726 """Decoder for the "identity" stream encoding profile."""
728 """Decoder for the "identity" stream encoding profile."""
727
729
728 def __init__(self, ui, extraobjs):
730 def __init__(self, ui, extraobjs):
729 if extraobjs:
731 if extraobjs:
730 raise error.Abort(
732 raise error.Abort(
731 _(b'identity decoder received unexpected additional values')
733 _(b'identity decoder received unexpected additional values')
732 )
734 )
733
735
734 def decode(self, data):
736 def decode(self, data):
735 return data
737 return data
736
738
737
739
738 class zlibencoder:
740 class zlibencoder:
739 def __init__(self, ui):
741 def __init__(self, ui):
740 import zlib
742 import zlib
741
743
742 self._zlib = zlib
744 self._zlib = zlib
743 self._compressor = zlib.compressobj()
745 self._compressor = zlib.compressobj()
744
746
745 def encode(self, data):
747 def encode(self, data):
746 return self._compressor.compress(data)
748 return self._compressor.compress(data)
747
749
748 def flush(self):
750 def flush(self):
749 # Z_SYNC_FLUSH doesn't reset compression context, which is
751 # Z_SYNC_FLUSH doesn't reset compression context, which is
750 # what we want.
752 # what we want.
751 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
753 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
752
754
753 def finish(self):
755 def finish(self):
754 res = self._compressor.flush(self._zlib.Z_FINISH)
756 res = self._compressor.flush(self._zlib.Z_FINISH)
755 self._compressor = None
757 self._compressor = None
756 return res
758 return res
757
759
758
760
759 class zlibdecoder:
761 class zlibdecoder:
760 def __init__(self, ui, extraobjs):
762 def __init__(self, ui, extraobjs):
761 import zlib
763 import zlib
762
764
763 if extraobjs:
765 if extraobjs:
764 raise error.Abort(
766 raise error.Abort(
765 _(b'zlib decoder received unexpected additional values')
767 _(b'zlib decoder received unexpected additional values')
766 )
768 )
767
769
768 self._decompressor = zlib.decompressobj()
770 self._decompressor = zlib.decompressobj()
769
771
770 def decode(self, data):
772 def decode(self, data):
771 return self._decompressor.decompress(data)
773 return self._decompressor.decompress(data)
772
774
773
775
774 class zstdbaseencoder:
776 class zstdbaseencoder:
775 def __init__(self, level):
777 def __init__(self, level):
776 from . import zstd
778 from . import zstd # pytype: disable=import-error
777
779
778 self._zstd = zstd
780 self._zstd = zstd
779 cctx = zstd.ZstdCompressor(level=level)
781 cctx = zstd.ZstdCompressor(level=level)
780 self._compressor = cctx.compressobj()
782 self._compressor = cctx.compressobj()
781
783
782 def encode(self, data):
784 def encode(self, data):
783 return self._compressor.compress(data)
785 return self._compressor.compress(data)
784
786
785 def flush(self):
787 def flush(self):
786 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
788 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
787 # compressor and allows a decompressor to access all encoded data
789 # compressor and allows a decompressor to access all encoded data
788 # up to this point.
790 # up to this point.
789 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
791 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
790
792
791 def finish(self):
793 def finish(self):
792 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
794 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
793 self._compressor = None
795 self._compressor = None
794 return res
796 return res
795
797
796
798
797 class zstd8mbencoder(zstdbaseencoder):
799 class zstd8mbencoder(zstdbaseencoder):
798 def __init__(self, ui):
800 def __init__(self, ui):
799 super(zstd8mbencoder, self).__init__(3)
801 super(zstd8mbencoder, self).__init__(3)
800
802
801
803
802 class zstdbasedecoder:
804 class zstdbasedecoder:
803 def __init__(self, maxwindowsize):
805 def __init__(self, maxwindowsize):
804 from . import zstd
806 from . import zstd # pytype: disable=import-error
805
807
806 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
808 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
807 self._decompressor = dctx.decompressobj()
809 self._decompressor = dctx.decompressobj()
808
810
809 def decode(self, data):
811 def decode(self, data):
810 return self._decompressor.decompress(data)
812 return self._decompressor.decompress(data)
811
813
812
814
813 class zstd8mbdecoder(zstdbasedecoder):
815 class zstd8mbdecoder(zstdbasedecoder):
814 def __init__(self, ui, extraobjs):
816 def __init__(self, ui, extraobjs):
815 if extraobjs:
817 if extraobjs:
816 raise error.Abort(
818 raise error.Abort(
817 _(b'zstd8mb decoder received unexpected additional values')
819 _(b'zstd8mb decoder received unexpected additional values')
818 )
820 )
819
821
820 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
822 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
821
823
822
824
823 # We lazily populate this to avoid excessive module imports when importing
825 # We lazily populate this to avoid excessive module imports when importing
824 # this module.
826 # this module.
825 STREAM_ENCODERS = {}
827 STREAM_ENCODERS = {}
826 STREAM_ENCODERS_ORDER = []
828 STREAM_ENCODERS_ORDER = []
827
829
828
830
829 def populatestreamencoders():
831 def populatestreamencoders():
830 if STREAM_ENCODERS:
832 if STREAM_ENCODERS:
831 return
833 return
832
834
833 try:
835 try:
834 from . import zstd
836 from . import zstd # pytype: disable=import-error
835
837
836 zstd.__version__
838 zstd.__version__
837 except ImportError:
839 except ImportError:
838 zstd = None
840 zstd = None
839
841
840 # zstandard is fastest and is preferred.
842 # zstandard is fastest and is preferred.
841 if zstd:
843 if zstd:
842 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
844 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
843 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
845 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
844
846
845 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
847 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
846 STREAM_ENCODERS_ORDER.append(b'zlib')
848 STREAM_ENCODERS_ORDER.append(b'zlib')
847
849
848 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
850 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
849 STREAM_ENCODERS_ORDER.append(b'identity')
851 STREAM_ENCODERS_ORDER.append(b'identity')
850
852
851
853
852 class stream:
854 class stream:
853 """Represents a logical unidirectional series of frames."""
855 """Represents a logical unidirectional series of frames."""
854
856
855 def __init__(self, streamid, active=False):
857 def __init__(self, streamid, active=False):
856 self.streamid = streamid
858 self.streamid = streamid
857 self._active = active
859 self._active = active
858
860
859 def makeframe(self, requestid, typeid, flags, payload):
861 def makeframe(self, requestid, typeid, flags, payload):
860 """Create a frame to be sent out over this stream.
862 """Create a frame to be sent out over this stream.
861
863
862 Only returns the frame instance. Does not actually send it.
864 Only returns the frame instance. Does not actually send it.
863 """
865 """
864 streamflags = 0
866 streamflags = 0
865 if not self._active:
867 if not self._active:
866 streamflags |= STREAM_FLAG_BEGIN_STREAM
868 streamflags |= STREAM_FLAG_BEGIN_STREAM
867 self._active = True
869 self._active = True
868
870
869 return makeframe(
871 return makeframe(
870 requestid, self.streamid, streamflags, typeid, flags, payload
872 requestid, self.streamid, streamflags, typeid, flags, payload
871 )
873 )
872
874
873
875
874 class inputstream(stream):
876 class inputstream(stream):
875 """Represents a stream used for receiving data."""
877 """Represents a stream used for receiving data."""
876
878
877 def __init__(self, streamid, active=False):
879 def __init__(self, streamid, active=False):
878 super(inputstream, self).__init__(streamid, active=active)
880 super(inputstream, self).__init__(streamid, active=active)
879 self._decoder = None
881 self._decoder = None
880
882
881 def setdecoder(self, ui, name, extraobjs):
883 def setdecoder(self, ui, name, extraobjs):
882 """Set the decoder for this stream.
884 """Set the decoder for this stream.
883
885
884 Receives the stream profile name and any additional CBOR objects
886 Receives the stream profile name and any additional CBOR objects
885 decoded from the stream encoding settings frame payloads.
887 decoded from the stream encoding settings frame payloads.
886 """
888 """
887 if name not in STREAM_ENCODERS:
889 if name not in STREAM_ENCODERS:
888 raise error.Abort(_(b'unknown stream decoder: %s') % name)
890 raise error.Abort(_(b'unknown stream decoder: %s') % name)
889
891
890 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
892 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
891
893
892 def decode(self, data):
894 def decode(self, data):
893 # Default is identity decoder. We don't bother instantiating one
895 # Default is identity decoder. We don't bother instantiating one
894 # because it is trivial.
896 # because it is trivial.
895 if not self._decoder:
897 if not self._decoder:
896 return data
898 return data
897
899
898 return self._decoder.decode(data)
900 return self._decoder.decode(data)
899
901
900 def flush(self):
902 def flush(self):
901 if not self._decoder:
903 if not self._decoder:
902 return b''
904 return b''
903
905
904 return self._decoder.flush()
906 return self._decoder.flush()
905
907
906
908
907 class outputstream(stream):
909 class outputstream(stream):
908 """Represents a stream used for sending data."""
910 """Represents a stream used for sending data."""
909
911
910 def __init__(self, streamid, active=False):
912 def __init__(self, streamid, active=False):
911 super(outputstream, self).__init__(streamid, active=active)
913 super(outputstream, self).__init__(streamid, active=active)
912 self.streamsettingssent = False
914 self.streamsettingssent = False
913 self._encoder = None
915 self._encoder = None
914 self._encodername = None
916 self._encodername = None
915
917
916 def setencoder(self, ui, name):
918 def setencoder(self, ui, name):
917 """Set the encoder for this stream.
919 """Set the encoder for this stream.
918
920
919 Receives the stream profile name.
921 Receives the stream profile name.
920 """
922 """
921 if name not in STREAM_ENCODERS:
923 if name not in STREAM_ENCODERS:
922 raise error.Abort(_(b'unknown stream encoder: %s') % name)
924 raise error.Abort(_(b'unknown stream encoder: %s') % name)
923
925
924 self._encoder = STREAM_ENCODERS[name][0](ui)
926 self._encoder = STREAM_ENCODERS[name][0](ui)
925 self._encodername = name
927 self._encodername = name
926
928
927 def encode(self, data):
929 def encode(self, data):
928 if not self._encoder:
930 if not self._encoder:
929 return data
931 return data
930
932
931 return self._encoder.encode(data)
933 return self._encoder.encode(data)
932
934
933 def flush(self):
935 def flush(self):
934 if not self._encoder:
936 if not self._encoder:
935 return b''
937 return b''
936
938
937 return self._encoder.flush()
939 return self._encoder.flush()
938
940
939 def finish(self):
941 def finish(self):
940 if not self._encoder:
942 if not self._encoder:
941 return b''
943 return b''
942
944
943 self._encoder.finish()
945 self._encoder.finish()
944
946
945 def makeframe(self, requestid, typeid, flags, payload, encoded=False):
947 def makeframe(self, requestid, typeid, flags, payload, encoded=False):
946 """Create a frame to be sent out over this stream.
948 """Create a frame to be sent out over this stream.
947
949
948 Only returns the frame instance. Does not actually send it.
950 Only returns the frame instance. Does not actually send it.
949 """
951 """
950 streamflags = 0
952 streamflags = 0
951 if not self._active:
953 if not self._active:
952 streamflags |= STREAM_FLAG_BEGIN_STREAM
954 streamflags |= STREAM_FLAG_BEGIN_STREAM
953 self._active = True
955 self._active = True
954
956
955 if encoded:
957 if encoded:
956 if not self.streamsettingssent:
958 if not self.streamsettingssent:
957 raise error.ProgrammingError(
959 raise error.ProgrammingError(
958 b'attempting to send encoded frame without sending stream '
960 b'attempting to send encoded frame without sending stream '
959 b'settings'
961 b'settings'
960 )
962 )
961
963
962 streamflags |= STREAM_FLAG_ENCODING_APPLIED
964 streamflags |= STREAM_FLAG_ENCODING_APPLIED
963
965
964 if (
966 if (
965 typeid == FRAME_TYPE_STREAM_SETTINGS
967 typeid == FRAME_TYPE_STREAM_SETTINGS
966 and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
968 and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
967 ):
969 ):
968 self.streamsettingssent = True
970 self.streamsettingssent = True
969
971
970 return makeframe(
972 return makeframe(
971 requestid, self.streamid, streamflags, typeid, flags, payload
973 requestid, self.streamid, streamflags, typeid, flags, payload
972 )
974 )
973
975
974 def makestreamsettingsframe(self, requestid):
976 def makestreamsettingsframe(self, requestid):
975 """Create a stream settings frame for this stream.
977 """Create a stream settings frame for this stream.
976
978
977 Returns frame data or None if no stream settings frame is needed or has
979 Returns frame data or None if no stream settings frame is needed or has
978 already been sent.
980 already been sent.
979 """
981 """
980 if not self._encoder or self.streamsettingssent:
982 if not self._encoder or self.streamsettingssent:
981 return None
983 return None
982
984
983 payload = b''.join(cborutil.streamencode(self._encodername))
985 payload = b''.join(cborutil.streamencode(self._encodername))
984 return self.makeframe(
986 return self.makeframe(
985 requestid,
987 requestid,
986 FRAME_TYPE_STREAM_SETTINGS,
988 FRAME_TYPE_STREAM_SETTINGS,
987 FLAG_STREAM_ENCODING_SETTINGS_EOS,
989 FLAG_STREAM_ENCODING_SETTINGS_EOS,
988 payload,
990 payload,
989 )
991 )
990
992
991
993
992 def ensureserverstream(stream):
994 def ensureserverstream(stream):
993 if stream.streamid % 2:
995 if stream.streamid % 2:
994 raise error.ProgrammingError(
996 raise error.ProgrammingError(
995 b'server should only write to even '
997 b'server should only write to even '
996 b'numbered streams; %d is not even' % stream.streamid
998 b'numbered streams; %d is not even' % stream.streamid
997 )
999 )
998
1000
999
1001
1000 DEFAULT_PROTOCOL_SETTINGS = {
1002 DEFAULT_PROTOCOL_SETTINGS = {
1001 b'contentencodings': [b'identity'],
1003 b'contentencodings': [b'identity'],
1002 }
1004 }
1003
1005
1004
1006
1005 class serverreactor:
1007 class serverreactor:
1006 """Holds state of a server handling frame-based protocol requests.
1008 """Holds state of a server handling frame-based protocol requests.
1007
1009
1008 This class is the "brain" of the unified frame-based protocol server
1010 This class is the "brain" of the unified frame-based protocol server
1009 component. While the protocol is stateless from the perspective of
1011 component. While the protocol is stateless from the perspective of
1010 requests/commands, something needs to track which frames have been
1012 requests/commands, something needs to track which frames have been
1011 received, what frames to expect, etc. This class is that thing.
1013 received, what frames to expect, etc. This class is that thing.
1012
1014
1013 Instances are modeled as a state machine of sorts. Instances are also
1015 Instances are modeled as a state machine of sorts. Instances are also
1014 reactionary to external events. The point of this class is to encapsulate
1016 reactionary to external events. The point of this class is to encapsulate
1015 the state of the connection and the exchange of frames, not to perform
1017 the state of the connection and the exchange of frames, not to perform
1016 work. Instead, callers tell this class when something occurs, like a
1018 work. Instead, callers tell this class when something occurs, like a
1017 frame arriving. If that activity is worthy of a follow-up action (say
1019 frame arriving. If that activity is worthy of a follow-up action (say
1018 *run a command*), the return value of that handler will say so.
1020 *run a command*), the return value of that handler will say so.
1019
1021
1020 I/O and CPU intensive operations are purposefully delegated outside of
1022 I/O and CPU intensive operations are purposefully delegated outside of
1021 this class.
1023 this class.
1022
1024
1023 Consumers are expected to tell instances when events occur. They do so by
1025 Consumers are expected to tell instances when events occur. They do so by
1024 calling the various ``on*`` methods. These methods return a 2-tuple
1026 calling the various ``on*`` methods. These methods return a 2-tuple
1025 describing any follow-up action(s) to take. The first element is the
1027 describing any follow-up action(s) to take. The first element is the
1026 name of an action to perform. The second is a data structure (usually
1028 name of an action to perform. The second is a data structure (usually
1027 a dict) specific to that action that contains more information. e.g.
1029 a dict) specific to that action that contains more information. e.g.
1028 if the server wants to send frames back to the client, the data structure
1030 if the server wants to send frames back to the client, the data structure
1029 will contain a reference to those frames.
1031 will contain a reference to those frames.
1030
1032
1031 Valid actions that consumers can be instructed to take are:
1033 Valid actions that consumers can be instructed to take are:
1032
1034
1033 sendframes
1035 sendframes
1034 Indicates that frames should be sent to the client. The ``framegen``
1036 Indicates that frames should be sent to the client. The ``framegen``
1035 key contains a generator of frames that should be sent. The server
1037 key contains a generator of frames that should be sent. The server
1036 assumes that all frames are sent to the client.
1038 assumes that all frames are sent to the client.
1037
1039
1038 error
1040 error
1039 Indicates that an error occurred. Consumer should probably abort.
1041 Indicates that an error occurred. Consumer should probably abort.
1040
1042
1041 runcommand
1043 runcommand
1042 Indicates that the consumer should run a wire protocol command. Details
1044 Indicates that the consumer should run a wire protocol command. Details
1043 of the command to run are given in the data structure.
1045 of the command to run are given in the data structure.
1044
1046
1045 wantframe
1047 wantframe
1046 Indicates that nothing of interest happened and the server is waiting on
1048 Indicates that nothing of interest happened and the server is waiting on
1047 more frames from the client before anything interesting can be done.
1049 more frames from the client before anything interesting can be done.
1048
1050
1049 noop
1051 noop
1050 Indicates no additional action is required.
1052 Indicates no additional action is required.
1051
1053
1052 Known Issues
1054 Known Issues
1053 ------------
1055 ------------
1054
1056
1055 There are no limits to the number of partially received commands or their
1057 There are no limits to the number of partially received commands or their
1056 size. A malicious client could stream command request data and exhaust the
1058 size. A malicious client could stream command request data and exhaust the
1057 server's memory.
1059 server's memory.
1058
1060
1059 Partially received commands are not acted upon when end of input is
1061 Partially received commands are not acted upon when end of input is
1060 reached. Should the server error if it receives a partial request?
1062 reached. Should the server error if it receives a partial request?
1061 Should the client send a message to abort a partially transmitted request
1063 Should the client send a message to abort a partially transmitted request
1062 to facilitate graceful shutdown?
1064 to facilitate graceful shutdown?
1063
1065
1064 Active requests that haven't been responded to aren't tracked. This means
1066 Active requests that haven't been responded to aren't tracked. This means
1065 that if we receive a command and instruct its dispatch, another command
1067 that if we receive a command and instruct its dispatch, another command
1066 with its request ID can come in over the wire and there will be a race
1068 with its request ID can come in over the wire and there will be a race
1067 between who responds to what.
1069 between who responds to what.
1068 """
1070 """
1069
1071
1070 def __init__(self, ui, deferoutput=False):
1072 def __init__(self, ui, deferoutput=False):
1071 """Construct a new server reactor.
1073 """Construct a new server reactor.
1072
1074
1073 ``deferoutput`` can be used to indicate that no output frames should be
1075 ``deferoutput`` can be used to indicate that no output frames should be
1074 instructed to be sent until input has been exhausted. In this mode,
1076 instructed to be sent until input has been exhausted. In this mode,
1075 events that would normally generate output frames (such as a command
1077 events that would normally generate output frames (such as a command
1076 response being ready) will instead defer instructing the consumer to
1078 response being ready) will instead defer instructing the consumer to
1077 send those frames. This is useful for half-duplex transports where the
1079 send those frames. This is useful for half-duplex transports where the
1078 sender cannot receive until all data has been transmitted.
1080 sender cannot receive until all data has been transmitted.
1079 """
1081 """
1080 self._ui = ui
1082 self._ui = ui
1081 self._deferoutput = deferoutput
1083 self._deferoutput = deferoutput
1082 self._state = b'initial'
1084 self._state = b'initial'
1083 self._nextoutgoingstreamid = 2
1085 self._nextoutgoingstreamid = 2
1084 self._bufferedframegens = []
1086 self._bufferedframegens = []
1085 # stream id -> stream instance for all active streams from the client.
1087 # stream id -> stream instance for all active streams from the client.
1086 self._incomingstreams = {}
1088 self._incomingstreams = {}
1087 self._outgoingstreams = {}
1089 self._outgoingstreams = {}
1088 # request id -> dict of commands that are actively being received.
1090 # request id -> dict of commands that are actively being received.
1089 self._receivingcommands = {}
1091 self._receivingcommands = {}
1090 # Request IDs that have been received and are actively being processed.
1092 # Request IDs that have been received and are actively being processed.
1091 # Once all output for a request has been sent, it is removed from this
1093 # Once all output for a request has been sent, it is removed from this
1092 # set.
1094 # set.
1093 self._activecommands = set()
1095 self._activecommands = set()
1094
1096
1095 self._protocolsettingsdecoder = None
1097 self._protocolsettingsdecoder = None
1096
1098
1097 # Sender protocol settings are optional. Set implied default values.
1099 # Sender protocol settings are optional. Set implied default values.
1098 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
1100 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
1099
1101
1100 populatestreamencoders()
1102 populatestreamencoders()
1101
1103
1102 def onframerecv(self, frame):
1104 def onframerecv(self, frame):
1103 """Process a frame that has been received off the wire.
1105 """Process a frame that has been received off the wire.
1104
1106
1105 Returns a dict with an ``action`` key that details what action,
1107 Returns a dict with an ``action`` key that details what action,
1106 if any, the consumer should take next.
1108 if any, the consumer should take next.
1107 """
1109 """
1108 if not frame.streamid % 2:
1110 if not frame.streamid % 2:
1109 self._state = b'errored'
1111 self._state = b'errored'
1110 return self._makeerrorresult(
1112 return self._makeerrorresult(
1111 _(b'received frame with even numbered stream ID: %d')
1113 _(b'received frame with even numbered stream ID: %d')
1112 % frame.streamid
1114 % frame.streamid
1113 )
1115 )
1114
1116
1115 if frame.streamid not in self._incomingstreams:
1117 if frame.streamid not in self._incomingstreams:
1116 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1118 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1117 self._state = b'errored'
1119 self._state = b'errored'
1118 return self._makeerrorresult(
1120 return self._makeerrorresult(
1119 _(
1121 _(
1120 b'received frame on unknown inactive stream without '
1122 b'received frame on unknown inactive stream without '
1121 b'beginning of stream flag set'
1123 b'beginning of stream flag set'
1122 )
1124 )
1123 )
1125 )
1124
1126
1125 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
1127 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
1126
1128
1127 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1129 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1128 # TODO handle decoding frames
1130 # TODO handle decoding frames
1129 self._state = b'errored'
1131 self._state = b'errored'
1130 raise error.ProgrammingError(
1132 raise error.ProgrammingError(
1131 b'support for decoding stream payloads not yet implemented'
1133 b'support for decoding stream payloads not yet implemented'
1132 )
1134 )
1133
1135
1134 if frame.streamflags & STREAM_FLAG_END_STREAM:
1136 if frame.streamflags & STREAM_FLAG_END_STREAM:
1135 del self._incomingstreams[frame.streamid]
1137 del self._incomingstreams[frame.streamid]
1136
1138
1137 handlers = {
1139 handlers = {
1138 b'initial': self._onframeinitial,
1140 b'initial': self._onframeinitial,
1139 b'protocol-settings-receiving': self._onframeprotocolsettings,
1141 b'protocol-settings-receiving': self._onframeprotocolsettings,
1140 b'idle': self._onframeidle,
1142 b'idle': self._onframeidle,
1141 b'command-receiving': self._onframecommandreceiving,
1143 b'command-receiving': self._onframecommandreceiving,
1142 b'errored': self._onframeerrored,
1144 b'errored': self._onframeerrored,
1143 }
1145 }
1144
1146
1145 meth = handlers.get(self._state)
1147 meth = handlers.get(self._state)
1146 if not meth:
1148 if not meth:
1147 raise error.ProgrammingError(b'unhandled state: %s' % self._state)
1149 raise error.ProgrammingError(b'unhandled state: %s' % self._state)
1148
1150
1149 return meth(frame)
1151 return meth(frame)
1150
1152
1151 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1153 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1152 """Signal that objects are ready to be sent to the client.
1154 """Signal that objects are ready to be sent to the client.
1153
1155
1154 ``objs`` is an iterable of objects (typically a generator) that will
1156 ``objs`` is an iterable of objects (typically a generator) that will
1155 be encoded via CBOR and added to frames, which will be sent to the
1157 be encoded via CBOR and added to frames, which will be sent to the
1156 client.
1158 client.
1157 """
1159 """
1158 ensureserverstream(stream)
1160 ensureserverstream(stream)
1159
1161
1160 # A more robust solution would be to check for objs.{next,__next__}.
1162 # A more robust solution would be to check for objs.{next,__next__}.
1161 if isinstance(objs, list):
1163 if isinstance(objs, list):
1162 objs = iter(objs)
1164 objs = iter(objs)
1163
1165
1164 # We need to take care over exception handling. Uncaught exceptions
1166 # We need to take care over exception handling. Uncaught exceptions
1165 # when generating frames could lead to premature end of the frame
1167 # when generating frames could lead to premature end of the frame
1166 # stream and the possibility of the server or client process getting
1168 # stream and the possibility of the server or client process getting
1167 # in a bad state.
1169 # in a bad state.
1168 #
1170 #
1169 # Keep in mind that if ``objs`` is a generator, advancing it could
1171 # Keep in mind that if ``objs`` is a generator, advancing it could
1170 # raise exceptions that originated in e.g. wire protocol command
1172 # raise exceptions that originated in e.g. wire protocol command
1171 # functions. That is why we differentiate between exceptions raised
1173 # functions. That is why we differentiate between exceptions raised
1172 # when iterating versus other exceptions that occur.
1174 # when iterating versus other exceptions that occur.
1173 #
1175 #
1174 # In all cases, when the function finishes, the request is fully
1176 # In all cases, when the function finishes, the request is fully
1175 # handled and no new frames for it should be seen.
1177 # handled and no new frames for it should be seen.
1176
1178
1177 def sendframes():
1179 def sendframes():
1178 emitted = False
1180 emitted = False
1179 alternatelocationsent = False
1181 alternatelocationsent = False
1180 emitter = bufferingcommandresponseemitter(stream, requestid)
1182 emitter = bufferingcommandresponseemitter(stream, requestid)
1181 while True:
1183 while True:
1182 try:
1184 try:
1183 o = next(objs)
1185 o = next(objs)
1184 except StopIteration:
1186 except StopIteration:
1185 for frame in emitter.send(None):
1187 for frame in emitter.send(None):
1186 yield frame
1188 yield frame
1187
1189
1188 if emitted:
1190 if emitted:
1189 for frame in createcommandresponseeosframes(
1191 for frame in createcommandresponseeosframes(
1190 stream, requestid
1192 stream, requestid
1191 ):
1193 ):
1192 yield frame
1194 yield frame
1193 break
1195 break
1194
1196
1195 except error.WireprotoCommandError as e:
1197 except error.WireprotoCommandError as e:
1196 for frame in createcommanderrorresponse(
1198 for frame in createcommanderrorresponse(
1197 stream, requestid, e.message, e.messageargs
1199 stream, requestid, e.message, e.messageargs
1198 ):
1200 ):
1199 yield frame
1201 yield frame
1200 break
1202 break
1201
1203
1202 except Exception as e:
1204 except Exception as e:
1203 for frame in createerrorframe(
1205 for frame in createerrorframe(
1204 stream,
1206 stream,
1205 requestid,
1207 requestid,
1206 b'%s' % stringutil.forcebytestr(e),
1208 b'%s' % stringutil.forcebytestr(e),
1207 errtype=b'server',
1209 errtype=b'server',
1208 ):
1210 ):
1209 yield frame
1211 yield frame
1210
1212
1211 break
1213 break
1212
1214
1213 try:
1215 try:
1214 # Alternate location responses can only be the first and
1216 # Alternate location responses can only be the first and
1215 # only object in the output stream.
1217 # only object in the output stream.
1216 if isinstance(o, wireprototypes.alternatelocationresponse):
1218 if isinstance(o, wireprototypes.alternatelocationresponse):
1217 if emitted:
1219 if emitted:
1218 raise error.ProgrammingError(
1220 raise error.ProgrammingError(
1219 b'alternatelocationresponse seen after initial '
1221 b'alternatelocationresponse seen after initial '
1220 b'output object'
1222 b'output object'
1221 )
1223 )
1222
1224
1223 frame = stream.makestreamsettingsframe(requestid)
1225 frame = stream.makestreamsettingsframe(requestid)
1224 if frame:
1226 if frame:
1225 yield frame
1227 yield frame
1226
1228
1227 yield createalternatelocationresponseframe(
1229 yield createalternatelocationresponseframe(
1228 stream, requestid, o
1230 stream, requestid, o
1229 )
1231 )
1230
1232
1231 alternatelocationsent = True
1233 alternatelocationsent = True
1232 emitted = True
1234 emitted = True
1233 continue
1235 continue
1234
1236
1235 if alternatelocationsent:
1237 if alternatelocationsent:
1236 raise error.ProgrammingError(
1238 raise error.ProgrammingError(
1237 b'object follows alternatelocationresponse'
1239 b'object follows alternatelocationresponse'
1238 )
1240 )
1239
1241
1240 if not emitted:
1242 if not emitted:
1241 # Frame is optional.
1243 # Frame is optional.
1242 frame = stream.makestreamsettingsframe(requestid)
1244 frame = stream.makestreamsettingsframe(requestid)
1243 if frame:
1245 if frame:
1244 yield frame
1246 yield frame
1245
1247
1246 # May be None if empty frame (due to encoding).
1248 # May be None if empty frame (due to encoding).
1247 frame = createcommandresponseokframe(stream, requestid)
1249 frame = createcommandresponseokframe(stream, requestid)
1248 if frame:
1250 if frame:
1249 yield frame
1251 yield frame
1250
1252
1251 emitted = True
1253 emitted = True
1252
1254
1253 # Objects emitted by command functions can be serializable
1255 # Objects emitted by command functions can be serializable
1254 # data structures or special types.
1256 # data structures or special types.
1255 # TODO consider extracting the content normalization to a
1257 # TODO consider extracting the content normalization to a
1256 # standalone function, as it may be useful for e.g. cachers.
1258 # standalone function, as it may be useful for e.g. cachers.
1257
1259
1258 # A pre-encoded object is sent directly to the emitter.
1260 # A pre-encoded object is sent directly to the emitter.
1259 if isinstance(o, wireprototypes.encodedresponse):
1261 if isinstance(o, wireprototypes.encodedresponse):
1260 for frame in emitter.send(o.data):
1262 for frame in emitter.send(o.data):
1261 yield frame
1263 yield frame
1262
1264
1263 elif isinstance(
1265 elif isinstance(
1264 o, wireprototypes.indefinitebytestringresponse
1266 o, wireprototypes.indefinitebytestringresponse
1265 ):
1267 ):
1266 for chunk in cborutil.streamencodebytestringfromiter(
1268 for chunk in cborutil.streamencodebytestringfromiter(
1267 o.chunks
1269 o.chunks
1268 ):
1270 ):
1269 for frame in emitter.send(chunk):
1271 for frame in emitter.send(chunk):
1270 yield frame
1272 yield frame
1271
1273
1272 # A regular object is CBOR encoded.
1274 # A regular object is CBOR encoded.
1273 else:
1275 else:
1274 for chunk in cborutil.streamencode(o):
1276 for chunk in cborutil.streamencode(o):
1275 for frame in emitter.send(chunk):
1277 for frame in emitter.send(chunk):
1276 yield frame
1278 yield frame
1277
1279
1278 except Exception as e:
1280 except Exception as e:
1279 for frame in createerrorframe(
1281 for frame in createerrorframe(
1280 stream, requestid, b'%s' % e, errtype=b'server'
1282 stream, requestid, b'%s' % e, errtype=b'server'
1281 ):
1283 ):
1282 yield frame
1284 yield frame
1283
1285
1284 break
1286 break
1285
1287
1286 self._activecommands.remove(requestid)
1288 self._activecommands.remove(requestid)
1287
1289
1288 return self._handlesendframes(sendframes())
1290 return self._handlesendframes(sendframes())
1289
1291
1290 def oninputeof(self):
1292 def oninputeof(self):
1291 """Signals that end of input has been received.
1293 """Signals that end of input has been received.
1292
1294
1293 No more frames will be received. All pending activity should be
1295 No more frames will be received. All pending activity should be
1294 completed.
1296 completed.
1295 """
1297 """
1296 # TODO should we do anything about in-flight commands?
1298 # TODO should we do anything about in-flight commands?
1297
1299
1298 if not self._deferoutput or not self._bufferedframegens:
1300 if not self._deferoutput or not self._bufferedframegens:
1299 return b'noop', {}
1301 return b'noop', {}
1300
1302
1301 # If we buffered all our responses, emit those.
1303 # If we buffered all our responses, emit those.
1302 def makegen():
1304 def makegen():
1303 for gen in self._bufferedframegens:
1305 for gen in self._bufferedframegens:
1304 for frame in gen:
1306 for frame in gen:
1305 yield frame
1307 yield frame
1306
1308
1307 return b'sendframes', {
1309 return b'sendframes', {
1308 b'framegen': makegen(),
1310 b'framegen': makegen(),
1309 }
1311 }
1310
1312
1311 def _handlesendframes(self, framegen):
1313 def _handlesendframes(self, framegen):
1312 if self._deferoutput:
1314 if self._deferoutput:
1313 self._bufferedframegens.append(framegen)
1315 self._bufferedframegens.append(framegen)
1314 return b'noop', {}
1316 return b'noop', {}
1315 else:
1317 else:
1316 return b'sendframes', {
1318 return b'sendframes', {
1317 b'framegen': framegen,
1319 b'framegen': framegen,
1318 }
1320 }
1319
1321
1320 def onservererror(self, stream, requestid, msg):
1322 def onservererror(self, stream, requestid, msg):
1321 ensureserverstream(stream)
1323 ensureserverstream(stream)
1322
1324
1323 def sendframes():
1325 def sendframes():
1324 for frame in createerrorframe(
1326 for frame in createerrorframe(
1325 stream, requestid, msg, errtype=b'server'
1327 stream, requestid, msg, errtype=b'server'
1326 ):
1328 ):
1327 yield frame
1329 yield frame
1328
1330
1329 self._activecommands.remove(requestid)
1331 self._activecommands.remove(requestid)
1330
1332
1331 return self._handlesendframes(sendframes())
1333 return self._handlesendframes(sendframes())
1332
1334
1333 def oncommanderror(self, stream, requestid, message, args=None):
1335 def oncommanderror(self, stream, requestid, message, args=None):
1334 """Called when a command encountered an error before sending output."""
1336 """Called when a command encountered an error before sending output."""
1335 ensureserverstream(stream)
1337 ensureserverstream(stream)
1336
1338
1337 def sendframes():
1339 def sendframes():
1338 for frame in createcommanderrorresponse(
1340 for frame in createcommanderrorresponse(
1339 stream, requestid, message, args
1341 stream, requestid, message, args
1340 ):
1342 ):
1341 yield frame
1343 yield frame
1342
1344
1343 self._activecommands.remove(requestid)
1345 self._activecommands.remove(requestid)
1344
1346
1345 return self._handlesendframes(sendframes())
1347 return self._handlesendframes(sendframes())
1346
1348
1347 def makeoutputstream(self):
1349 def makeoutputstream(self):
1348 """Create a stream to be used for sending data to the client.
1350 """Create a stream to be used for sending data to the client.
1349
1351
1350 If this is called before protocol settings frames are received, we
1352 If this is called before protocol settings frames are received, we
1351 don't know what stream encodings are supported by the client and
1353 don't know what stream encodings are supported by the client and
1352 we will default to identity.
1354 we will default to identity.
1353 """
1355 """
1354 streamid = self._nextoutgoingstreamid
1356 streamid = self._nextoutgoingstreamid
1355 self._nextoutgoingstreamid += 2
1357 self._nextoutgoingstreamid += 2
1356
1358
1357 s = outputstream(streamid)
1359 s = outputstream(streamid)
1358 self._outgoingstreams[streamid] = s
1360 self._outgoingstreams[streamid] = s
1359
1361
1360 # Always use the *server's* preferred encoder over the client's,
1362 # Always use the *server's* preferred encoder over the client's,
1361 # as servers have more to lose from sub-optimal encoders being used.
1363 # as servers have more to lose from sub-optimal encoders being used.
1362 for name in STREAM_ENCODERS_ORDER:
1364 for name in STREAM_ENCODERS_ORDER:
1363 if name in self._sendersettings[b'contentencodings']:
1365 if name in self._sendersettings[b'contentencodings']:
1364 s.setencoder(self._ui, name)
1366 s.setencoder(self._ui, name)
1365 break
1367 break
1366
1368
1367 return s
1369 return s
1368
1370
1369 def _makeerrorresult(self, msg):
1371 def _makeerrorresult(self, msg):
1370 return b'error', {
1372 return b'error', {
1371 b'message': msg,
1373 b'message': msg,
1372 }
1374 }
1373
1375
1374 def _makeruncommandresult(self, requestid):
1376 def _makeruncommandresult(self, requestid):
1375 entry = self._receivingcommands[requestid]
1377 entry = self._receivingcommands[requestid]
1376
1378
1377 if not entry[b'requestdone']:
1379 if not entry[b'requestdone']:
1378 self._state = b'errored'
1380 self._state = b'errored'
1379 raise error.ProgrammingError(
1381 raise error.ProgrammingError(
1380 b'should not be called without requestdone set'
1382 b'should not be called without requestdone set'
1381 )
1383 )
1382
1384
1383 del self._receivingcommands[requestid]
1385 del self._receivingcommands[requestid]
1384
1386
1385 if self._receivingcommands:
1387 if self._receivingcommands:
1386 self._state = b'command-receiving'
1388 self._state = b'command-receiving'
1387 else:
1389 else:
1388 self._state = b'idle'
1390 self._state = b'idle'
1389
1391
1390 # Decode the payloads as CBOR.
1392 # Decode the payloads as CBOR.
1391 entry[b'payload'].seek(0)
1393 entry[b'payload'].seek(0)
1392 request = cborutil.decodeall(entry[b'payload'].getvalue())[0]
1394 request = cborutil.decodeall(entry[b'payload'].getvalue())[0]
1393
1395
1394 if b'name' not in request:
1396 if b'name' not in request:
1395 self._state = b'errored'
1397 self._state = b'errored'
1396 return self._makeerrorresult(
1398 return self._makeerrorresult(
1397 _(b'command request missing "name" field')
1399 _(b'command request missing "name" field')
1398 )
1400 )
1399
1401
1400 if b'args' not in request:
1402 if b'args' not in request:
1401 request[b'args'] = {}
1403 request[b'args'] = {}
1402
1404
1403 assert requestid not in self._activecommands
1405 assert requestid not in self._activecommands
1404 self._activecommands.add(requestid)
1406 self._activecommands.add(requestid)
1405
1407
1406 return (
1408 return (
1407 b'runcommand',
1409 b'runcommand',
1408 {
1410 {
1409 b'requestid': requestid,
1411 b'requestid': requestid,
1410 b'command': request[b'name'],
1412 b'command': request[b'name'],
1411 b'args': request[b'args'],
1413 b'args': request[b'args'],
1412 b'redirect': request.get(b'redirect'),
1414 b'redirect': request.get(b'redirect'),
1413 b'data': entry[b'data'].getvalue() if entry[b'data'] else None,
1415 b'data': entry[b'data'].getvalue() if entry[b'data'] else None,
1414 },
1416 },
1415 )
1417 )
1416
1418
1417 def _makewantframeresult(self):
1419 def _makewantframeresult(self):
1418 return b'wantframe', {
1420 return b'wantframe', {
1419 b'state': self._state,
1421 b'state': self._state,
1420 }
1422 }
1421
1423
1422 def _validatecommandrequestframe(self, frame):
1424 def _validatecommandrequestframe(self, frame):
1423 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1425 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1424 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1426 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1425
1427
1426 if new and continuation:
1428 if new and continuation:
1427 self._state = b'errored'
1429 self._state = b'errored'
1428 return self._makeerrorresult(
1430 return self._makeerrorresult(
1429 _(
1431 _(
1430 b'received command request frame with both new and '
1432 b'received command request frame with both new and '
1431 b'continuation flags set'
1433 b'continuation flags set'
1432 )
1434 )
1433 )
1435 )
1434
1436
1435 if not new and not continuation:
1437 if not new and not continuation:
1436 self._state = b'errored'
1438 self._state = b'errored'
1437 return self._makeerrorresult(
1439 return self._makeerrorresult(
1438 _(
1440 _(
1439 b'received command request frame with neither new nor '
1441 b'received command request frame with neither new nor '
1440 b'continuation flags set'
1442 b'continuation flags set'
1441 )
1443 )
1442 )
1444 )
1443
1445
1444 def _onframeinitial(self, frame):
1446 def _onframeinitial(self, frame):
1445 # Called when we receive a frame when in the "initial" state.
1447 # Called when we receive a frame when in the "initial" state.
1446 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1448 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1447 self._state = b'protocol-settings-receiving'
1449 self._state = b'protocol-settings-receiving'
1448 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1450 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1449 return self._onframeprotocolsettings(frame)
1451 return self._onframeprotocolsettings(frame)
1450
1452
1451 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1453 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1452 self._state = b'idle'
1454 self._state = b'idle'
1453 return self._onframeidle(frame)
1455 return self._onframeidle(frame)
1454
1456
1455 else:
1457 else:
1456 self._state = b'errored'
1458 self._state = b'errored'
1457 return self._makeerrorresult(
1459 return self._makeerrorresult(
1458 _(
1460 _(
1459 b'expected sender protocol settings or command request '
1461 b'expected sender protocol settings or command request '
1460 b'frame; got %d'
1462 b'frame; got %d'
1461 )
1463 )
1462 % frame.typeid
1464 % frame.typeid
1463 )
1465 )
1464
1466
1465 def _onframeprotocolsettings(self, frame):
1467 def _onframeprotocolsettings(self, frame):
1466 assert self._state == b'protocol-settings-receiving'
1468 assert self._state == b'protocol-settings-receiving'
1467 assert self._protocolsettingsdecoder is not None
1469 assert self._protocolsettingsdecoder is not None
1468
1470
1469 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1471 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1470 self._state = b'errored'
1472 self._state = b'errored'
1471 return self._makeerrorresult(
1473 return self._makeerrorresult(
1472 _(b'expected sender protocol settings frame; got %d')
1474 _(b'expected sender protocol settings frame; got %d')
1473 % frame.typeid
1475 % frame.typeid
1474 )
1476 )
1475
1477
1476 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1478 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1477 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1479 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1478
1480
1479 if more and eos:
1481 if more and eos:
1480 self._state = b'errored'
1482 self._state = b'errored'
1481 return self._makeerrorresult(
1483 return self._makeerrorresult(
1482 _(
1484 _(
1483 b'sender protocol settings frame cannot have both '
1485 b'sender protocol settings frame cannot have both '
1484 b'continuation and end of stream flags set'
1486 b'continuation and end of stream flags set'
1485 )
1487 )
1486 )
1488 )
1487
1489
1488 if not more and not eos:
1490 if not more and not eos:
1489 self._state = b'errored'
1491 self._state = b'errored'
1490 return self._makeerrorresult(
1492 return self._makeerrorresult(
1491 _(
1493 _(
1492 b'sender protocol settings frame must have continuation or '
1494 b'sender protocol settings frame must have continuation or '
1493 b'end of stream flag set'
1495 b'end of stream flag set'
1494 )
1496 )
1495 )
1497 )
1496
1498
1497 # TODO establish limits for maximum amount of data that can be
1499 # TODO establish limits for maximum amount of data that can be
1498 # buffered.
1500 # buffered.
1499 try:
1501 try:
1500 self._protocolsettingsdecoder.decode(frame.payload)
1502 self._protocolsettingsdecoder.decode(frame.payload)
1501 except Exception as e:
1503 except Exception as e:
1502 self._state = b'errored'
1504 self._state = b'errored'
1503 return self._makeerrorresult(
1505 return self._makeerrorresult(
1504 _(
1506 _(
1505 b'error decoding CBOR from sender protocol settings frame: %s'
1507 b'error decoding CBOR from sender protocol settings frame: %s'
1506 )
1508 )
1507 % stringutil.forcebytestr(e)
1509 % stringutil.forcebytestr(e)
1508 )
1510 )
1509
1511
1510 if more:
1512 if more:
1511 return self._makewantframeresult()
1513 return self._makewantframeresult()
1512
1514
1513 assert eos
1515 assert eos
1514
1516
1515 decoded = self._protocolsettingsdecoder.getavailable()
1517 decoded = self._protocolsettingsdecoder.getavailable()
1516 self._protocolsettingsdecoder = None
1518 self._protocolsettingsdecoder = None
1517
1519
1518 if not decoded:
1520 if not decoded:
1519 self._state = b'errored'
1521 self._state = b'errored'
1520 return self._makeerrorresult(
1522 return self._makeerrorresult(
1521 _(b'sender protocol settings frame did not contain CBOR data')
1523 _(b'sender protocol settings frame did not contain CBOR data')
1522 )
1524 )
1523 elif len(decoded) > 1:
1525 elif len(decoded) > 1:
1524 self._state = b'errored'
1526 self._state = b'errored'
1525 return self._makeerrorresult(
1527 return self._makeerrorresult(
1526 _(
1528 _(
1527 b'sender protocol settings frame contained multiple CBOR '
1529 b'sender protocol settings frame contained multiple CBOR '
1528 b'values'
1530 b'values'
1529 )
1531 )
1530 )
1532 )
1531
1533
1532 d = decoded[0]
1534 d = decoded[0]
1533
1535
1534 if b'contentencodings' in d:
1536 if b'contentencodings' in d:
1535 self._sendersettings[b'contentencodings'] = d[b'contentencodings']
1537 self._sendersettings[b'contentencodings'] = d[b'contentencodings']
1536
1538
1537 self._state = b'idle'
1539 self._state = b'idle'
1538
1540
1539 return self._makewantframeresult()
1541 return self._makewantframeresult()
1540
1542
1541 def _onframeidle(self, frame):
1543 def _onframeidle(self, frame):
1542 # The only frame type that should be received in this state is a
1544 # The only frame type that should be received in this state is a
1543 # command request.
1545 # command request.
1544 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1546 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1545 self._state = b'errored'
1547 self._state = b'errored'
1546 return self._makeerrorresult(
1548 return self._makeerrorresult(
1547 _(b'expected command request frame; got %d') % frame.typeid
1549 _(b'expected command request frame; got %d') % frame.typeid
1548 )
1550 )
1549
1551
1550 res = self._validatecommandrequestframe(frame)
1552 res = self._validatecommandrequestframe(frame)
1551 if res:
1553 if res:
1552 return res
1554 return res
1553
1555
1554 if frame.requestid in self._receivingcommands:
1556 if frame.requestid in self._receivingcommands:
1555 self._state = b'errored'
1557 self._state = b'errored'
1556 return self._makeerrorresult(
1558 return self._makeerrorresult(
1557 _(b'request with ID %d already received') % frame.requestid
1559 _(b'request with ID %d already received') % frame.requestid
1558 )
1560 )
1559
1561
1560 if frame.requestid in self._activecommands:
1562 if frame.requestid in self._activecommands:
1561 self._state = b'errored'
1563 self._state = b'errored'
1562 return self._makeerrorresult(
1564 return self._makeerrorresult(
1563 _(b'request with ID %d is already active') % frame.requestid
1565 _(b'request with ID %d is already active') % frame.requestid
1564 )
1566 )
1565
1567
1566 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1568 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1567 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1569 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1568 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1570 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1569
1571
1570 if not new:
1572 if not new:
1571 self._state = b'errored'
1573 self._state = b'errored'
1572 return self._makeerrorresult(
1574 return self._makeerrorresult(
1573 _(b'received command request frame without new flag set')
1575 _(b'received command request frame without new flag set')
1574 )
1576 )
1575
1577
1576 payload = util.bytesio()
1578 payload = util.bytesio()
1577 payload.write(frame.payload)
1579 payload.write(frame.payload)
1578
1580
1579 self._receivingcommands[frame.requestid] = {
1581 self._receivingcommands[frame.requestid] = {
1580 b'payload': payload,
1582 b'payload': payload,
1581 b'data': None,
1583 b'data': None,
1582 b'requestdone': not moreframes,
1584 b'requestdone': not moreframes,
1583 b'expectingdata': bool(expectingdata),
1585 b'expectingdata': bool(expectingdata),
1584 }
1586 }
1585
1587
1586 # This is the final frame for this request. Dispatch it.
1588 # This is the final frame for this request. Dispatch it.
1587 if not moreframes and not expectingdata:
1589 if not moreframes and not expectingdata:
1588 return self._makeruncommandresult(frame.requestid)
1590 return self._makeruncommandresult(frame.requestid)
1589
1591
1590 assert moreframes or expectingdata
1592 assert moreframes or expectingdata
1591 self._state = b'command-receiving'
1593 self._state = b'command-receiving'
1592 return self._makewantframeresult()
1594 return self._makewantframeresult()
1593
1595
1594 def _onframecommandreceiving(self, frame):
1596 def _onframecommandreceiving(self, frame):
1595 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1597 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1596 # Process new command requests as such.
1598 # Process new command requests as such.
1597 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1599 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1598 return self._onframeidle(frame)
1600 return self._onframeidle(frame)
1599
1601
1600 res = self._validatecommandrequestframe(frame)
1602 res = self._validatecommandrequestframe(frame)
1601 if res:
1603 if res:
1602 return res
1604 return res
1603
1605
1604 # All other frames should be related to a command that is currently
1606 # All other frames should be related to a command that is currently
1605 # receiving but is not active.
1607 # receiving but is not active.
1606 if frame.requestid in self._activecommands:
1608 if frame.requestid in self._activecommands:
1607 self._state = b'errored'
1609 self._state = b'errored'
1608 return self._makeerrorresult(
1610 return self._makeerrorresult(
1609 _(b'received frame for request that is still active: %d')
1611 _(b'received frame for request that is still active: %d')
1610 % frame.requestid
1612 % frame.requestid
1611 )
1613 )
1612
1614
1613 if frame.requestid not in self._receivingcommands:
1615 if frame.requestid not in self._receivingcommands:
1614 self._state = b'errored'
1616 self._state = b'errored'
1615 return self._makeerrorresult(
1617 return self._makeerrorresult(
1616 _(b'received frame for request that is not receiving: %d')
1618 _(b'received frame for request that is not receiving: %d')
1617 % frame.requestid
1619 % frame.requestid
1618 )
1620 )
1619
1621
1620 entry = self._receivingcommands[frame.requestid]
1622 entry = self._receivingcommands[frame.requestid]
1621
1623
1622 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1624 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1623 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1625 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1624 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1626 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1625
1627
1626 if entry[b'requestdone']:
1628 if entry[b'requestdone']:
1627 self._state = b'errored'
1629 self._state = b'errored'
1628 return self._makeerrorresult(
1630 return self._makeerrorresult(
1629 _(
1631 _(
1630 b'received command request frame when request frames '
1632 b'received command request frame when request frames '
1631 b'were supposedly done'
1633 b'were supposedly done'
1632 )
1634 )
1633 )
1635 )
1634
1636
1635 if expectingdata != entry[b'expectingdata']:
1637 if expectingdata != entry[b'expectingdata']:
1636 self._state = b'errored'
1638 self._state = b'errored'
1637 return self._makeerrorresult(
1639 return self._makeerrorresult(
1638 _(b'mismatch between expect data flag and previous frame')
1640 _(b'mismatch between expect data flag and previous frame')
1639 )
1641 )
1640
1642
1641 entry[b'payload'].write(frame.payload)
1643 entry[b'payload'].write(frame.payload)
1642
1644
1643 if not moreframes:
1645 if not moreframes:
1644 entry[b'requestdone'] = True
1646 entry[b'requestdone'] = True
1645
1647
1646 if not moreframes and not expectingdata:
1648 if not moreframes and not expectingdata:
1647 return self._makeruncommandresult(frame.requestid)
1649 return self._makeruncommandresult(frame.requestid)
1648
1650
1649 return self._makewantframeresult()
1651 return self._makewantframeresult()
1650
1652
1651 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1653 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1652 if not entry[b'expectingdata']:
1654 if not entry[b'expectingdata']:
1653 self._state = b'errored'
1655 self._state = b'errored'
1654 return self._makeerrorresult(
1656 return self._makeerrorresult(
1655 _(
1657 _(
1656 b'received command data frame for request that is not '
1658 b'received command data frame for request that is not '
1657 b'expecting data: %d'
1659 b'expecting data: %d'
1658 )
1660 )
1659 % frame.requestid
1661 % frame.requestid
1660 )
1662 )
1661
1663
1662 if entry[b'data'] is None:
1664 if entry[b'data'] is None:
1663 entry[b'data'] = util.bytesio()
1665 entry[b'data'] = util.bytesio()
1664
1666
1665 return self._handlecommanddataframe(frame, entry)
1667 return self._handlecommanddataframe(frame, entry)
1666 else:
1668 else:
1667 self._state = b'errored'
1669 self._state = b'errored'
1668 return self._makeerrorresult(
1670 return self._makeerrorresult(
1669 _(b'received unexpected frame type: %d') % frame.typeid
1671 _(b'received unexpected frame type: %d') % frame.typeid
1670 )
1672 )
1671
1673
1672 def _handlecommanddataframe(self, frame, entry):
1674 def _handlecommanddataframe(self, frame, entry):
1673 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1675 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1674
1676
1675 # TODO support streaming data instead of buffering it.
1677 # TODO support streaming data instead of buffering it.
1676 entry[b'data'].write(frame.payload)
1678 entry[b'data'].write(frame.payload)
1677
1679
1678 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1680 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1679 return self._makewantframeresult()
1681 return self._makewantframeresult()
1680 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1682 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1681 entry[b'data'].seek(0)
1683 entry[b'data'].seek(0)
1682 return self._makeruncommandresult(frame.requestid)
1684 return self._makeruncommandresult(frame.requestid)
1683 else:
1685 else:
1684 self._state = b'errored'
1686 self._state = b'errored'
1685 return self._makeerrorresult(_(b'command data frame without flags'))
1687 return self._makeerrorresult(_(b'command data frame without flags'))
1686
1688
1687 def _onframeerrored(self, frame):
1689 def _onframeerrored(self, frame):
1688 return self._makeerrorresult(_(b'server already errored'))
1690 return self._makeerrorresult(_(b'server already errored'))
1689
1691
1690
1692
1691 class commandrequest:
1693 class commandrequest:
1692 """Represents a request to run a command."""
1694 """Represents a request to run a command."""
1693
1695
1694 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1696 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1695 self.requestid = requestid
1697 self.requestid = requestid
1696 self.name = name
1698 self.name = name
1697 self.args = args
1699 self.args = args
1698 self.datafh = datafh
1700 self.datafh = datafh
1699 self.redirect = redirect
1701 self.redirect = redirect
1700 self.state = b'pending'
1702 self.state = b'pending'
1701
1703
1702
1704
1703 class clientreactor:
1705 class clientreactor:
1704 """Holds state of a client issuing frame-based protocol requests.
1706 """Holds state of a client issuing frame-based protocol requests.
1705
1707
1706 This is like ``serverreactor`` but for client-side state.
1708 This is like ``serverreactor`` but for client-side state.
1707
1709
1708 Each instance is bound to the lifetime of a connection. For persistent
1710 Each instance is bound to the lifetime of a connection. For persistent
1709 connection transports using e.g. TCP sockets and speaking the raw
1711 connection transports using e.g. TCP sockets and speaking the raw
1710 framing protocol, there will be a single instance for the lifetime of
1712 framing protocol, there will be a single instance for the lifetime of
1711 the TCP socket. For transports where there are multiple discrete
1713 the TCP socket. For transports where there are multiple discrete
1712 interactions (say tunneled within in HTTP request), there will be a
1714 interactions (say tunneled within in HTTP request), there will be a
1713 separate instance for each distinct interaction.
1715 separate instance for each distinct interaction.
1714
1716
1715 Consumers are expected to tell instances when events occur by calling
1717 Consumers are expected to tell instances when events occur by calling
1716 various methods. These methods return a 2-tuple describing any follow-up
1718 various methods. These methods return a 2-tuple describing any follow-up
1717 action(s) to take. The first element is the name of an action to
1719 action(s) to take. The first element is the name of an action to
1718 perform. The second is a data structure (usually a dict) specific to
1720 perform. The second is a data structure (usually a dict) specific to
1719 that action that contains more information. e.g. if the reactor wants
1721 that action that contains more information. e.g. if the reactor wants
1720 to send frames to the server, the data structure will contain a reference
1722 to send frames to the server, the data structure will contain a reference
1721 to those frames.
1723 to those frames.
1722
1724
1723 Valid actions that consumers can be instructed to take are:
1725 Valid actions that consumers can be instructed to take are:
1724
1726
1725 noop
1727 noop
1726 Indicates no additional action is required.
1728 Indicates no additional action is required.
1727
1729
1728 sendframes
1730 sendframes
1729 Indicates that frames should be sent to the server. The ``framegen``
1731 Indicates that frames should be sent to the server. The ``framegen``
1730 key contains a generator of frames that should be sent. The reactor
1732 key contains a generator of frames that should be sent. The reactor
1731 assumes that all frames in this generator are sent to the server.
1733 assumes that all frames in this generator are sent to the server.
1732
1734
1733 error
1735 error
1734 Indicates that an error occurred. The ``message`` key contains an
1736 Indicates that an error occurred. The ``message`` key contains an
1735 error message describing the failure.
1737 error message describing the failure.
1736
1738
1737 responsedata
1739 responsedata
1738 Indicates a response to a previously-issued command was received.
1740 Indicates a response to a previously-issued command was received.
1739
1741
1740 The ``request`` key contains the ``commandrequest`` instance that
1742 The ``request`` key contains the ``commandrequest`` instance that
1741 represents the request this data is for.
1743 represents the request this data is for.
1742
1744
1743 The ``data`` key contains the decoded data from the server.
1745 The ``data`` key contains the decoded data from the server.
1744
1746
1745 ``expectmore`` and ``eos`` evaluate to True when more response data
1747 ``expectmore`` and ``eos`` evaluate to True when more response data
1746 is expected to follow or we're at the end of the response stream,
1748 is expected to follow or we're at the end of the response stream,
1747 respectively.
1749 respectively.
1748 """
1750 """
1749
1751
1750 def __init__(
1752 def __init__(
1751 self,
1753 self,
1752 ui,
1754 ui,
1753 hasmultiplesend=False,
1755 hasmultiplesend=False,
1754 buffersends=True,
1756 buffersends=True,
1755 clientcontentencoders=None,
1757 clientcontentencoders=None,
1756 ):
1758 ):
1757 """Create a new instance.
1759 """Create a new instance.
1758
1760
1759 ``hasmultiplesend`` indicates whether multiple sends are supported
1761 ``hasmultiplesend`` indicates whether multiple sends are supported
1760 by the transport. When True, it is possible to send commands immediately
1762 by the transport. When True, it is possible to send commands immediately
1761 instead of buffering until the caller signals an intent to finish a
1763 instead of buffering until the caller signals an intent to finish a
1762 send operation.
1764 send operation.
1763
1765
1764 ``buffercommands`` indicates whether sends should be buffered until the
1766 ``buffercommands`` indicates whether sends should be buffered until the
1765 last request has been issued.
1767 last request has been issued.
1766
1768
1767 ``clientcontentencoders`` is an iterable of content encoders the client
1769 ``clientcontentencoders`` is an iterable of content encoders the client
1768 will advertise to the server and that the server can use for encoding
1770 will advertise to the server and that the server can use for encoding
1769 data. If not defined, the client will not advertise content encoders
1771 data. If not defined, the client will not advertise content encoders
1770 to the server.
1772 to the server.
1771 """
1773 """
1772 self._ui = ui
1774 self._ui = ui
1773 self._hasmultiplesend = hasmultiplesend
1775 self._hasmultiplesend = hasmultiplesend
1774 self._buffersends = buffersends
1776 self._buffersends = buffersends
1775 self._clientcontentencoders = clientcontentencoders
1777 self._clientcontentencoders = clientcontentencoders
1776
1778
1777 self._canissuecommands = True
1779 self._canissuecommands = True
1778 self._cansend = True
1780 self._cansend = True
1779 self._protocolsettingssent = False
1781 self._protocolsettingssent = False
1780
1782
1781 self._nextrequestid = 1
1783 self._nextrequestid = 1
1782 # We only support a single outgoing stream for now.
1784 # We only support a single outgoing stream for now.
1783 self._outgoingstream = outputstream(1)
1785 self._outgoingstream = outputstream(1)
1784 self._pendingrequests = collections.deque()
1786 self._pendingrequests = collections.deque()
1785 self._activerequests = {}
1787 self._activerequests = {}
1786 self._incomingstreams = {}
1788 self._incomingstreams = {}
1787 self._streamsettingsdecoders = {}
1789 self._streamsettingsdecoders = {}
1788
1790
1789 populatestreamencoders()
1791 populatestreamencoders()
1790
1792
1791 def callcommand(self, name, args, datafh=None, redirect=None):
1793 def callcommand(self, name, args, datafh=None, redirect=None):
1792 """Request that a command be executed.
1794 """Request that a command be executed.
1793
1795
1794 Receives the command name, a dict of arguments to pass to the command,
1796 Receives the command name, a dict of arguments to pass to the command,
1795 and an optional file object containing the raw data for the command.
1797 and an optional file object containing the raw data for the command.
1796
1798
1797 Returns a 3-tuple of (request, action, action data).
1799 Returns a 3-tuple of (request, action, action data).
1798 """
1800 """
1799 if not self._canissuecommands:
1801 if not self._canissuecommands:
1800 raise error.ProgrammingError(b'cannot issue new commands')
1802 raise error.ProgrammingError(b'cannot issue new commands')
1801
1803
1802 requestid = self._nextrequestid
1804 requestid = self._nextrequestid
1803 self._nextrequestid += 2
1805 self._nextrequestid += 2
1804
1806
1805 request = commandrequest(
1807 request = commandrequest(
1806 requestid, name, args, datafh=datafh, redirect=redirect
1808 requestid, name, args, datafh=datafh, redirect=redirect
1807 )
1809 )
1808
1810
1809 if self._buffersends:
1811 if self._buffersends:
1810 self._pendingrequests.append(request)
1812 self._pendingrequests.append(request)
1811 return request, b'noop', {}
1813 return request, b'noop', {}
1812 else:
1814 else:
1813 if not self._cansend:
1815 if not self._cansend:
1814 raise error.ProgrammingError(
1816 raise error.ProgrammingError(
1815 b'sends cannot be performed on this instance'
1817 b'sends cannot be performed on this instance'
1816 )
1818 )
1817
1819
1818 if not self._hasmultiplesend:
1820 if not self._hasmultiplesend:
1819 self._cansend = False
1821 self._cansend = False
1820 self._canissuecommands = False
1822 self._canissuecommands = False
1821
1823
1822 return (
1824 return (
1823 request,
1825 request,
1824 b'sendframes',
1826 b'sendframes',
1825 {
1827 {
1826 b'framegen': self._makecommandframes(request),
1828 b'framegen': self._makecommandframes(request),
1827 },
1829 },
1828 )
1830 )
1829
1831
1830 def flushcommands(self):
1832 def flushcommands(self):
1831 """Request that all queued commands be sent.
1833 """Request that all queued commands be sent.
1832
1834
1833 If any commands are buffered, this will instruct the caller to send
1835 If any commands are buffered, this will instruct the caller to send
1834 them over the wire. If no commands are buffered it instructs the client
1836 them over the wire. If no commands are buffered it instructs the client
1835 to no-op.
1837 to no-op.
1836
1838
1837 If instances aren't configured for multiple sends, no new command
1839 If instances aren't configured for multiple sends, no new command
1838 requests are allowed after this is called.
1840 requests are allowed after this is called.
1839 """
1841 """
1840 if not self._pendingrequests:
1842 if not self._pendingrequests:
1841 return b'noop', {}
1843 return b'noop', {}
1842
1844
1843 if not self._cansend:
1845 if not self._cansend:
1844 raise error.ProgrammingError(
1846 raise error.ProgrammingError(
1845 b'sends cannot be performed on this instance'
1847 b'sends cannot be performed on this instance'
1846 )
1848 )
1847
1849
1848 # If the instance only allows sending once, mark that we have fired
1850 # If the instance only allows sending once, mark that we have fired
1849 # our one shot.
1851 # our one shot.
1850 if not self._hasmultiplesend:
1852 if not self._hasmultiplesend:
1851 self._canissuecommands = False
1853 self._canissuecommands = False
1852 self._cansend = False
1854 self._cansend = False
1853
1855
1854 def makeframes():
1856 def makeframes():
1855 while self._pendingrequests:
1857 while self._pendingrequests:
1856 request = self._pendingrequests.popleft()
1858 request = self._pendingrequests.popleft()
1857 for frame in self._makecommandframes(request):
1859 for frame in self._makecommandframes(request):
1858 yield frame
1860 yield frame
1859
1861
1860 return b'sendframes', {
1862 return b'sendframes', {
1861 b'framegen': makeframes(),
1863 b'framegen': makeframes(),
1862 }
1864 }
1863
1865
1864 def _makecommandframes(self, request):
1866 def _makecommandframes(self, request):
1865 """Emit frames to issue a command request.
1867 """Emit frames to issue a command request.
1866
1868
1867 As a side-effect, update request accounting to reflect its changed
1869 As a side-effect, update request accounting to reflect its changed
1868 state.
1870 state.
1869 """
1871 """
1870 self._activerequests[request.requestid] = request
1872 self._activerequests[request.requestid] = request
1871 request.state = b'sending'
1873 request.state = b'sending'
1872
1874
1873 if not self._protocolsettingssent and self._clientcontentencoders:
1875 if not self._protocolsettingssent and self._clientcontentencoders:
1874 self._protocolsettingssent = True
1876 self._protocolsettingssent = True
1875
1877
1876 payload = b''.join(
1878 payload = b''.join(
1877 cborutil.streamencode(
1879 cborutil.streamencode(
1878 {
1880 {
1879 b'contentencodings': self._clientcontentencoders,
1881 b'contentencodings': self._clientcontentencoders,
1880 }
1882 }
1881 )
1883 )
1882 )
1884 )
1883
1885
1884 yield self._outgoingstream.makeframe(
1886 yield self._outgoingstream.makeframe(
1885 requestid=request.requestid,
1887 requestid=request.requestid,
1886 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1888 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1887 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1889 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1888 payload=payload,
1890 payload=payload,
1889 )
1891 )
1890
1892
1891 res = createcommandframes(
1893 res = createcommandframes(
1892 self._outgoingstream,
1894 self._outgoingstream,
1893 request.requestid,
1895 request.requestid,
1894 request.name,
1896 request.name,
1895 request.args,
1897 request.args,
1896 datafh=request.datafh,
1898 datafh=request.datafh,
1897 redirect=request.redirect,
1899 redirect=request.redirect,
1898 )
1900 )
1899
1901
1900 for frame in res:
1902 for frame in res:
1901 yield frame
1903 yield frame
1902
1904
1903 request.state = b'sent'
1905 request.state = b'sent'
1904
1906
1905 def onframerecv(self, frame):
1907 def onframerecv(self, frame):
1906 """Process a frame that has been received off the wire.
1908 """Process a frame that has been received off the wire.
1907
1909
1908 Returns a 2-tuple of (action, meta) describing further action the
1910 Returns a 2-tuple of (action, meta) describing further action the
1909 caller needs to take as a result of receiving this frame.
1911 caller needs to take as a result of receiving this frame.
1910 """
1912 """
1911 if frame.streamid % 2:
1913 if frame.streamid % 2:
1912 return (
1914 return (
1913 b'error',
1915 b'error',
1914 {
1916 {
1915 b'message': (
1917 b'message': (
1916 _(b'received frame with odd numbered stream ID: %d')
1918 _(b'received frame with odd numbered stream ID: %d')
1917 % frame.streamid
1919 % frame.streamid
1918 ),
1920 ),
1919 },
1921 },
1920 )
1922 )
1921
1923
1922 if frame.streamid not in self._incomingstreams:
1924 if frame.streamid not in self._incomingstreams:
1923 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1925 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1924 return (
1926 return (
1925 b'error',
1927 b'error',
1926 {
1928 {
1927 b'message': _(
1929 b'message': _(
1928 b'received frame on unknown stream '
1930 b'received frame on unknown stream '
1929 b'without beginning of stream flag set'
1931 b'without beginning of stream flag set'
1930 ),
1932 ),
1931 },
1933 },
1932 )
1934 )
1933
1935
1934 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
1936 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
1935
1937
1936 stream = self._incomingstreams[frame.streamid]
1938 stream = self._incomingstreams[frame.streamid]
1937
1939
1938 # If the payload is encoded, ask the stream to decode it. We
1940 # If the payload is encoded, ask the stream to decode it. We
1939 # merely substitute the decoded result into the frame payload as
1941 # merely substitute the decoded result into the frame payload as
1940 # if it had been transferred all along.
1942 # if it had been transferred all along.
1941 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1943 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1942 frame.payload = stream.decode(frame.payload)
1944 frame.payload = stream.decode(frame.payload)
1943
1945
1944 if frame.streamflags & STREAM_FLAG_END_STREAM:
1946 if frame.streamflags & STREAM_FLAG_END_STREAM:
1945 del self._incomingstreams[frame.streamid]
1947 del self._incomingstreams[frame.streamid]
1946
1948
1947 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1949 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1948 return self._onstreamsettingsframe(frame)
1950 return self._onstreamsettingsframe(frame)
1949
1951
1950 if frame.requestid not in self._activerequests:
1952 if frame.requestid not in self._activerequests:
1951 return (
1953 return (
1952 b'error',
1954 b'error',
1953 {
1955 {
1954 b'message': (
1956 b'message': (
1955 _(b'received frame for inactive request ID: %d')
1957 _(b'received frame for inactive request ID: %d')
1956 % frame.requestid
1958 % frame.requestid
1957 ),
1959 ),
1958 },
1960 },
1959 )
1961 )
1960
1962
1961 request = self._activerequests[frame.requestid]
1963 request = self._activerequests[frame.requestid]
1962 request.state = b'receiving'
1964 request.state = b'receiving'
1963
1965
1964 handlers = {
1966 handlers = {
1965 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1967 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1966 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1968 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1967 }
1969 }
1968
1970
1969 meth = handlers.get(frame.typeid)
1971 meth = handlers.get(frame.typeid)
1970 if not meth:
1972 if not meth:
1971 raise error.ProgrammingError(
1973 raise error.ProgrammingError(
1972 b'unhandled frame type: %d' % frame.typeid
1974 b'unhandled frame type: %d' % frame.typeid
1973 )
1975 )
1974
1976
1975 return meth(request, frame)
1977 return meth(request, frame)
1976
1978
1977 def _onstreamsettingsframe(self, frame):
1979 def _onstreamsettingsframe(self, frame):
1978 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1980 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1979
1981
1980 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1982 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1981 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1983 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1982
1984
1983 if more and eos:
1985 if more and eos:
1984 return (
1986 return (
1985 b'error',
1987 b'error',
1986 {
1988 {
1987 b'message': (
1989 b'message': (
1988 _(
1990 _(
1989 b'stream encoding settings frame cannot have both '
1991 b'stream encoding settings frame cannot have both '
1990 b'continuation and end of stream flags set'
1992 b'continuation and end of stream flags set'
1991 )
1993 )
1992 ),
1994 ),
1993 },
1995 },
1994 )
1996 )
1995
1997
1996 if not more and not eos:
1998 if not more and not eos:
1997 return (
1999 return (
1998 b'error',
2000 b'error',
1999 {
2001 {
2000 b'message': _(
2002 b'message': _(
2001 b'stream encoding settings frame must have '
2003 b'stream encoding settings frame must have '
2002 b'continuation or end of stream flag set'
2004 b'continuation or end of stream flag set'
2003 ),
2005 ),
2004 },
2006 },
2005 )
2007 )
2006
2008
2007 if frame.streamid not in self._streamsettingsdecoders:
2009 if frame.streamid not in self._streamsettingsdecoders:
2008 decoder = cborutil.bufferingdecoder()
2010 decoder = cborutil.bufferingdecoder()
2009 self._streamsettingsdecoders[frame.streamid] = decoder
2011 self._streamsettingsdecoders[frame.streamid] = decoder
2010
2012
2011 decoder = self._streamsettingsdecoders[frame.streamid]
2013 decoder = self._streamsettingsdecoders[frame.streamid]
2012
2014
2013 try:
2015 try:
2014 decoder.decode(frame.payload)
2016 decoder.decode(frame.payload)
2015 except Exception as e:
2017 except Exception as e:
2016 return (
2018 return (
2017 b'error',
2019 b'error',
2018 {
2020 {
2019 b'message': (
2021 b'message': (
2020 _(
2022 _(
2021 b'error decoding CBOR from stream encoding '
2023 b'error decoding CBOR from stream encoding '
2022 b'settings frame: %s'
2024 b'settings frame: %s'
2023 )
2025 )
2024 % stringutil.forcebytestr(e)
2026 % stringutil.forcebytestr(e)
2025 ),
2027 ),
2026 },
2028 },
2027 )
2029 )
2028
2030
2029 if more:
2031 if more:
2030 return b'noop', {}
2032 return b'noop', {}
2031
2033
2032 assert eos
2034 assert eos
2033
2035
2034 decoded = decoder.getavailable()
2036 decoded = decoder.getavailable()
2035 del self._streamsettingsdecoders[frame.streamid]
2037 del self._streamsettingsdecoders[frame.streamid]
2036
2038
2037 if not decoded:
2039 if not decoded:
2038 return (
2040 return (
2039 b'error',
2041 b'error',
2040 {
2042 {
2041 b'message': _(
2043 b'message': _(
2042 b'stream encoding settings frame did not contain '
2044 b'stream encoding settings frame did not contain '
2043 b'CBOR data'
2045 b'CBOR data'
2044 ),
2046 ),
2045 },
2047 },
2046 )
2048 )
2047
2049
2048 try:
2050 try:
2049 self._incomingstreams[frame.streamid].setdecoder(
2051 self._incomingstreams[frame.streamid].setdecoder(
2050 self._ui, decoded[0], decoded[1:]
2052 self._ui, decoded[0], decoded[1:]
2051 )
2053 )
2052 except Exception as e:
2054 except Exception as e:
2053 return (
2055 return (
2054 b'error',
2056 b'error',
2055 {
2057 {
2056 b'message': (
2058 b'message': (
2057 _(b'error setting stream decoder: %s')
2059 _(b'error setting stream decoder: %s')
2058 % stringutil.forcebytestr(e)
2060 % stringutil.forcebytestr(e)
2059 ),
2061 ),
2060 },
2062 },
2061 )
2063 )
2062
2064
2063 return b'noop', {}
2065 return b'noop', {}
2064
2066
2065 def _oncommandresponseframe(self, request, frame):
2067 def _oncommandresponseframe(self, request, frame):
2066 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
2068 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
2067 request.state = b'received'
2069 request.state = b'received'
2068 del self._activerequests[request.requestid]
2070 del self._activerequests[request.requestid]
2069
2071
2070 return (
2072 return (
2071 b'responsedata',
2073 b'responsedata',
2072 {
2074 {
2073 b'request': request,
2075 b'request': request,
2074 b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
2076 b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
2075 b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
2077 b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
2076 b'data': frame.payload,
2078 b'data': frame.payload,
2077 },
2079 },
2078 )
2080 )
2079
2081
2080 def _onerrorresponseframe(self, request, frame):
2082 def _onerrorresponseframe(self, request, frame):
2081 request.state = b'errored'
2083 request.state = b'errored'
2082 del self._activerequests[request.requestid]
2084 del self._activerequests[request.requestid]
2083
2085
2084 # The payload should be a CBOR map.
2086 # The payload should be a CBOR map.
2085 m = cborutil.decodeall(frame.payload)[0]
2087 m = cborutil.decodeall(frame.payload)[0]
2086
2088
2087 return (
2089 return (
2088 b'error',
2090 b'error',
2089 {
2091 {
2090 b'request': request,
2092 b'request': request,
2091 b'type': m[b'type'],
2093 b'type': m[b'type'],
2092 b'message': m[b'message'],
2094 b'message': m[b'message'],
2093 },
2095 },
2094 )
2096 )
General Comments 0
You need to be logged in to leave comments. Login now