##// END OF EJS Templates
wireprotov2: define and use stream encoders...
Gregory Szorc -
r40167:e6752241 default
parent child Browse files
Show More
@@ -1,1609 +1,1806 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54
54
55 FRAME_TYPES = {
55 FRAME_TYPES = {
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 }
64 }
65
65
66 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70
70
71 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
72 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 }
76 }
77
77
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
80
80
81 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
84 }
84 }
85
85
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88
88
89 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 }
92 }
93
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
100 }
101
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
108 }
109
109
110 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
111 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
117 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 }
120 }
121
121
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123
123
124 def humanflags(mapping, value):
124 def humanflags(mapping, value):
125 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
126 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
127 flags = []
127 flags = []
128 val = 1
128 val = 1
129 while value >= val:
129 while value >= val:
130 if value & val:
130 if value & val:
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 val <<= 1
132 val <<= 1
133
133
134 return b'|'.join(flags)
134 return b'|'.join(flags)
135
135
136 @attr.s(slots=True)
136 @attr.s(slots=True)
137 class frameheader(object):
137 class frameheader(object):
138 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
139
139
140 length = attr.ib()
140 length = attr.ib()
141 requestid = attr.ib()
141 requestid = attr.ib()
142 streamid = attr.ib()
142 streamid = attr.ib()
143 streamflags = attr.ib()
143 streamflags = attr.ib()
144 typeid = attr.ib()
144 typeid = attr.ib()
145 flags = attr.ib()
145 flags = attr.ib()
146
146
147 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
148 class frame(object):
148 class frame(object):
149 """Represents a parsed frame."""
149 """Represents a parsed frame."""
150
150
151 requestid = attr.ib()
151 requestid = attr.ib()
152 streamid = attr.ib()
152 streamid = attr.ib()
153 streamflags = attr.ib()
153 streamflags = attr.ib()
154 typeid = attr.ib()
154 typeid = attr.ib()
155 flags = attr.ib()
155 flags = attr.ib()
156 payload = attr.ib()
156 payload = attr.ib()
157
157
158 @encoding.strmethod
158 @encoding.strmethod
159 def __repr__(self):
159 def __repr__(self):
160 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
161 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
162 if value == self.typeid:
162 if value == self.typeid:
163 typename = name
163 typename = name
164 break
164 break
165
165
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
168 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171
171
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
174 # TODO assert size of payload.
174 # TODO assert size of payload.
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176
176
177 # 24 bits length
177 # 24 bits length
178 # 16 bits request id
178 # 16 bits request id
179 # 8 bits stream id
179 # 8 bits stream id
180 # 8 bits stream flags
180 # 8 bits stream flags
181 # 4 bits type
181 # 4 bits type
182 # 4 bits flags
182 # 4 bits flags
183
183
184 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
185 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
188 frame[8:] = payload
188 frame[8:] = payload
189
189
190 return frame
190 return frame
191
191
192 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
193 """Create a frame from a human readable string
193 """Create a frame from a human readable string
194
194
195 Strings have the form:
195 Strings have the form:
196
196
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198
198
199 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
200 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
201
201
202 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
203
203
204 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
205 named constant.
205 named constant.
206
206
207 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
208
208
209 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
210 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 byte string literal.
212 byte string literal.
213 """
213 """
214 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216
216
217 requestid = int(requestid)
217 requestid = int(requestid)
218 streamid = int(streamid)
218 streamid = int(streamid)
219
219
220 finalstreamflags = 0
220 finalstreamflags = 0
221 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
222 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
223 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
224 else:
224 else:
225 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
226
226
227 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
228 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
229 else:
229 else:
230 frametype = int(frametype)
230 frametype = int(frametype)
231
231
232 finalflags = 0
232 finalflags = 0
233 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
234 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
235 if flag in validflags:
235 if flag in validflags:
236 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
237 else:
237 else:
238 finalflags |= int(flag)
238 finalflags |= int(flag)
239
239
240 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
241 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
242 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
243
243
244 else:
244 else:
245 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
246
246
247 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
248 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
249 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
250
250
251 def parseheader(data):
251 def parseheader(data):
252 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
253
253
254 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
255 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
256 """
256 """
257 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
258 # 16 bits request ID
258 # 16 bits request ID
259 # 8 bits stream ID
259 # 8 bits stream ID
260 # 8 bits stream flags
260 # 8 bits stream flags
261 # 4 bits frame type
261 # 4 bits frame type
262 # 4 bits frame flags
262 # 4 bits frame flags
263 # ... payload
263 # ... payload
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 typeflags = data[7]
266 typeflags = data[7]
267
267
268 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
269 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
270
270
271 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
272 frametype, frameflags)
272 frametype, frameflags)
273
273
274 def readframe(fh):
274 def readframe(fh):
275 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
276
276
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
279 seen.
279 seen.
280 """
280 """
281 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
282
282
283 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
284
284
285 if readcount == 0:
285 if readcount == 0:
286 return None
286 return None
287
287
288 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 (readcount, header))
290 (readcount, header))
291
291
292 h = parseheader(header)
292 h = parseheader(header)
293
293
294 payload = fh.read(h.length)
294 payload = fh.read(h.length)
295 if len(payload) != h.length:
295 if len(payload) != h.length:
296 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 (h.length, len(payload)))
297 (h.length, len(payload)))
298
298
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 payload)
300 payload)
301
301
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 redirect=None):
304 redirect=None):
305 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
306
306
307 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
308 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
309 """
309 """
310 data = {b'name': cmd}
310 data = {b'name': cmd}
311 if args:
311 if args:
312 data[b'args'] = args
312 data[b'args'] = args
313
313
314 if redirect:
314 if redirect:
315 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
316
316
317 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
318
318
319 offset = 0
319 offset = 0
320
320
321 while True:
321 while True:
322 flags = 0
322 flags = 0
323
323
324 # Must set new or continuation flag.
324 # Must set new or continuation flag.
325 if not offset:
325 if not offset:
326 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
327 else:
327 else:
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329
329
330 # Data frames is set on all frames.
330 # Data frames is set on all frames.
331 if datafh:
331 if datafh:
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333
333
334 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
335 offset += len(payload)
335 offset += len(payload)
336
336
337 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339
339
340 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 flags=flags,
342 flags=flags,
343 payload=payload)
343 payload=payload)
344
344
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 break
346 break
347
347
348 if datafh:
348 if datafh:
349 while True:
349 while True:
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351
351
352 done = False
352 done = False
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 else:
355 else:
356 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
357 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
358 done = True
358 done = True
359
359
360 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
361 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
362 flags=flags,
362 flags=flags,
363 payload=data)
363 payload=data)
364
364
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
370 """Create a raw frame to send a bytes response from static bytes input.
371
371
372 Returns a generator of bytearrays.
372 Returns a generator of bytearrays.
373 """
373 """
374 # Automatically send the overall CBOR response map.
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
377 raise error.ProgrammingError('not yet implemented')
378
378
379 # Simple case where we can fit the full response in a single frame.
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
384 flags=flags,
385 payload=overall + data)
385 payload=overall + data)
386 return
386 return
387
387
388 # It's easier to send the overall CBOR map in its own frame than to track
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
393 payload=overall)
394
394
395 offset = 0
395 offset = 0
396 while True:
396 while True:
397 chunk = data[offset:offset + maxframesize]
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
398 offset += len(chunk)
399 done = offset == len(data)
399 done = offset == len(data)
400
400
401 if done:
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
405
406 yield stream.makeframe(requestid=requestid,
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
408 flags=flags,
409 payload=chunk)
409 payload=chunk)
410
410
411 if done:
411 if done:
412 break
412 break
413
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
416 """Generator of frames from a generator of byte chunks.
417
417
418 This assumes that another frame will follow whatever this emits. i.e.
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
420 flag.
421 """
421 """
422 cb = util.chunkbuffer(gen)
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
424
425 while True:
425 while True:
426 chunk = cb.read(maxframesize)
426 chunk = cb.read(maxframesize)
427 if not chunk:
427 if not chunk:
428 break
428 break
429
429
430 yield stream.makeframe(requestid=requestid,
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
432 flags=flags,
433 payload=chunk)
433 payload=chunk)
434
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
436
437 def createcommandresponseokframe(stream, requestid):
437 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439
439
440 return stream.makeframe(requestid=requestid,
440 return stream.makeframe(requestid=requestid,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 payload=overall)
443 payload=overall)
444
444
445 def createcommandresponseeosframe(stream, requestid):
445 def createcommandresponseeosframe(stream, requestid):
446 """Create an empty payload frame representing command end-of-stream."""
446 """Create an empty payload frame representing command end-of-stream."""
447 return stream.makeframe(requestid=requestid,
447 return stream.makeframe(requestid=requestid,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 payload=b'')
450 payload=b'')
451
451
452 def createalternatelocationresponseframe(stream, requestid, location):
452 def createalternatelocationresponseframe(stream, requestid, location):
453 data = {
453 data = {
454 b'status': b'redirect',
454 b'status': b'redirect',
455 b'location': {
455 b'location': {
456 b'url': location.url,
456 b'url': location.url,
457 b'mediatype': location.mediatype,
457 b'mediatype': location.mediatype,
458 }
458 }
459 }
459 }
460
460
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 r'servercadercerts'):
462 r'servercadercerts'):
463 value = getattr(location, a)
463 value = getattr(location, a)
464 if value is not None:
464 if value is not None:
465 data[b'location'][pycompat.bytestr(a)] = value
465 data[b'location'][pycompat.bytestr(a)] = value
466
466
467 return stream.makeframe(requestid=requestid,
467 return stream.makeframe(requestid=requestid,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 payload=b''.join(cborutil.streamencode(data)))
470 payload=b''.join(cborutil.streamencode(data)))
471
471
472 def createcommanderrorresponse(stream, requestid, message, args=None):
472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 # formatting works consistently?
474 # formatting works consistently?
475 m = {
475 m = {
476 b'status': b'error',
476 b'status': b'error',
477 b'error': {
477 b'error': {
478 b'message': message,
478 b'message': message,
479 }
479 }
480 }
480 }
481
481
482 if args:
482 if args:
483 m[b'error'][b'args'] = args
483 m[b'error'][b'args'] = args
484
484
485 overall = b''.join(cborutil.streamencode(m))
485 overall = b''.join(cborutil.streamencode(m))
486
486
487 yield stream.makeframe(requestid=requestid,
487 yield stream.makeframe(requestid=requestid,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 payload=overall)
490 payload=overall)
491
491
492 def createerrorframe(stream, requestid, msg, errtype):
492 def createerrorframe(stream, requestid, msg, errtype):
493 # TODO properly handle frame size limits.
493 # TODO properly handle frame size limits.
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495
495
496 payload = b''.join(cborutil.streamencode({
496 payload = b''.join(cborutil.streamencode({
497 b'type': errtype,
497 b'type': errtype,
498 b'message': [{b'msg': msg}],
498 b'message': [{b'msg': msg}],
499 }))
499 }))
500
500
501 yield stream.makeframe(requestid=requestid,
501 yield stream.makeframe(requestid=requestid,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 flags=0,
503 flags=0,
504 payload=payload)
504 payload=payload)
505
505
506 def createtextoutputframe(stream, requestid, atoms,
506 def createtextoutputframe(stream, requestid, atoms,
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 """Create a text output frame to render text to people.
508 """Create a text output frame to render text to people.
509
509
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511
511
512 The formatting string contains ``%s`` tokens to be replaced by the
512 The formatting string contains ``%s`` tokens to be replaced by the
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 formatters to be applied at rendering time. In terms of the ``ui``
514 formatters to be applied at rendering time. In terms of the ``ui``
515 class, each atom corresponds to a ``ui.write()``.
515 class, each atom corresponds to a ``ui.write()``.
516 """
516 """
517 atomdicts = []
517 atomdicts = []
518
518
519 for (formatting, args, labels) in atoms:
519 for (formatting, args, labels) in atoms:
520 # TODO look for localstr, other types here?
520 # TODO look for localstr, other types here?
521
521
522 if not isinstance(formatting, bytes):
522 if not isinstance(formatting, bytes):
523 raise ValueError('must use bytes formatting strings')
523 raise ValueError('must use bytes formatting strings')
524 for arg in args:
524 for arg in args:
525 if not isinstance(arg, bytes):
525 if not isinstance(arg, bytes):
526 raise ValueError('must use bytes for arguments')
526 raise ValueError('must use bytes for arguments')
527 for label in labels:
527 for label in labels:
528 if not isinstance(label, bytes):
528 if not isinstance(label, bytes):
529 raise ValueError('must use bytes for labels')
529 raise ValueError('must use bytes for labels')
530
530
531 # Formatting string must be ASCII.
531 # Formatting string must be ASCII.
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533
533
534 # Arguments must be UTF-8.
534 # Arguments must be UTF-8.
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536
536
537 # Labels must be ASCII.
537 # Labels must be ASCII.
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 for l in labels]
539 for l in labels]
540
540
541 atom = {b'msg': formatting}
541 atom = {b'msg': formatting}
542 if args:
542 if args:
543 atom[b'args'] = args
543 atom[b'args'] = args
544 if labels:
544 if labels:
545 atom[b'labels'] = labels
545 atom[b'labels'] = labels
546
546
547 atomdicts.append(atom)
547 atomdicts.append(atom)
548
548
549 payload = b''.join(cborutil.streamencode(atomdicts))
549 payload = b''.join(cborutil.streamencode(atomdicts))
550
550
551 if len(payload) > maxframesize:
551 if len(payload) > maxframesize:
552 raise ValueError('cannot encode data in a single frame')
552 raise ValueError('cannot encode data in a single frame')
553
553
554 yield stream.makeframe(requestid=requestid,
554 yield stream.makeframe(requestid=requestid,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 flags=0,
556 flags=0,
557 payload=payload)
557 payload=payload)
558
558
559 class bufferingcommandresponseemitter(object):
559 class bufferingcommandresponseemitter(object):
560 """Helper object to emit command response frames intelligently.
560 """Helper object to emit command response frames intelligently.
561
561
562 Raw command response data is likely emitted in chunks much smaller
562 Raw command response data is likely emitted in chunks much smaller
563 than what can fit in a single frame. This class exists to buffer
563 than what can fit in a single frame. This class exists to buffer
564 chunks until enough data is available to fit in a single frame.
564 chunks until enough data is available to fit in a single frame.
565
565
566 TODO we'll need something like this when compression is supported.
566 TODO we'll need something like this when compression is supported.
567 So it might make sense to implement this functionality at the stream
567 So it might make sense to implement this functionality at the stream
568 level.
568 level.
569 """
569 """
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 self._stream = stream
571 self._stream = stream
572 self._requestid = requestid
572 self._requestid = requestid
573 self._maxsize = maxframesize
573 self._maxsize = maxframesize
574 self._chunks = []
574 self._chunks = []
575 self._chunkssize = 0
575 self._chunkssize = 0
576
576
577 def send(self, data):
577 def send(self, data):
578 """Send new data for emission.
578 """Send new data for emission.
579
579
580 Is a generator of new frames that were derived from the new input.
580 Is a generator of new frames that were derived from the new input.
581
581
582 If the special input ``None`` is received, flushes all buffered
582 If the special input ``None`` is received, flushes all buffered
583 data to frames.
583 data to frames.
584 """
584 """
585
585
586 if data is None:
586 if data is None:
587 for frame in self._flush():
587 for frame in self._flush():
588 yield frame
588 yield frame
589 return
589 return
590
590
591 # There is a ton of potential to do more complicated things here.
591 # There is a ton of potential to do more complicated things here.
592 # Our immediate goal is to coalesce small chunks into big frames,
592 # Our immediate goal is to coalesce small chunks into big frames,
593 # not achieve the fewest number of frames possible. So we go with
593 # not achieve the fewest number of frames possible. So we go with
594 # a simple implementation:
594 # a simple implementation:
595 #
595 #
596 # * If a chunk is too large for a frame, we flush and emit frames
596 # * If a chunk is too large for a frame, we flush and emit frames
597 # for the new chunk.
597 # for the new chunk.
598 # * If a chunk can be buffered without total buffered size limits
598 # * If a chunk can be buffered without total buffered size limits
599 # being exceeded, we do that.
599 # being exceeded, we do that.
600 # * If a chunk causes us to go over our buffering limit, we flush
600 # * If a chunk causes us to go over our buffering limit, we flush
601 # and then buffer the new chunk.
601 # and then buffer the new chunk.
602
602
603 if len(data) > self._maxsize:
603 if len(data) > self._maxsize:
604 for frame in self._flush():
604 for frame in self._flush():
605 yield frame
605 yield frame
606
606
607 # Now emit frames for the big chunk.
607 # Now emit frames for the big chunk.
608 offset = 0
608 offset = 0
609 while True:
609 while True:
610 chunk = data[offset:offset + self._maxsize]
610 chunk = data[offset:offset + self._maxsize]
611 offset += len(chunk)
611 offset += len(chunk)
612
612
613 yield self._stream.makeframe(
613 yield self._stream.makeframe(
614 self._requestid,
614 self._requestid,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 payload=chunk)
617 payload=chunk)
618
618
619 if offset == len(data):
619 if offset == len(data):
620 return
620 return
621
621
622 # If we don't have enough to constitute a full frame, buffer and
622 # If we don't have enough to constitute a full frame, buffer and
623 # return.
623 # return.
624 if len(data) + self._chunkssize < self._maxsize:
624 if len(data) + self._chunkssize < self._maxsize:
625 self._chunks.append(data)
625 self._chunks.append(data)
626 self._chunkssize += len(data)
626 self._chunkssize += len(data)
627 return
627 return
628
628
629 # Else flush what we have and buffer the new chunk. We could do
629 # Else flush what we have and buffer the new chunk. We could do
630 # something more intelligent here, like break the chunk. Let's
630 # something more intelligent here, like break the chunk. Let's
631 # keep things simple for now.
631 # keep things simple for now.
632 for frame in self._flush():
632 for frame in self._flush():
633 yield frame
633 yield frame
634
634
635 self._chunks.append(data)
635 self._chunks.append(data)
636 self._chunkssize = len(data)
636 self._chunkssize = len(data)
637
637
638 def _flush(self):
638 def _flush(self):
639 payload = b''.join(self._chunks)
639 payload = b''.join(self._chunks)
640 assert len(payload) <= self._maxsize
640 assert len(payload) <= self._maxsize
641
641
642 self._chunks[:] = []
642 self._chunks[:] = []
643 self._chunkssize = 0
643 self._chunkssize = 0
644
644
645 yield self._stream.makeframe(
645 yield self._stream.makeframe(
646 self._requestid,
646 self._requestid,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 payload=payload)
649 payload=payload)
650
650
651 # TODO consider defining encoders/decoders using the util.compressionengine
652 # mechanism.
653
654 class identityencoder(object):
655 """Encoder for the "identity" stream encoding profile."""
656 def __init__(self, ui):
657 pass
658
659 def encode(self, data):
660 return data
661
662 def flush(self):
663 return b''
664
665 def finish(self):
666 return b''
667
668 class identitydecoder(object):
669 """Decoder for the "identity" stream encoding profile."""
670
671 def __init__(self, ui, extraobjs):
672 if extraobjs:
673 raise error.Abort(_('identity decoder received unexpected '
674 'additional values'))
675
676 def decode(self, data):
677 return data
678
679 class zlibencoder(object):
680 def __init__(self, ui):
681 import zlib
682 self._zlib = zlib
683 self._compressor = zlib.compressobj()
684
685 def encode(self, data):
686 return self._compressor.compress(data)
687
688 def flush(self):
689 # Z_SYNC_FLUSH doesn't reset compression context, which is
690 # what we want.
691 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
692
693 def finish(self):
694 res = self._compressor.flush(self._zlib.Z_FINISH)
695 self._compressor = None
696 return res
697
698 class zlibdecoder(object):
699 def __init__(self, ui, extraobjs):
700 import zlib
701
702 if extraobjs:
703 raise error.Abort(_('zlib decoder received unexpected '
704 'additional values'))
705
706 self._decompressor = zlib.decompressobj()
707
708 def decode(self, data):
709 # Python 2's zlib module doesn't use the buffer protocol and can't
710 # handle all bytes-like types.
711 if not pycompat.ispy3 and isinstance(data, bytearray):
712 data = bytes(data)
713
714 return self._decompressor.decompress(data)
715
716 class zstdbaseencoder(object):
717 def __init__(self, level):
718 from . import zstd
719
720 self._zstd = zstd
721 cctx = zstd.ZstdCompressor(level=level)
722 self._compressor = cctx.compressobj()
723
724 def encode(self, data):
725 return self._compressor.compress(data)
726
727 def flush(self):
728 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
729 # compressor and allows a decompressor to access all encoded data
730 # up to this point.
731 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
732
733 def finish(self):
734 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
735 self._compressor = None
736 return res
737
738 class zstd8mbencoder(zstdbaseencoder):
739 def __init__(self, ui):
740 super(zstd8mbencoder, self).__init__(3)
741
742 class zstdbasedecoder(object):
743 def __init__(self, maxwindowsize):
744 from . import zstd
745 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
746 self._decompressor = dctx.decompressobj()
747
748 def decode(self, data):
749 return self._decompressor.decompress(data)
750
751 class zstd8mbdecoder(zstdbasedecoder):
752 def __init__(self, ui, extraobjs):
753 if extraobjs:
754 raise error.Abort(_('zstd8mb decoder received unexpected '
755 'additional values'))
756
757 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
758
759 # We lazily populate this to avoid excessive module imports when importing
760 # this module.
761 STREAM_ENCODERS = {}
762 STREAM_ENCODERS_ORDER = []
763
764 def populatestreamencoders():
765 if STREAM_ENCODERS:
766 return
767
768 try:
769 from . import zstd
770 zstd.__version__
771 except ImportError:
772 zstd = None
773
774 # zstandard is fastest and is preferred.
775 if zstd:
776 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
777 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
778
779 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
780 STREAM_ENCODERS_ORDER.append(b'zlib')
781
782 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
783 STREAM_ENCODERS_ORDER.append(b'identity')
784
651 class stream(object):
785 class stream(object):
652 """Represents a logical unidirectional series of frames."""
786 """Represents a logical unidirectional series of frames."""
653
787
654 def __init__(self, streamid, active=False):
788 def __init__(self, streamid, active=False):
655 self.streamid = streamid
789 self.streamid = streamid
656 self._active = active
790 self._active = active
657
791
658 def makeframe(self, requestid, typeid, flags, payload):
792 def makeframe(self, requestid, typeid, flags, payload):
659 """Create a frame to be sent out over this stream.
793 """Create a frame to be sent out over this stream.
660
794
661 Only returns the frame instance. Does not actually send it.
795 Only returns the frame instance. Does not actually send it.
662 """
796 """
663 streamflags = 0
797 streamflags = 0
664 if not self._active:
798 if not self._active:
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
799 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 self._active = True
800 self._active = True
667
801
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
802 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 payload)
803 payload)
670
804
671 class inputstream(stream):
805 class inputstream(stream):
672 """Represents a stream used for receiving data."""
806 """Represents a stream used for receiving data."""
673
807
674 def setdecoder(self, name, extraobjs):
808 def __init__(self, streamid, active=False):
809 super(inputstream, self).__init__(streamid, active=active)
810 self._decoder = None
811
812 def setdecoder(self, ui, name, extraobjs):
675 """Set the decoder for this stream.
813 """Set the decoder for this stream.
676
814
677 Receives the stream profile name and any additional CBOR objects
815 Receives the stream profile name and any additional CBOR objects
678 decoded from the stream encoding settings frame payloads.
816 decoded from the stream encoding settings frame payloads.
679 """
817 """
818 if name not in STREAM_ENCODERS:
819 raise error.Abort(_('unknown stream decoder: %s') % name)
820
821 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
822
823 def decode(self, data):
824 # Default is identity decoder. We don't bother instantiating one
825 # because it is trivial.
826 if not self._decoder:
827 return data
828
829 return self._decoder.decode(data)
830
831 def flush(self):
832 if not self._decoder:
833 return b''
834
835 return self._decoder.flush()
680
836
681 class outputstream(stream):
837 class outputstream(stream):
682 """Represents a stream used for sending data."""
838 """Represents a stream used for sending data."""
683
839
840 def __init__(self, streamid, active=False):
841 super(outputstream, self).__init__(streamid, active=active)
842 self._encoder = None
843
844 def setencoder(self, ui, name):
845 """Set the encoder for this stream.
846
847 Receives the stream profile name.
848 """
849 if name not in STREAM_ENCODERS:
850 raise error.Abort(_('unknown stream encoder: %s') % name)
851
852 self._encoder = STREAM_ENCODERS[name][0](ui)
853
854 def encode(self, data):
855 if not self._encoder:
856 return data
857
858 return self._encoder.encode(data)
859
860 def flush(self):
861 if not self._encoder:
862 return b''
863
864 return self._encoder.flush()
865
866 def finish(self):
867 if not self._encoder:
868 return b''
869
870 self._encoder.finish()
871
684 def ensureserverstream(stream):
872 def ensureserverstream(stream):
685 if stream.streamid % 2:
873 if stream.streamid % 2:
686 raise error.ProgrammingError('server should only write to even '
874 raise error.ProgrammingError('server should only write to even '
687 'numbered streams; %d is not even' %
875 'numbered streams; %d is not even' %
688 stream.streamid)
876 stream.streamid)
689
877
690 DEFAULT_PROTOCOL_SETTINGS = {
878 DEFAULT_PROTOCOL_SETTINGS = {
691 'contentencodings': [b'identity'],
879 'contentencodings': [b'identity'],
692 }
880 }
693
881
694 class serverreactor(object):
882 class serverreactor(object):
695 """Holds state of a server handling frame-based protocol requests.
883 """Holds state of a server handling frame-based protocol requests.
696
884
697 This class is the "brain" of the unified frame-based protocol server
885 This class is the "brain" of the unified frame-based protocol server
698 component. While the protocol is stateless from the perspective of
886 component. While the protocol is stateless from the perspective of
699 requests/commands, something needs to track which frames have been
887 requests/commands, something needs to track which frames have been
700 received, what frames to expect, etc. This class is that thing.
888 received, what frames to expect, etc. This class is that thing.
701
889
702 Instances are modeled as a state machine of sorts. Instances are also
890 Instances are modeled as a state machine of sorts. Instances are also
703 reactionary to external events. The point of this class is to encapsulate
891 reactionary to external events. The point of this class is to encapsulate
704 the state of the connection and the exchange of frames, not to perform
892 the state of the connection and the exchange of frames, not to perform
705 work. Instead, callers tell this class when something occurs, like a
893 work. Instead, callers tell this class when something occurs, like a
706 frame arriving. If that activity is worthy of a follow-up action (say
894 frame arriving. If that activity is worthy of a follow-up action (say
707 *run a command*), the return value of that handler will say so.
895 *run a command*), the return value of that handler will say so.
708
896
709 I/O and CPU intensive operations are purposefully delegated outside of
897 I/O and CPU intensive operations are purposefully delegated outside of
710 this class.
898 this class.
711
899
712 Consumers are expected to tell instances when events occur. They do so by
900 Consumers are expected to tell instances when events occur. They do so by
713 calling the various ``on*`` methods. These methods return a 2-tuple
901 calling the various ``on*`` methods. These methods return a 2-tuple
714 describing any follow-up action(s) to take. The first element is the
902 describing any follow-up action(s) to take. The first element is the
715 name of an action to perform. The second is a data structure (usually
903 name of an action to perform. The second is a data structure (usually
716 a dict) specific to that action that contains more information. e.g.
904 a dict) specific to that action that contains more information. e.g.
717 if the server wants to send frames back to the client, the data structure
905 if the server wants to send frames back to the client, the data structure
718 will contain a reference to those frames.
906 will contain a reference to those frames.
719
907
720 Valid actions that consumers can be instructed to take are:
908 Valid actions that consumers can be instructed to take are:
721
909
722 sendframes
910 sendframes
723 Indicates that frames should be sent to the client. The ``framegen``
911 Indicates that frames should be sent to the client. The ``framegen``
724 key contains a generator of frames that should be sent. The server
912 key contains a generator of frames that should be sent. The server
725 assumes that all frames are sent to the client.
913 assumes that all frames are sent to the client.
726
914
727 error
915 error
728 Indicates that an error occurred. Consumer should probably abort.
916 Indicates that an error occurred. Consumer should probably abort.
729
917
730 runcommand
918 runcommand
731 Indicates that the consumer should run a wire protocol command. Details
919 Indicates that the consumer should run a wire protocol command. Details
732 of the command to run are given in the data structure.
920 of the command to run are given in the data structure.
733
921
734 wantframe
922 wantframe
735 Indicates that nothing of interest happened and the server is waiting on
923 Indicates that nothing of interest happened and the server is waiting on
736 more frames from the client before anything interesting can be done.
924 more frames from the client before anything interesting can be done.
737
925
738 noop
926 noop
739 Indicates no additional action is required.
927 Indicates no additional action is required.
740
928
741 Known Issues
929 Known Issues
742 ------------
930 ------------
743
931
744 There are no limits to the number of partially received commands or their
932 There are no limits to the number of partially received commands or their
745 size. A malicious client could stream command request data and exhaust the
933 size. A malicious client could stream command request data and exhaust the
746 server's memory.
934 server's memory.
747
935
748 Partially received commands are not acted upon when end of input is
936 Partially received commands are not acted upon when end of input is
749 reached. Should the server error if it receives a partial request?
937 reached. Should the server error if it receives a partial request?
750 Should the client send a message to abort a partially transmitted request
938 Should the client send a message to abort a partially transmitted request
751 to facilitate graceful shutdown?
939 to facilitate graceful shutdown?
752
940
753 Active requests that haven't been responded to aren't tracked. This means
941 Active requests that haven't been responded to aren't tracked. This means
754 that if we receive a command and instruct its dispatch, another command
942 that if we receive a command and instruct its dispatch, another command
755 with its request ID can come in over the wire and there will be a race
943 with its request ID can come in over the wire and there will be a race
756 between who responds to what.
944 between who responds to what.
757 """
945 """
758
946
759 def __init__(self, ui, deferoutput=False):
947 def __init__(self, ui, deferoutput=False):
760 """Construct a new server reactor.
948 """Construct a new server reactor.
761
949
762 ``deferoutput`` can be used to indicate that no output frames should be
950 ``deferoutput`` can be used to indicate that no output frames should be
763 instructed to be sent until input has been exhausted. In this mode,
951 instructed to be sent until input has been exhausted. In this mode,
764 events that would normally generate output frames (such as a command
952 events that would normally generate output frames (such as a command
765 response being ready) will instead defer instructing the consumer to
953 response being ready) will instead defer instructing the consumer to
766 send those frames. This is useful for half-duplex transports where the
954 send those frames. This is useful for half-duplex transports where the
767 sender cannot receive until all data has been transmitted.
955 sender cannot receive until all data has been transmitted.
768 """
956 """
769 self._ui = ui
957 self._ui = ui
770 self._deferoutput = deferoutput
958 self._deferoutput = deferoutput
771 self._state = 'initial'
959 self._state = 'initial'
772 self._nextoutgoingstreamid = 2
960 self._nextoutgoingstreamid = 2
773 self._bufferedframegens = []
961 self._bufferedframegens = []
774 # stream id -> stream instance for all active streams from the client.
962 # stream id -> stream instance for all active streams from the client.
775 self._incomingstreams = {}
963 self._incomingstreams = {}
776 self._outgoingstreams = {}
964 self._outgoingstreams = {}
777 # request id -> dict of commands that are actively being received.
965 # request id -> dict of commands that are actively being received.
778 self._receivingcommands = {}
966 self._receivingcommands = {}
779 # Request IDs that have been received and are actively being processed.
967 # Request IDs that have been received and are actively being processed.
780 # Once all output for a request has been sent, it is removed from this
968 # Once all output for a request has been sent, it is removed from this
781 # set.
969 # set.
782 self._activecommands = set()
970 self._activecommands = set()
783
971
784 self._protocolsettingsdecoder = None
972 self._protocolsettingsdecoder = None
785
973
786 # Sender protocol settings are optional. Set implied default values.
974 # Sender protocol settings are optional. Set implied default values.
787 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
975 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
788
976
977 populatestreamencoders()
978
789 def onframerecv(self, frame):
979 def onframerecv(self, frame):
790 """Process a frame that has been received off the wire.
980 """Process a frame that has been received off the wire.
791
981
792 Returns a dict with an ``action`` key that details what action,
982 Returns a dict with an ``action`` key that details what action,
793 if any, the consumer should take next.
983 if any, the consumer should take next.
794 """
984 """
795 if not frame.streamid % 2:
985 if not frame.streamid % 2:
796 self._state = 'errored'
986 self._state = 'errored'
797 return self._makeerrorresult(
987 return self._makeerrorresult(
798 _('received frame with even numbered stream ID: %d') %
988 _('received frame with even numbered stream ID: %d') %
799 frame.streamid)
989 frame.streamid)
800
990
801 if frame.streamid not in self._incomingstreams:
991 if frame.streamid not in self._incomingstreams:
802 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
992 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
803 self._state = 'errored'
993 self._state = 'errored'
804 return self._makeerrorresult(
994 return self._makeerrorresult(
805 _('received frame on unknown inactive stream without '
995 _('received frame on unknown inactive stream without '
806 'beginning of stream flag set'))
996 'beginning of stream flag set'))
807
997
808 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
998 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
809
999
810 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1000 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
811 # TODO handle decoding frames
1001 # TODO handle decoding frames
812 self._state = 'errored'
1002 self._state = 'errored'
813 raise error.ProgrammingError('support for decoding stream payloads '
1003 raise error.ProgrammingError('support for decoding stream payloads '
814 'not yet implemented')
1004 'not yet implemented')
815
1005
816 if frame.streamflags & STREAM_FLAG_END_STREAM:
1006 if frame.streamflags & STREAM_FLAG_END_STREAM:
817 del self._incomingstreams[frame.streamid]
1007 del self._incomingstreams[frame.streamid]
818
1008
819 handlers = {
1009 handlers = {
820 'initial': self._onframeinitial,
1010 'initial': self._onframeinitial,
821 'protocol-settings-receiving': self._onframeprotocolsettings,
1011 'protocol-settings-receiving': self._onframeprotocolsettings,
822 'idle': self._onframeidle,
1012 'idle': self._onframeidle,
823 'command-receiving': self._onframecommandreceiving,
1013 'command-receiving': self._onframecommandreceiving,
824 'errored': self._onframeerrored,
1014 'errored': self._onframeerrored,
825 }
1015 }
826
1016
827 meth = handlers.get(self._state)
1017 meth = handlers.get(self._state)
828 if not meth:
1018 if not meth:
829 raise error.ProgrammingError('unhandled state: %s' % self._state)
1019 raise error.ProgrammingError('unhandled state: %s' % self._state)
830
1020
831 return meth(frame)
1021 return meth(frame)
832
1022
833 def oncommandresponseready(self, stream, requestid, data):
1023 def oncommandresponseready(self, stream, requestid, data):
834 """Signal that a bytes response is ready to be sent to the client.
1024 """Signal that a bytes response is ready to be sent to the client.
835
1025
836 The raw bytes response is passed as an argument.
1026 The raw bytes response is passed as an argument.
837 """
1027 """
838 ensureserverstream(stream)
1028 ensureserverstream(stream)
839
1029
840 def sendframes():
1030 def sendframes():
841 for frame in createcommandresponseframesfrombytes(stream, requestid,
1031 for frame in createcommandresponseframesfrombytes(stream, requestid,
842 data):
1032 data):
843 yield frame
1033 yield frame
844
1034
845 self._activecommands.remove(requestid)
1035 self._activecommands.remove(requestid)
846
1036
847 result = sendframes()
1037 result = sendframes()
848
1038
849 if self._deferoutput:
1039 if self._deferoutput:
850 self._bufferedframegens.append(result)
1040 self._bufferedframegens.append(result)
851 return 'noop', {}
1041 return 'noop', {}
852 else:
1042 else:
853 return 'sendframes', {
1043 return 'sendframes', {
854 'framegen': result,
1044 'framegen': result,
855 }
1045 }
856
1046
857 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1047 def oncommandresponsereadyobjects(self, stream, requestid, objs):
858 """Signal that objects are ready to be sent to the client.
1048 """Signal that objects are ready to be sent to the client.
859
1049
860 ``objs`` is an iterable of objects (typically a generator) that will
1050 ``objs`` is an iterable of objects (typically a generator) that will
861 be encoded via CBOR and added to frames, which will be sent to the
1051 be encoded via CBOR and added to frames, which will be sent to the
862 client.
1052 client.
863 """
1053 """
864 ensureserverstream(stream)
1054 ensureserverstream(stream)
865
1055
866 # We need to take care over exception handling. Uncaught exceptions
1056 # We need to take care over exception handling. Uncaught exceptions
867 # when generating frames could lead to premature end of the frame
1057 # when generating frames could lead to premature end of the frame
868 # stream and the possibility of the server or client process getting
1058 # stream and the possibility of the server or client process getting
869 # in a bad state.
1059 # in a bad state.
870 #
1060 #
871 # Keep in mind that if ``objs`` is a generator, advancing it could
1061 # Keep in mind that if ``objs`` is a generator, advancing it could
872 # raise exceptions that originated in e.g. wire protocol command
1062 # raise exceptions that originated in e.g. wire protocol command
873 # functions. That is why we differentiate between exceptions raised
1063 # functions. That is why we differentiate between exceptions raised
874 # when iterating versus other exceptions that occur.
1064 # when iterating versus other exceptions that occur.
875 #
1065 #
876 # In all cases, when the function finishes, the request is fully
1066 # In all cases, when the function finishes, the request is fully
877 # handled and no new frames for it should be seen.
1067 # handled and no new frames for it should be seen.
878
1068
879 def sendframes():
1069 def sendframes():
880 emitted = False
1070 emitted = False
881 alternatelocationsent = False
1071 alternatelocationsent = False
882 emitter = bufferingcommandresponseemitter(stream, requestid)
1072 emitter = bufferingcommandresponseemitter(stream, requestid)
883 while True:
1073 while True:
884 try:
1074 try:
885 o = next(objs)
1075 o = next(objs)
886 except StopIteration:
1076 except StopIteration:
887 for frame in emitter.send(None):
1077 for frame in emitter.send(None):
888 yield frame
1078 yield frame
889
1079
890 if emitted:
1080 if emitted:
891 yield createcommandresponseeosframe(stream, requestid)
1081 yield createcommandresponseeosframe(stream, requestid)
892 break
1082 break
893
1083
894 except error.WireprotoCommandError as e:
1084 except error.WireprotoCommandError as e:
895 for frame in createcommanderrorresponse(
1085 for frame in createcommanderrorresponse(
896 stream, requestid, e.message, e.messageargs):
1086 stream, requestid, e.message, e.messageargs):
897 yield frame
1087 yield frame
898 break
1088 break
899
1089
900 except Exception as e:
1090 except Exception as e:
901 for frame in createerrorframe(
1091 for frame in createerrorframe(
902 stream, requestid, '%s' % stringutil.forcebytestr(e),
1092 stream, requestid, '%s' % stringutil.forcebytestr(e),
903 errtype='server'):
1093 errtype='server'):
904
1094
905 yield frame
1095 yield frame
906
1096
907 break
1097 break
908
1098
909 try:
1099 try:
910 # Alternate location responses can only be the first and
1100 # Alternate location responses can only be the first and
911 # only object in the output stream.
1101 # only object in the output stream.
912 if isinstance(o, wireprototypes.alternatelocationresponse):
1102 if isinstance(o, wireprototypes.alternatelocationresponse):
913 if emitted:
1103 if emitted:
914 raise error.ProgrammingError(
1104 raise error.ProgrammingError(
915 'alternatelocationresponse seen after initial '
1105 'alternatelocationresponse seen after initial '
916 'output object')
1106 'output object')
917
1107
918 yield createalternatelocationresponseframe(
1108 yield createalternatelocationresponseframe(
919 stream, requestid, o)
1109 stream, requestid, o)
920
1110
921 alternatelocationsent = True
1111 alternatelocationsent = True
922 emitted = True
1112 emitted = True
923 continue
1113 continue
924
1114
925 if alternatelocationsent:
1115 if alternatelocationsent:
926 raise error.ProgrammingError(
1116 raise error.ProgrammingError(
927 'object follows alternatelocationresponse')
1117 'object follows alternatelocationresponse')
928
1118
929 if not emitted:
1119 if not emitted:
930 yield createcommandresponseokframe(stream, requestid)
1120 yield createcommandresponseokframe(stream, requestid)
931 emitted = True
1121 emitted = True
932
1122
933 # Objects emitted by command functions can be serializable
1123 # Objects emitted by command functions can be serializable
934 # data structures or special types.
1124 # data structures or special types.
935 # TODO consider extracting the content normalization to a
1125 # TODO consider extracting the content normalization to a
936 # standalone function, as it may be useful for e.g. cachers.
1126 # standalone function, as it may be useful for e.g. cachers.
937
1127
938 # A pre-encoded object is sent directly to the emitter.
1128 # A pre-encoded object is sent directly to the emitter.
939 if isinstance(o, wireprototypes.encodedresponse):
1129 if isinstance(o, wireprototypes.encodedresponse):
940 for frame in emitter.send(o.data):
1130 for frame in emitter.send(o.data):
941 yield frame
1131 yield frame
942
1132
943 # A regular object is CBOR encoded.
1133 # A regular object is CBOR encoded.
944 else:
1134 else:
945 for chunk in cborutil.streamencode(o):
1135 for chunk in cborutil.streamencode(o):
946 for frame in emitter.send(chunk):
1136 for frame in emitter.send(chunk):
947 yield frame
1137 yield frame
948
1138
949 except Exception as e:
1139 except Exception as e:
950 for frame in createerrorframe(stream, requestid,
1140 for frame in createerrorframe(stream, requestid,
951 '%s' % e,
1141 '%s' % e,
952 errtype='server'):
1142 errtype='server'):
953 yield frame
1143 yield frame
954
1144
955 break
1145 break
956
1146
957 self._activecommands.remove(requestid)
1147 self._activecommands.remove(requestid)
958
1148
959 return self._handlesendframes(sendframes())
1149 return self._handlesendframes(sendframes())
960
1150
961 def oninputeof(self):
1151 def oninputeof(self):
962 """Signals that end of input has been received.
1152 """Signals that end of input has been received.
963
1153
964 No more frames will be received. All pending activity should be
1154 No more frames will be received. All pending activity should be
965 completed.
1155 completed.
966 """
1156 """
967 # TODO should we do anything about in-flight commands?
1157 # TODO should we do anything about in-flight commands?
968
1158
969 if not self._deferoutput or not self._bufferedframegens:
1159 if not self._deferoutput or not self._bufferedframegens:
970 return 'noop', {}
1160 return 'noop', {}
971
1161
972 # If we buffered all our responses, emit those.
1162 # If we buffered all our responses, emit those.
973 def makegen():
1163 def makegen():
974 for gen in self._bufferedframegens:
1164 for gen in self._bufferedframegens:
975 for frame in gen:
1165 for frame in gen:
976 yield frame
1166 yield frame
977
1167
978 return 'sendframes', {
1168 return 'sendframes', {
979 'framegen': makegen(),
1169 'framegen': makegen(),
980 }
1170 }
981
1171
982 def _handlesendframes(self, framegen):
1172 def _handlesendframes(self, framegen):
983 if self._deferoutput:
1173 if self._deferoutput:
984 self._bufferedframegens.append(framegen)
1174 self._bufferedframegens.append(framegen)
985 return 'noop', {}
1175 return 'noop', {}
986 else:
1176 else:
987 return 'sendframes', {
1177 return 'sendframes', {
988 'framegen': framegen,
1178 'framegen': framegen,
989 }
1179 }
990
1180
991 def onservererror(self, stream, requestid, msg):
1181 def onservererror(self, stream, requestid, msg):
992 ensureserverstream(stream)
1182 ensureserverstream(stream)
993
1183
994 def sendframes():
1184 def sendframes():
995 for frame in createerrorframe(stream, requestid, msg,
1185 for frame in createerrorframe(stream, requestid, msg,
996 errtype='server'):
1186 errtype='server'):
997 yield frame
1187 yield frame
998
1188
999 self._activecommands.remove(requestid)
1189 self._activecommands.remove(requestid)
1000
1190
1001 return self._handlesendframes(sendframes())
1191 return self._handlesendframes(sendframes())
1002
1192
1003 def oncommanderror(self, stream, requestid, message, args=None):
1193 def oncommanderror(self, stream, requestid, message, args=None):
1004 """Called when a command encountered an error before sending output."""
1194 """Called when a command encountered an error before sending output."""
1005 ensureserverstream(stream)
1195 ensureserverstream(stream)
1006
1196
1007 def sendframes():
1197 def sendframes():
1008 for frame in createcommanderrorresponse(stream, requestid, message,
1198 for frame in createcommanderrorresponse(stream, requestid, message,
1009 args):
1199 args):
1010 yield frame
1200 yield frame
1011
1201
1012 self._activecommands.remove(requestid)
1202 self._activecommands.remove(requestid)
1013
1203
1014 return self._handlesendframes(sendframes())
1204 return self._handlesendframes(sendframes())
1015
1205
1016 def makeoutputstream(self):
1206 def makeoutputstream(self):
1017 """Create a stream to be used for sending data to the client."""
1207 """Create a stream to be used for sending data to the client."""
1018 streamid = self._nextoutgoingstreamid
1208 streamid = self._nextoutgoingstreamid
1019 self._nextoutgoingstreamid += 2
1209 self._nextoutgoingstreamid += 2
1020
1210
1021 s = outputstream(streamid)
1211 s = outputstream(streamid)
1022 self._outgoingstreams[streamid] = s
1212 self._outgoingstreams[streamid] = s
1023
1213
1024 return s
1214 return s
1025
1215
1026 def _makeerrorresult(self, msg):
1216 def _makeerrorresult(self, msg):
1027 return 'error', {
1217 return 'error', {
1028 'message': msg,
1218 'message': msg,
1029 }
1219 }
1030
1220
1031 def _makeruncommandresult(self, requestid):
1221 def _makeruncommandresult(self, requestid):
1032 entry = self._receivingcommands[requestid]
1222 entry = self._receivingcommands[requestid]
1033
1223
1034 if not entry['requestdone']:
1224 if not entry['requestdone']:
1035 self._state = 'errored'
1225 self._state = 'errored'
1036 raise error.ProgrammingError('should not be called without '
1226 raise error.ProgrammingError('should not be called without '
1037 'requestdone set')
1227 'requestdone set')
1038
1228
1039 del self._receivingcommands[requestid]
1229 del self._receivingcommands[requestid]
1040
1230
1041 if self._receivingcommands:
1231 if self._receivingcommands:
1042 self._state = 'command-receiving'
1232 self._state = 'command-receiving'
1043 else:
1233 else:
1044 self._state = 'idle'
1234 self._state = 'idle'
1045
1235
1046 # Decode the payloads as CBOR.
1236 # Decode the payloads as CBOR.
1047 entry['payload'].seek(0)
1237 entry['payload'].seek(0)
1048 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1238 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1049
1239
1050 if b'name' not in request:
1240 if b'name' not in request:
1051 self._state = 'errored'
1241 self._state = 'errored'
1052 return self._makeerrorresult(
1242 return self._makeerrorresult(
1053 _('command request missing "name" field'))
1243 _('command request missing "name" field'))
1054
1244
1055 if b'args' not in request:
1245 if b'args' not in request:
1056 request[b'args'] = {}
1246 request[b'args'] = {}
1057
1247
1058 assert requestid not in self._activecommands
1248 assert requestid not in self._activecommands
1059 self._activecommands.add(requestid)
1249 self._activecommands.add(requestid)
1060
1250
1061 return 'runcommand', {
1251 return 'runcommand', {
1062 'requestid': requestid,
1252 'requestid': requestid,
1063 'command': request[b'name'],
1253 'command': request[b'name'],
1064 'args': request[b'args'],
1254 'args': request[b'args'],
1065 'redirect': request.get(b'redirect'),
1255 'redirect': request.get(b'redirect'),
1066 'data': entry['data'].getvalue() if entry['data'] else None,
1256 'data': entry['data'].getvalue() if entry['data'] else None,
1067 }
1257 }
1068
1258
1069 def _makewantframeresult(self):
1259 def _makewantframeresult(self):
1070 return 'wantframe', {
1260 return 'wantframe', {
1071 'state': self._state,
1261 'state': self._state,
1072 }
1262 }
1073
1263
1074 def _validatecommandrequestframe(self, frame):
1264 def _validatecommandrequestframe(self, frame):
1075 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1265 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1076 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1266 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1077
1267
1078 if new and continuation:
1268 if new and continuation:
1079 self._state = 'errored'
1269 self._state = 'errored'
1080 return self._makeerrorresult(
1270 return self._makeerrorresult(
1081 _('received command request frame with both new and '
1271 _('received command request frame with both new and '
1082 'continuation flags set'))
1272 'continuation flags set'))
1083
1273
1084 if not new and not continuation:
1274 if not new and not continuation:
1085 self._state = 'errored'
1275 self._state = 'errored'
1086 return self._makeerrorresult(
1276 return self._makeerrorresult(
1087 _('received command request frame with neither new nor '
1277 _('received command request frame with neither new nor '
1088 'continuation flags set'))
1278 'continuation flags set'))
1089
1279
1090 def _onframeinitial(self, frame):
1280 def _onframeinitial(self, frame):
1091 # Called when we receive a frame when in the "initial" state.
1281 # Called when we receive a frame when in the "initial" state.
1092 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1282 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1093 self._state = 'protocol-settings-receiving'
1283 self._state = 'protocol-settings-receiving'
1094 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1284 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1095 return self._onframeprotocolsettings(frame)
1285 return self._onframeprotocolsettings(frame)
1096
1286
1097 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1287 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1098 self._state = 'idle'
1288 self._state = 'idle'
1099 return self._onframeidle(frame)
1289 return self._onframeidle(frame)
1100
1290
1101 else:
1291 else:
1102 self._state = 'errored'
1292 self._state = 'errored'
1103 return self._makeerrorresult(
1293 return self._makeerrorresult(
1104 _('expected sender protocol settings or command request '
1294 _('expected sender protocol settings or command request '
1105 'frame; got %d') % frame.typeid)
1295 'frame; got %d') % frame.typeid)
1106
1296
1107 def _onframeprotocolsettings(self, frame):
1297 def _onframeprotocolsettings(self, frame):
1108 assert self._state == 'protocol-settings-receiving'
1298 assert self._state == 'protocol-settings-receiving'
1109 assert self._protocolsettingsdecoder is not None
1299 assert self._protocolsettingsdecoder is not None
1110
1300
1111 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1301 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1112 self._state = 'errored'
1302 self._state = 'errored'
1113 return self._makeerrorresult(
1303 return self._makeerrorresult(
1114 _('expected sender protocol settings frame; got %d') %
1304 _('expected sender protocol settings frame; got %d') %
1115 frame.typeid)
1305 frame.typeid)
1116
1306
1117 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1307 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1118 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1308 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1119
1309
1120 if more and eos:
1310 if more and eos:
1121 self._state = 'errored'
1311 self._state = 'errored'
1122 return self._makeerrorresult(
1312 return self._makeerrorresult(
1123 _('sender protocol settings frame cannot have both '
1313 _('sender protocol settings frame cannot have both '
1124 'continuation and end of stream flags set'))
1314 'continuation and end of stream flags set'))
1125
1315
1126 if not more and not eos:
1316 if not more and not eos:
1127 self._state = 'errored'
1317 self._state = 'errored'
1128 return self._makeerrorresult(
1318 return self._makeerrorresult(
1129 _('sender protocol settings frame must have continuation or '
1319 _('sender protocol settings frame must have continuation or '
1130 'end of stream flag set'))
1320 'end of stream flag set'))
1131
1321
1132 # TODO establish limits for maximum amount of data that can be
1322 # TODO establish limits for maximum amount of data that can be
1133 # buffered.
1323 # buffered.
1134 try:
1324 try:
1135 self._protocolsettingsdecoder.decode(frame.payload)
1325 self._protocolsettingsdecoder.decode(frame.payload)
1136 except Exception as e:
1326 except Exception as e:
1137 self._state = 'errored'
1327 self._state = 'errored'
1138 return self._makeerrorresult(
1328 return self._makeerrorresult(
1139 _('error decoding CBOR from sender protocol settings frame: %s')
1329 _('error decoding CBOR from sender protocol settings frame: %s')
1140 % stringutil.forcebytestr(e))
1330 % stringutil.forcebytestr(e))
1141
1331
1142 if more:
1332 if more:
1143 return self._makewantframeresult()
1333 return self._makewantframeresult()
1144
1334
1145 assert eos
1335 assert eos
1146
1336
1147 decoded = self._protocolsettingsdecoder.getavailable()
1337 decoded = self._protocolsettingsdecoder.getavailable()
1148 self._protocolsettingsdecoder = None
1338 self._protocolsettingsdecoder = None
1149
1339
1150 if not decoded:
1340 if not decoded:
1151 self._state = 'errored'
1341 self._state = 'errored'
1152 return self._makeerrorresult(
1342 return self._makeerrorresult(
1153 _('sender protocol settings frame did not contain CBOR data'))
1343 _('sender protocol settings frame did not contain CBOR data'))
1154 elif len(decoded) > 1:
1344 elif len(decoded) > 1:
1155 self._state = 'errored'
1345 self._state = 'errored'
1156 return self._makeerrorresult(
1346 return self._makeerrorresult(
1157 _('sender protocol settings frame contained multiple CBOR '
1347 _('sender protocol settings frame contained multiple CBOR '
1158 'values'))
1348 'values'))
1159
1349
1160 d = decoded[0]
1350 d = decoded[0]
1161
1351
1162 if b'contentencodings' in d:
1352 if b'contentencodings' in d:
1163 self._sendersettings['contentencodings'] = d[b'contentencodings']
1353 self._sendersettings['contentencodings'] = d[b'contentencodings']
1164
1354
1165 self._state = 'idle'
1355 self._state = 'idle'
1166
1356
1167 return self._makewantframeresult()
1357 return self._makewantframeresult()
1168
1358
1169 def _onframeidle(self, frame):
1359 def _onframeidle(self, frame):
1170 # The only frame type that should be received in this state is a
1360 # The only frame type that should be received in this state is a
1171 # command request.
1361 # command request.
1172 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1362 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1173 self._state = 'errored'
1363 self._state = 'errored'
1174 return self._makeerrorresult(
1364 return self._makeerrorresult(
1175 _('expected command request frame; got %d') % frame.typeid)
1365 _('expected command request frame; got %d') % frame.typeid)
1176
1366
1177 res = self._validatecommandrequestframe(frame)
1367 res = self._validatecommandrequestframe(frame)
1178 if res:
1368 if res:
1179 return res
1369 return res
1180
1370
1181 if frame.requestid in self._receivingcommands:
1371 if frame.requestid in self._receivingcommands:
1182 self._state = 'errored'
1372 self._state = 'errored'
1183 return self._makeerrorresult(
1373 return self._makeerrorresult(
1184 _('request with ID %d already received') % frame.requestid)
1374 _('request with ID %d already received') % frame.requestid)
1185
1375
1186 if frame.requestid in self._activecommands:
1376 if frame.requestid in self._activecommands:
1187 self._state = 'errored'
1377 self._state = 'errored'
1188 return self._makeerrorresult(
1378 return self._makeerrorresult(
1189 _('request with ID %d is already active') % frame.requestid)
1379 _('request with ID %d is already active') % frame.requestid)
1190
1380
1191 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1381 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1192 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1382 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1193 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1383 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1194
1384
1195 if not new:
1385 if not new:
1196 self._state = 'errored'
1386 self._state = 'errored'
1197 return self._makeerrorresult(
1387 return self._makeerrorresult(
1198 _('received command request frame without new flag set'))
1388 _('received command request frame without new flag set'))
1199
1389
1200 payload = util.bytesio()
1390 payload = util.bytesio()
1201 payload.write(frame.payload)
1391 payload.write(frame.payload)
1202
1392
1203 self._receivingcommands[frame.requestid] = {
1393 self._receivingcommands[frame.requestid] = {
1204 'payload': payload,
1394 'payload': payload,
1205 'data': None,
1395 'data': None,
1206 'requestdone': not moreframes,
1396 'requestdone': not moreframes,
1207 'expectingdata': bool(expectingdata),
1397 'expectingdata': bool(expectingdata),
1208 }
1398 }
1209
1399
1210 # This is the final frame for this request. Dispatch it.
1400 # This is the final frame for this request. Dispatch it.
1211 if not moreframes and not expectingdata:
1401 if not moreframes and not expectingdata:
1212 return self._makeruncommandresult(frame.requestid)
1402 return self._makeruncommandresult(frame.requestid)
1213
1403
1214 assert moreframes or expectingdata
1404 assert moreframes or expectingdata
1215 self._state = 'command-receiving'
1405 self._state = 'command-receiving'
1216 return self._makewantframeresult()
1406 return self._makewantframeresult()
1217
1407
1218 def _onframecommandreceiving(self, frame):
1408 def _onframecommandreceiving(self, frame):
1219 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1409 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1220 # Process new command requests as such.
1410 # Process new command requests as such.
1221 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1411 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1222 return self._onframeidle(frame)
1412 return self._onframeidle(frame)
1223
1413
1224 res = self._validatecommandrequestframe(frame)
1414 res = self._validatecommandrequestframe(frame)
1225 if res:
1415 if res:
1226 return res
1416 return res
1227
1417
1228 # All other frames should be related to a command that is currently
1418 # All other frames should be related to a command that is currently
1229 # receiving but is not active.
1419 # receiving but is not active.
1230 if frame.requestid in self._activecommands:
1420 if frame.requestid in self._activecommands:
1231 self._state = 'errored'
1421 self._state = 'errored'
1232 return self._makeerrorresult(
1422 return self._makeerrorresult(
1233 _('received frame for request that is still active: %d') %
1423 _('received frame for request that is still active: %d') %
1234 frame.requestid)
1424 frame.requestid)
1235
1425
1236 if frame.requestid not in self._receivingcommands:
1426 if frame.requestid not in self._receivingcommands:
1237 self._state = 'errored'
1427 self._state = 'errored'
1238 return self._makeerrorresult(
1428 return self._makeerrorresult(
1239 _('received frame for request that is not receiving: %d') %
1429 _('received frame for request that is not receiving: %d') %
1240 frame.requestid)
1430 frame.requestid)
1241
1431
1242 entry = self._receivingcommands[frame.requestid]
1432 entry = self._receivingcommands[frame.requestid]
1243
1433
1244 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1434 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1245 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1435 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1246 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1436 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1247
1437
1248 if entry['requestdone']:
1438 if entry['requestdone']:
1249 self._state = 'errored'
1439 self._state = 'errored'
1250 return self._makeerrorresult(
1440 return self._makeerrorresult(
1251 _('received command request frame when request frames '
1441 _('received command request frame when request frames '
1252 'were supposedly done'))
1442 'were supposedly done'))
1253
1443
1254 if expectingdata != entry['expectingdata']:
1444 if expectingdata != entry['expectingdata']:
1255 self._state = 'errored'
1445 self._state = 'errored'
1256 return self._makeerrorresult(
1446 return self._makeerrorresult(
1257 _('mismatch between expect data flag and previous frame'))
1447 _('mismatch between expect data flag and previous frame'))
1258
1448
1259 entry['payload'].write(frame.payload)
1449 entry['payload'].write(frame.payload)
1260
1450
1261 if not moreframes:
1451 if not moreframes:
1262 entry['requestdone'] = True
1452 entry['requestdone'] = True
1263
1453
1264 if not moreframes and not expectingdata:
1454 if not moreframes and not expectingdata:
1265 return self._makeruncommandresult(frame.requestid)
1455 return self._makeruncommandresult(frame.requestid)
1266
1456
1267 return self._makewantframeresult()
1457 return self._makewantframeresult()
1268
1458
1269 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1459 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1270 if not entry['expectingdata']:
1460 if not entry['expectingdata']:
1271 self._state = 'errored'
1461 self._state = 'errored'
1272 return self._makeerrorresult(_(
1462 return self._makeerrorresult(_(
1273 'received command data frame for request that is not '
1463 'received command data frame for request that is not '
1274 'expecting data: %d') % frame.requestid)
1464 'expecting data: %d') % frame.requestid)
1275
1465
1276 if entry['data'] is None:
1466 if entry['data'] is None:
1277 entry['data'] = util.bytesio()
1467 entry['data'] = util.bytesio()
1278
1468
1279 return self._handlecommanddataframe(frame, entry)
1469 return self._handlecommanddataframe(frame, entry)
1280 else:
1470 else:
1281 self._state = 'errored'
1471 self._state = 'errored'
1282 return self._makeerrorresult(_(
1472 return self._makeerrorresult(_(
1283 'received unexpected frame type: %d') % frame.typeid)
1473 'received unexpected frame type: %d') % frame.typeid)
1284
1474
1285 def _handlecommanddataframe(self, frame, entry):
1475 def _handlecommanddataframe(self, frame, entry):
1286 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1476 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1287
1477
1288 # TODO support streaming data instead of buffering it.
1478 # TODO support streaming data instead of buffering it.
1289 entry['data'].write(frame.payload)
1479 entry['data'].write(frame.payload)
1290
1480
1291 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1481 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1292 return self._makewantframeresult()
1482 return self._makewantframeresult()
1293 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1483 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1294 entry['data'].seek(0)
1484 entry['data'].seek(0)
1295 return self._makeruncommandresult(frame.requestid)
1485 return self._makeruncommandresult(frame.requestid)
1296 else:
1486 else:
1297 self._state = 'errored'
1487 self._state = 'errored'
1298 return self._makeerrorresult(_('command data frame without '
1488 return self._makeerrorresult(_('command data frame without '
1299 'flags'))
1489 'flags'))
1300
1490
1301 def _onframeerrored(self, frame):
1491 def _onframeerrored(self, frame):
1302 return self._makeerrorresult(_('server already errored'))
1492 return self._makeerrorresult(_('server already errored'))
1303
1493
1304 class commandrequest(object):
1494 class commandrequest(object):
1305 """Represents a request to run a command."""
1495 """Represents a request to run a command."""
1306
1496
1307 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1497 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1308 self.requestid = requestid
1498 self.requestid = requestid
1309 self.name = name
1499 self.name = name
1310 self.args = args
1500 self.args = args
1311 self.datafh = datafh
1501 self.datafh = datafh
1312 self.redirect = redirect
1502 self.redirect = redirect
1313 self.state = 'pending'
1503 self.state = 'pending'
1314
1504
1315 class clientreactor(object):
1505 class clientreactor(object):
1316 """Holds state of a client issuing frame-based protocol requests.
1506 """Holds state of a client issuing frame-based protocol requests.
1317
1507
1318 This is like ``serverreactor`` but for client-side state.
1508 This is like ``serverreactor`` but for client-side state.
1319
1509
1320 Each instance is bound to the lifetime of a connection. For persistent
1510 Each instance is bound to the lifetime of a connection. For persistent
1321 connection transports using e.g. TCP sockets and speaking the raw
1511 connection transports using e.g. TCP sockets and speaking the raw
1322 framing protocol, there will be a single instance for the lifetime of
1512 framing protocol, there will be a single instance for the lifetime of
1323 the TCP socket. For transports where there are multiple discrete
1513 the TCP socket. For transports where there are multiple discrete
1324 interactions (say tunneled within in HTTP request), there will be a
1514 interactions (say tunneled within in HTTP request), there will be a
1325 separate instance for each distinct interaction.
1515 separate instance for each distinct interaction.
1326
1516
1327 Consumers are expected to tell instances when events occur by calling
1517 Consumers are expected to tell instances when events occur by calling
1328 various methods. These methods return a 2-tuple describing any follow-up
1518 various methods. These methods return a 2-tuple describing any follow-up
1329 action(s) to take. The first element is the name of an action to
1519 action(s) to take. The first element is the name of an action to
1330 perform. The second is a data structure (usually a dict) specific to
1520 perform. The second is a data structure (usually a dict) specific to
1331 that action that contains more information. e.g. if the reactor wants
1521 that action that contains more information. e.g. if the reactor wants
1332 to send frames to the server, the data structure will contain a reference
1522 to send frames to the server, the data structure will contain a reference
1333 to those frames.
1523 to those frames.
1334
1524
1335 Valid actions that consumers can be instructed to take are:
1525 Valid actions that consumers can be instructed to take are:
1336
1526
1337 noop
1527 noop
1338 Indicates no additional action is required.
1528 Indicates no additional action is required.
1339
1529
1340 sendframes
1530 sendframes
1341 Indicates that frames should be sent to the server. The ``framegen``
1531 Indicates that frames should be sent to the server. The ``framegen``
1342 key contains a generator of frames that should be sent. The reactor
1532 key contains a generator of frames that should be sent. The reactor
1343 assumes that all frames in this generator are sent to the server.
1533 assumes that all frames in this generator are sent to the server.
1344
1534
1345 error
1535 error
1346 Indicates that an error occurred. The ``message`` key contains an
1536 Indicates that an error occurred. The ``message`` key contains an
1347 error message describing the failure.
1537 error message describing the failure.
1348
1538
1349 responsedata
1539 responsedata
1350 Indicates a response to a previously-issued command was received.
1540 Indicates a response to a previously-issued command was received.
1351
1541
1352 The ``request`` key contains the ``commandrequest`` instance that
1542 The ``request`` key contains the ``commandrequest`` instance that
1353 represents the request this data is for.
1543 represents the request this data is for.
1354
1544
1355 The ``data`` key contains the decoded data from the server.
1545 The ``data`` key contains the decoded data from the server.
1356
1546
1357 ``expectmore`` and ``eos`` evaluate to True when more response data
1547 ``expectmore`` and ``eos`` evaluate to True when more response data
1358 is expected to follow or we're at the end of the response stream,
1548 is expected to follow or we're at the end of the response stream,
1359 respectively.
1549 respectively.
1360 """
1550 """
1361 def __init__(self, ui, hasmultiplesend=False, buffersends=True):
1551 def __init__(self, ui, hasmultiplesend=False, buffersends=True):
1362 """Create a new instance.
1552 """Create a new instance.
1363
1553
1364 ``hasmultiplesend`` indicates whether multiple sends are supported
1554 ``hasmultiplesend`` indicates whether multiple sends are supported
1365 by the transport. When True, it is possible to send commands immediately
1555 by the transport. When True, it is possible to send commands immediately
1366 instead of buffering until the caller signals an intent to finish a
1556 instead of buffering until the caller signals an intent to finish a
1367 send operation.
1557 send operation.
1368
1558
1369 ``buffercommands`` indicates whether sends should be buffered until the
1559 ``buffercommands`` indicates whether sends should be buffered until the
1370 last request has been issued.
1560 last request has been issued.
1371 """
1561 """
1372 self._ui = ui
1562 self._ui = ui
1373 self._hasmultiplesend = hasmultiplesend
1563 self._hasmultiplesend = hasmultiplesend
1374 self._buffersends = buffersends
1564 self._buffersends = buffersends
1375
1565
1376 self._canissuecommands = True
1566 self._canissuecommands = True
1377 self._cansend = True
1567 self._cansend = True
1378
1568
1379 self._nextrequestid = 1
1569 self._nextrequestid = 1
1380 # We only support a single outgoing stream for now.
1570 # We only support a single outgoing stream for now.
1381 self._outgoingstream = outputstream(1)
1571 self._outgoingstream = outputstream(1)
1382 self._pendingrequests = collections.deque()
1572 self._pendingrequests = collections.deque()
1383 self._activerequests = {}
1573 self._activerequests = {}
1384 self._incomingstreams = {}
1574 self._incomingstreams = {}
1385 self._streamsettingsdecoders = {}
1575 self._streamsettingsdecoders = {}
1386
1576
1577 populatestreamencoders()
1578
1387 def callcommand(self, name, args, datafh=None, redirect=None):
1579 def callcommand(self, name, args, datafh=None, redirect=None):
1388 """Request that a command be executed.
1580 """Request that a command be executed.
1389
1581
1390 Receives the command name, a dict of arguments to pass to the command,
1582 Receives the command name, a dict of arguments to pass to the command,
1391 and an optional file object containing the raw data for the command.
1583 and an optional file object containing the raw data for the command.
1392
1584
1393 Returns a 3-tuple of (request, action, action data).
1585 Returns a 3-tuple of (request, action, action data).
1394 """
1586 """
1395 if not self._canissuecommands:
1587 if not self._canissuecommands:
1396 raise error.ProgrammingError('cannot issue new commands')
1588 raise error.ProgrammingError('cannot issue new commands')
1397
1589
1398 requestid = self._nextrequestid
1590 requestid = self._nextrequestid
1399 self._nextrequestid += 2
1591 self._nextrequestid += 2
1400
1592
1401 request = commandrequest(requestid, name, args, datafh=datafh,
1593 request = commandrequest(requestid, name, args, datafh=datafh,
1402 redirect=redirect)
1594 redirect=redirect)
1403
1595
1404 if self._buffersends:
1596 if self._buffersends:
1405 self._pendingrequests.append(request)
1597 self._pendingrequests.append(request)
1406 return request, 'noop', {}
1598 return request, 'noop', {}
1407 else:
1599 else:
1408 if not self._cansend:
1600 if not self._cansend:
1409 raise error.ProgrammingError('sends cannot be performed on '
1601 raise error.ProgrammingError('sends cannot be performed on '
1410 'this instance')
1602 'this instance')
1411
1603
1412 if not self._hasmultiplesend:
1604 if not self._hasmultiplesend:
1413 self._cansend = False
1605 self._cansend = False
1414 self._canissuecommands = False
1606 self._canissuecommands = False
1415
1607
1416 return request, 'sendframes', {
1608 return request, 'sendframes', {
1417 'framegen': self._makecommandframes(request),
1609 'framegen': self._makecommandframes(request),
1418 }
1610 }
1419
1611
1420 def flushcommands(self):
1612 def flushcommands(self):
1421 """Request that all queued commands be sent.
1613 """Request that all queued commands be sent.
1422
1614
1423 If any commands are buffered, this will instruct the caller to send
1615 If any commands are buffered, this will instruct the caller to send
1424 them over the wire. If no commands are buffered it instructs the client
1616 them over the wire. If no commands are buffered it instructs the client
1425 to no-op.
1617 to no-op.
1426
1618
1427 If instances aren't configured for multiple sends, no new command
1619 If instances aren't configured for multiple sends, no new command
1428 requests are allowed after this is called.
1620 requests are allowed after this is called.
1429 """
1621 """
1430 if not self._pendingrequests:
1622 if not self._pendingrequests:
1431 return 'noop', {}
1623 return 'noop', {}
1432
1624
1433 if not self._cansend:
1625 if not self._cansend:
1434 raise error.ProgrammingError('sends cannot be performed on this '
1626 raise error.ProgrammingError('sends cannot be performed on this '
1435 'instance')
1627 'instance')
1436
1628
1437 # If the instance only allows sending once, mark that we have fired
1629 # If the instance only allows sending once, mark that we have fired
1438 # our one shot.
1630 # our one shot.
1439 if not self._hasmultiplesend:
1631 if not self._hasmultiplesend:
1440 self._canissuecommands = False
1632 self._canissuecommands = False
1441 self._cansend = False
1633 self._cansend = False
1442
1634
1443 def makeframes():
1635 def makeframes():
1444 while self._pendingrequests:
1636 while self._pendingrequests:
1445 request = self._pendingrequests.popleft()
1637 request = self._pendingrequests.popleft()
1446 for frame in self._makecommandframes(request):
1638 for frame in self._makecommandframes(request):
1447 yield frame
1639 yield frame
1448
1640
1449 return 'sendframes', {
1641 return 'sendframes', {
1450 'framegen': makeframes(),
1642 'framegen': makeframes(),
1451 }
1643 }
1452
1644
1453 def _makecommandframes(self, request):
1645 def _makecommandframes(self, request):
1454 """Emit frames to issue a command request.
1646 """Emit frames to issue a command request.
1455
1647
1456 As a side-effect, update request accounting to reflect its changed
1648 As a side-effect, update request accounting to reflect its changed
1457 state.
1649 state.
1458 """
1650 """
1459 self._activerequests[request.requestid] = request
1651 self._activerequests[request.requestid] = request
1460 request.state = 'sending'
1652 request.state = 'sending'
1461
1653
1462 res = createcommandframes(self._outgoingstream,
1654 res = createcommandframes(self._outgoingstream,
1463 request.requestid,
1655 request.requestid,
1464 request.name,
1656 request.name,
1465 request.args,
1657 request.args,
1466 datafh=request.datafh,
1658 datafh=request.datafh,
1467 redirect=request.redirect)
1659 redirect=request.redirect)
1468
1660
1469 for frame in res:
1661 for frame in res:
1470 yield frame
1662 yield frame
1471
1663
1472 request.state = 'sent'
1664 request.state = 'sent'
1473
1665
1474 def onframerecv(self, frame):
1666 def onframerecv(self, frame):
1475 """Process a frame that has been received off the wire.
1667 """Process a frame that has been received off the wire.
1476
1668
1477 Returns a 2-tuple of (action, meta) describing further action the
1669 Returns a 2-tuple of (action, meta) describing further action the
1478 caller needs to take as a result of receiving this frame.
1670 caller needs to take as a result of receiving this frame.
1479 """
1671 """
1480 if frame.streamid % 2:
1672 if frame.streamid % 2:
1481 return 'error', {
1673 return 'error', {
1482 'message': (
1674 'message': (
1483 _('received frame with odd numbered stream ID: %d') %
1675 _('received frame with odd numbered stream ID: %d') %
1484 frame.streamid),
1676 frame.streamid),
1485 }
1677 }
1486
1678
1487 if frame.streamid not in self._incomingstreams:
1679 if frame.streamid not in self._incomingstreams:
1488 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1680 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1489 return 'error', {
1681 return 'error', {
1490 'message': _('received frame on unknown stream '
1682 'message': _('received frame on unknown stream '
1491 'without beginning of stream flag set'),
1683 'without beginning of stream flag set'),
1492 }
1684 }
1493
1685
1494 self._incomingstreams[frame.streamid] = inputstream(
1686 self._incomingstreams[frame.streamid] = inputstream(
1495 frame.streamid)
1687 frame.streamid)
1496
1688
1689 stream = self._incomingstreams[frame.streamid]
1690
1691 # If the payload is encoded, ask the stream to decode it. We
1692 # merely substitute the decoded result into the frame payload as
1693 # if it had been transferred all along.
1497 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1694 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1498 raise error.ProgrammingError('support for decoding stream '
1695 frame.payload = stream.decode(frame.payload)
1499 'payloads not yet implemneted')
1500
1696
1501 if frame.streamflags & STREAM_FLAG_END_STREAM:
1697 if frame.streamflags & STREAM_FLAG_END_STREAM:
1502 del self._incomingstreams[frame.streamid]
1698 del self._incomingstreams[frame.streamid]
1503
1699
1504 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1700 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1505 return self._onstreamsettingsframe(frame)
1701 return self._onstreamsettingsframe(frame)
1506
1702
1507 if frame.requestid not in self._activerequests:
1703 if frame.requestid not in self._activerequests:
1508 return 'error', {
1704 return 'error', {
1509 'message': (_('received frame for inactive request ID: %d') %
1705 'message': (_('received frame for inactive request ID: %d') %
1510 frame.requestid),
1706 frame.requestid),
1511 }
1707 }
1512
1708
1513 request = self._activerequests[frame.requestid]
1709 request = self._activerequests[frame.requestid]
1514 request.state = 'receiving'
1710 request.state = 'receiving'
1515
1711
1516 handlers = {
1712 handlers = {
1517 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1713 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1518 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1714 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1519 }
1715 }
1520
1716
1521 meth = handlers.get(frame.typeid)
1717 meth = handlers.get(frame.typeid)
1522 if not meth:
1718 if not meth:
1523 raise error.ProgrammingError('unhandled frame type: %d' %
1719 raise error.ProgrammingError('unhandled frame type: %d' %
1524 frame.typeid)
1720 frame.typeid)
1525
1721
1526 return meth(request, frame)
1722 return meth(request, frame)
1527
1723
1528 def _onstreamsettingsframe(self, frame):
1724 def _onstreamsettingsframe(self, frame):
1529 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1725 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1530
1726
1531 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1727 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1532 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1728 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1533
1729
1534 if more and eos:
1730 if more and eos:
1535 return 'error', {
1731 return 'error', {
1536 'message': (_('stream encoding settings frame cannot have both '
1732 'message': (_('stream encoding settings frame cannot have both '
1537 'continuation and end of stream flags set')),
1733 'continuation and end of stream flags set')),
1538 }
1734 }
1539
1735
1540 if not more and not eos:
1736 if not more and not eos:
1541 return 'error', {
1737 return 'error', {
1542 'message': _('stream encoding settings frame must have '
1738 'message': _('stream encoding settings frame must have '
1543 'continuation or end of stream flag set'),
1739 'continuation or end of stream flag set'),
1544 }
1740 }
1545
1741
1546 if frame.streamid not in self._streamsettingsdecoders:
1742 if frame.streamid not in self._streamsettingsdecoders:
1547 decoder = cborutil.bufferingdecoder()
1743 decoder = cborutil.bufferingdecoder()
1548 self._streamsettingsdecoders[frame.streamid] = decoder
1744 self._streamsettingsdecoders[frame.streamid] = decoder
1549
1745
1550 decoder = self._streamsettingsdecoders[frame.streamid]
1746 decoder = self._streamsettingsdecoders[frame.streamid]
1551
1747
1552 try:
1748 try:
1553 decoder.decode(frame.payload)
1749 decoder.decode(frame.payload)
1554 except Exception as e:
1750 except Exception as e:
1555 return 'error', {
1751 return 'error', {
1556 'message': (_('error decoding CBOR from stream encoding '
1752 'message': (_('error decoding CBOR from stream encoding '
1557 'settings frame: %s') %
1753 'settings frame: %s') %
1558 stringutil.forcebytestr(e)),
1754 stringutil.forcebytestr(e)),
1559 }
1755 }
1560
1756
1561 if more:
1757 if more:
1562 return 'noop', {}
1758 return 'noop', {}
1563
1759
1564 assert eos
1760 assert eos
1565
1761
1566 decoded = decoder.getavailable()
1762 decoded = decoder.getavailable()
1567 del self._streamsettingsdecoders[frame.streamid]
1763 del self._streamsettingsdecoders[frame.streamid]
1568
1764
1569 if not decoded:
1765 if not decoded:
1570 return 'error', {
1766 return 'error', {
1571 'message': _('stream encoding settings frame did not contain '
1767 'message': _('stream encoding settings frame did not contain '
1572 'CBOR data'),
1768 'CBOR data'),
1573 }
1769 }
1574
1770
1575 try:
1771 try:
1576 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1772 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1773 decoded[0],
1577 decoded[1:])
1774 decoded[1:])
1578 except Exception as e:
1775 except Exception as e:
1579 return 'error', {
1776 return 'error', {
1580 'message': (_('error setting stream decoder: %s') %
1777 'message': (_('error setting stream decoder: %s') %
1581 stringutil.forcebytestr(e)),
1778 stringutil.forcebytestr(e)),
1582 }
1779 }
1583
1780
1584 return 'noop', {}
1781 return 'noop', {}
1585
1782
1586 def _oncommandresponseframe(self, request, frame):
1783 def _oncommandresponseframe(self, request, frame):
1587 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1784 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1588 request.state = 'received'
1785 request.state = 'received'
1589 del self._activerequests[request.requestid]
1786 del self._activerequests[request.requestid]
1590
1787
1591 return 'responsedata', {
1788 return 'responsedata', {
1592 'request': request,
1789 'request': request,
1593 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1790 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1594 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1791 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1595 'data': frame.payload,
1792 'data': frame.payload,
1596 }
1793 }
1597
1794
1598 def _onerrorresponseframe(self, request, frame):
1795 def _onerrorresponseframe(self, request, frame):
1599 request.state = 'errored'
1796 request.state = 'errored'
1600 del self._activerequests[request.requestid]
1797 del self._activerequests[request.requestid]
1601
1798
1602 # The payload should be a CBOR map.
1799 # The payload should be a CBOR map.
1603 m = cborutil.decodeall(frame.payload)[0]
1800 m = cborutil.decodeall(frame.payload)[0]
1604
1801
1605 return 'error', {
1802 return 'error', {
1606 'request': request,
1803 'request': request,
1607 'type': m['type'],
1804 'type': m['type'],
1608 'message': m['message'],
1805 'message': m['message'],
1609 }
1806 }
@@ -1,291 +1,604 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import unittest
3 import unittest
4 import zlib
4
5
5 from mercurial import (
6 from mercurial import (
6 error,
7 error,
7 ui as uimod,
8 ui as uimod,
8 wireprotoframing as framing,
9 wireprotoframing as framing,
9 )
10 )
10 from mercurial.utils import (
11 from mercurial.utils import (
11 cborutil,
12 cborutil,
12 )
13 )
13
14
15 try:
16 from mercurial import zstd
17 zstd.__version__
18 except ImportError:
19 zstd = None
20
14 ffs = framing.makeframefromhumanstring
21 ffs = framing.makeframefromhumanstring
15
22
16 globalui = uimod.ui()
23 globalui = uimod.ui()
17
24
18 def sendframe(reactor, frame):
25 def sendframe(reactor, frame):
19 """Send a frame bytearray to a reactor."""
26 """Send a frame bytearray to a reactor."""
20 header = framing.parseheader(frame)
27 header = framing.parseheader(frame)
21 payload = frame[framing.FRAME_HEADER_SIZE:]
28 payload = frame[framing.FRAME_HEADER_SIZE:]
22 assert len(payload) == header.length
29 assert len(payload) == header.length
23
30
24 return reactor.onframerecv(framing.frame(header.requestid,
31 return reactor.onframerecv(framing.frame(header.requestid,
25 header.streamid,
32 header.streamid,
26 header.streamflags,
33 header.streamflags,
27 header.typeid,
34 header.typeid,
28 header.flags,
35 header.flags,
29 payload))
36 payload))
30
37
31 class SingleSendTests(unittest.TestCase):
38 class SingleSendTests(unittest.TestCase):
32 """A reactor that can only send once rejects subsequent sends."""
39 """A reactor that can only send once rejects subsequent sends."""
33
40
34 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
41 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
35 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
42 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
36 # the regex version.
43 # the regex version.
37 assertRaisesRegex = (# camelcase-required
44 assertRaisesRegex = (# camelcase-required
38 unittest.TestCase.assertRaisesRegexp)
45 unittest.TestCase.assertRaisesRegexp)
39
46
40 def testbasic(self):
47 def testbasic(self):
41 reactor = framing.clientreactor(globalui,
48 reactor = framing.clientreactor(globalui,
42 hasmultiplesend=False,
49 hasmultiplesend=False,
43 buffersends=True)
50 buffersends=True)
44
51
45 request, action, meta = reactor.callcommand(b'foo', {})
52 request, action, meta = reactor.callcommand(b'foo', {})
46 self.assertEqual(request.state, b'pending')
53 self.assertEqual(request.state, b'pending')
47 self.assertEqual(action, b'noop')
54 self.assertEqual(action, b'noop')
48
55
49 action, meta = reactor.flushcommands()
56 action, meta = reactor.flushcommands()
50 self.assertEqual(action, b'sendframes')
57 self.assertEqual(action, b'sendframes')
51
58
52 for frame in meta[b'framegen']:
59 for frame in meta[b'framegen']:
53 self.assertEqual(request.state, b'sending')
60 self.assertEqual(request.state, b'sending')
54
61
55 self.assertEqual(request.state, b'sent')
62 self.assertEqual(request.state, b'sent')
56
63
57 with self.assertRaisesRegex(error.ProgrammingError,
64 with self.assertRaisesRegex(error.ProgrammingError,
58 'cannot issue new commands'):
65 'cannot issue new commands'):
59 reactor.callcommand(b'foo', {})
66 reactor.callcommand(b'foo', {})
60
67
61 with self.assertRaisesRegex(error.ProgrammingError,
68 with self.assertRaisesRegex(error.ProgrammingError,
62 'cannot issue new commands'):
69 'cannot issue new commands'):
63 reactor.callcommand(b'foo', {})
70 reactor.callcommand(b'foo', {})
64
71
65 class NoBufferTests(unittest.TestCase):
72 class NoBufferTests(unittest.TestCase):
66 """A reactor without send buffering sends requests immediately."""
73 """A reactor without send buffering sends requests immediately."""
67 def testbasic(self):
74 def testbasic(self):
68 reactor = framing.clientreactor(globalui,
75 reactor = framing.clientreactor(globalui,
69 hasmultiplesend=True,
76 hasmultiplesend=True,
70 buffersends=False)
77 buffersends=False)
71
78
72 request, action, meta = reactor.callcommand(b'command1', {})
79 request, action, meta = reactor.callcommand(b'command1', {})
73 self.assertEqual(request.requestid, 1)
80 self.assertEqual(request.requestid, 1)
74 self.assertEqual(action, b'sendframes')
81 self.assertEqual(action, b'sendframes')
75
82
76 self.assertEqual(request.state, b'pending')
83 self.assertEqual(request.state, b'pending')
77
84
78 for frame in meta[b'framegen']:
85 for frame in meta[b'framegen']:
79 self.assertEqual(request.state, b'sending')
86 self.assertEqual(request.state, b'sending')
80
87
81 self.assertEqual(request.state, b'sent')
88 self.assertEqual(request.state, b'sent')
82
89
83 action, meta = reactor.flushcommands()
90 action, meta = reactor.flushcommands()
84 self.assertEqual(action, b'noop')
91 self.assertEqual(action, b'noop')
85
92
86 # And we can send another command.
93 # And we can send another command.
87 request, action, meta = reactor.callcommand(b'command2', {})
94 request, action, meta = reactor.callcommand(b'command2', {})
88 self.assertEqual(request.requestid, 3)
95 self.assertEqual(request.requestid, 3)
89 self.assertEqual(action, b'sendframes')
96 self.assertEqual(action, b'sendframes')
90
97
91 for frame in meta[b'framegen']:
98 for frame in meta[b'framegen']:
92 self.assertEqual(request.state, b'sending')
99 self.assertEqual(request.state, b'sending')
93
100
94 self.assertEqual(request.state, b'sent')
101 self.assertEqual(request.state, b'sent')
95
102
96 class BadFrameRecvTests(unittest.TestCase):
103 class BadFrameRecvTests(unittest.TestCase):
97 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
104 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
98 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
105 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
99 # the regex version.
106 # the regex version.
100 assertRaisesRegex = (# camelcase-required
107 assertRaisesRegex = (# camelcase-required
101 unittest.TestCase.assertRaisesRegexp)
108 unittest.TestCase.assertRaisesRegexp)
102
109
103 def testoddstream(self):
110 def testoddstream(self):
104 reactor = framing.clientreactor(globalui)
111 reactor = framing.clientreactor(globalui)
105
112
106 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
113 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
107 self.assertEqual(action, b'error')
114 self.assertEqual(action, b'error')
108 self.assertEqual(meta[b'message'],
115 self.assertEqual(meta[b'message'],
109 b'received frame with odd numbered stream ID: 1')
116 b'received frame with odd numbered stream ID: 1')
110
117
111 def testunknownstream(self):
118 def testunknownstream(self):
112 reactor = framing.clientreactor(globalui)
119 reactor = framing.clientreactor(globalui)
113
120
114 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
121 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
115 self.assertEqual(action, b'error')
122 self.assertEqual(action, b'error')
116 self.assertEqual(meta[b'message'],
123 self.assertEqual(meta[b'message'],
117 b'received frame on unknown stream without beginning '
124 b'received frame on unknown stream without beginning '
118 b'of stream flag set')
125 b'of stream flag set')
119
126
120 def testunhandledframetype(self):
127 def testunhandledframetype(self):
121 reactor = framing.clientreactor(globalui, buffersends=False)
128 reactor = framing.clientreactor(globalui, buffersends=False)
122
129
123 request, action, meta = reactor.callcommand(b'foo', {})
130 request, action, meta = reactor.callcommand(b'foo', {})
124 for frame in meta[b'framegen']:
131 for frame in meta[b'framegen']:
125 pass
132 pass
126
133
127 with self.assertRaisesRegex(error.ProgrammingError,
134 with self.assertRaisesRegex(error.ProgrammingError,
128 'unhandled frame type'):
135 'unhandled frame type'):
129 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
136 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
130
137
131 class StreamTests(unittest.TestCase):
138 class StreamTests(unittest.TestCase):
132 def testmultipleresponseframes(self):
139 def testmultipleresponseframes(self):
133 reactor = framing.clientreactor(globalui, buffersends=False)
140 reactor = framing.clientreactor(globalui, buffersends=False)
134
141
135 request, action, meta = reactor.callcommand(b'foo', {})
142 request, action, meta = reactor.callcommand(b'foo', {})
136
143
137 self.assertEqual(action, b'sendframes')
144 self.assertEqual(action, b'sendframes')
138 for f in meta[b'framegen']:
145 for f in meta[b'framegen']:
139 pass
146 pass
140
147
141 action, meta = sendframe(
148 action, meta = sendframe(
142 reactor,
149 reactor,
143 ffs(b'%d 0 stream-begin command-response 0 foo' %
150 ffs(b'%d 0 stream-begin command-response 0 foo' %
144 request.requestid))
151 request.requestid))
145 self.assertEqual(action, b'responsedata')
152 self.assertEqual(action, b'responsedata')
146
153
147 action, meta = sendframe(
154 action, meta = sendframe(
148 reactor,
155 reactor,
149 ffs(b'%d 0 0 command-response eos bar' % request.requestid))
156 ffs(b'%d 0 0 command-response eos bar' % request.requestid))
150 self.assertEqual(action, b'responsedata')
157 self.assertEqual(action, b'responsedata')
151
158
152 class RedirectTests(unittest.TestCase):
159 class RedirectTests(unittest.TestCase):
153 def testredirect(self):
160 def testredirect(self):
154 reactor = framing.clientreactor(globalui, buffersends=False)
161 reactor = framing.clientreactor(globalui, buffersends=False)
155
162
156 redirect = {
163 redirect = {
157 b'targets': [b'a', b'b'],
164 b'targets': [b'a', b'b'],
158 b'hashes': [b'sha256'],
165 b'hashes': [b'sha256'],
159 }
166 }
160
167
161 request, action, meta = reactor.callcommand(
168 request, action, meta = reactor.callcommand(
162 b'foo', {}, redirect=redirect)
169 b'foo', {}, redirect=redirect)
163
170
164 self.assertEqual(action, b'sendframes')
171 self.assertEqual(action, b'sendframes')
165
172
166 frames = list(meta[b'framegen'])
173 frames = list(meta[b'framegen'])
167 self.assertEqual(len(frames), 1)
174 self.assertEqual(len(frames), 1)
168
175
169 self.assertEqual(frames[0],
176 self.assertEqual(frames[0],
170 ffs(b'1 1 stream-begin command-request new '
177 ffs(b'1 1 stream-begin command-request new '
171 b"cbor:{b'name': b'foo', "
178 b"cbor:{b'name': b'foo', "
172 b"b'redirect': {b'targets': [b'a', b'b'], "
179 b"b'redirect': {b'targets': [b'a', b'b'], "
173 b"b'hashes': [b'sha256']}}"))
180 b"b'hashes': [b'sha256']}}"))
174
181
175 class StreamSettingsTests(unittest.TestCase):
182 class StreamSettingsTests(unittest.TestCase):
176 def testnoflags(self):
183 def testnoflags(self):
177 reactor = framing.clientreactor(globalui, buffersends=False)
184 reactor = framing.clientreactor(globalui, buffersends=False)
178
185
179 request, action, meta = reactor.callcommand(b'foo', {})
186 request, action, meta = reactor.callcommand(b'foo', {})
180 for f in meta[b'framegen']:
187 for f in meta[b'framegen']:
181 pass
188 pass
182
189
183 action, meta = sendframe(reactor,
190 action, meta = sendframe(reactor,
184 ffs(b'1 2 stream-begin stream-settings 0 '))
191 ffs(b'1 2 stream-begin stream-settings 0 '))
185
192
186 self.assertEqual(action, b'error')
193 self.assertEqual(action, b'error')
187 self.assertEqual(meta, {
194 self.assertEqual(meta, {
188 b'message': b'stream encoding settings frame must have '
195 b'message': b'stream encoding settings frame must have '
189 b'continuation or end of stream flag set',
196 b'continuation or end of stream flag set',
190 })
197 })
191
198
192 def testconflictflags(self):
199 def testconflictflags(self):
193 reactor = framing.clientreactor(globalui, buffersends=False)
200 reactor = framing.clientreactor(globalui, buffersends=False)
194
201
195 request, action, meta = reactor.callcommand(b'foo', {})
202 request, action, meta = reactor.callcommand(b'foo', {})
196 for f in meta[b'framegen']:
203 for f in meta[b'framegen']:
197 pass
204 pass
198
205
199 action, meta = sendframe(reactor,
206 action, meta = sendframe(reactor,
200 ffs(b'1 2 stream-begin stream-settings continuation|eos '))
207 ffs(b'1 2 stream-begin stream-settings continuation|eos '))
201
208
202 self.assertEqual(action, b'error')
209 self.assertEqual(action, b'error')
203 self.assertEqual(meta, {
210 self.assertEqual(meta, {
204 b'message': b'stream encoding settings frame cannot have both '
211 b'message': b'stream encoding settings frame cannot have both '
205 b'continuation and end of stream flags set',
212 b'continuation and end of stream flags set',
206 })
213 })
207
214
208 def testemptypayload(self):
215 def testemptypayload(self):
209 reactor = framing.clientreactor(globalui, buffersends=False)
216 reactor = framing.clientreactor(globalui, buffersends=False)
210
217
211 request, action, meta = reactor.callcommand(b'foo', {})
218 request, action, meta = reactor.callcommand(b'foo', {})
212 for f in meta[b'framegen']:
219 for f in meta[b'framegen']:
213 pass
220 pass
214
221
215 action, meta = sendframe(reactor,
222 action, meta = sendframe(reactor,
216 ffs(b'1 2 stream-begin stream-settings eos '))
223 ffs(b'1 2 stream-begin stream-settings eos '))
217
224
218 self.assertEqual(action, b'error')
225 self.assertEqual(action, b'error')
219 self.assertEqual(meta, {
226 self.assertEqual(meta, {
220 b'message': b'stream encoding settings frame did not contain '
227 b'message': b'stream encoding settings frame did not contain '
221 b'CBOR data'
228 b'CBOR data'
222 })
229 })
223
230
224 def testbadcbor(self):
231 def testbadcbor(self):
225 reactor = framing.clientreactor(globalui, buffersends=False)
232 reactor = framing.clientreactor(globalui, buffersends=False)
226
233
227 request, action, meta = reactor.callcommand(b'foo', {})
234 request, action, meta = reactor.callcommand(b'foo', {})
228 for f in meta[b'framegen']:
235 for f in meta[b'framegen']:
229 pass
236 pass
230
237
231 action, meta = sendframe(reactor,
238 action, meta = sendframe(reactor,
232 ffs(b'1 2 stream-begin stream-settings eos badvalue'))
239 ffs(b'1 2 stream-begin stream-settings eos badvalue'))
233
240
234 self.assertEqual(action, b'error')
241 self.assertEqual(action, b'error')
235
242
236 def testsingleobject(self):
243 def testsingleobject(self):
237 reactor = framing.clientreactor(globalui, buffersends=False)
244 reactor = framing.clientreactor(globalui, buffersends=False)
238
245
239 request, action, meta = reactor.callcommand(b'foo', {})
246 request, action, meta = reactor.callcommand(b'foo', {})
240 for f in meta[b'framegen']:
247 for f in meta[b'framegen']:
241 pass
248 pass
242
249
243 action, meta = sendframe(reactor,
250 action, meta = sendframe(reactor,
244 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))
251 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))
245
252
246 self.assertEqual(action, b'noop')
253 self.assertEqual(action, b'noop')
247 self.assertEqual(meta, {})
254 self.assertEqual(meta, {})
248
255
249 def testmultipleobjects(self):
256 def testmultipleobjects(self):
250 reactor = framing.clientreactor(globalui, buffersends=False)
257 reactor = framing.clientreactor(globalui, buffersends=False)
251
258
252 request, action, meta = reactor.callcommand(b'foo', {})
259 request, action, meta = reactor.callcommand(b'foo', {})
253 for f in meta[b'framegen']:
260 for f in meta[b'framegen']:
254 pass
261 pass
255
262
256 data = b''.join([
263 data = b''.join([
257 b''.join(cborutil.streamencode(b'identity')),
264 b''.join(cborutil.streamencode(b'identity')),
258 b''.join(cborutil.streamencode({b'foo', b'bar'})),
265 b''.join(cborutil.streamencode({b'foo', b'bar'})),
259 ])
266 ])
260
267
261 action, meta = sendframe(reactor,
268 action, meta = sendframe(reactor,
262 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
269 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
263
270
264 self.assertEqual(action, b'noop')
271 self.assertEqual(action, b'error')
265 self.assertEqual(meta, {})
272 self.assertEqual(meta, {
273 b'message': b'error setting stream decoder: identity decoder '
274 b'received unexpected additional values',
275 })
266
276
267 def testmultipleframes(self):
277 def testmultipleframes(self):
268 reactor = framing.clientreactor(globalui, buffersends=False)
278 reactor = framing.clientreactor(globalui, buffersends=False)
269
279
270 request, action, meta = reactor.callcommand(b'foo', {})
280 request, action, meta = reactor.callcommand(b'foo', {})
271 for f in meta[b'framegen']:
281 for f in meta[b'framegen']:
272 pass
282 pass
273
283
274 data = b''.join(cborutil.streamencode(b'identity'))
284 data = b''.join(cborutil.streamencode(b'identity'))
275
285
276 action, meta = sendframe(reactor,
286 action, meta = sendframe(reactor,
277 ffs(b'1 2 stream-begin stream-settings continuation %s' %
287 ffs(b'1 2 stream-begin stream-settings continuation %s' %
278 data[0:3]))
288 data[0:3]))
279
289
280 self.assertEqual(action, b'noop')
290 self.assertEqual(action, b'noop')
281 self.assertEqual(meta, {})
291 self.assertEqual(meta, {})
282
292
283 action, meta = sendframe(reactor,
293 action, meta = sendframe(reactor,
284 ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
294 ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
285
295
286 self.assertEqual(action, b'noop')
296 self.assertEqual(action, b'noop')
287 self.assertEqual(meta, {})
297 self.assertEqual(meta, {})
288
298
299 def testinvalidencoder(self):
300 reactor = framing.clientreactor(globalui, buffersends=False)
301
302 request, action, meta = reactor.callcommand(b'foo', {})
303 for f in meta[b'framegen']:
304 pass
305
306 action, meta = sendframe(reactor,
307 ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'))
308
309 self.assertEqual(action, b'error')
310 self.assertEqual(meta, {
311 b'message': b'error setting stream decoder: unknown stream '
312 b'decoder: badvalue',
313 })
314
315 def testzlibencoding(self):
316 reactor = framing.clientreactor(globalui, buffersends=False)
317
318 request, action, meta = reactor.callcommand(b'foo', {})
319 for f in meta[b'framegen']:
320 pass
321
322 action, meta = sendframe(reactor,
323 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
324 request.requestid))
325
326 self.assertEqual(action, b'noop')
327 self.assertEqual(meta, {})
328
329 result = {
330 b'status': b'ok',
331 }
332 encoded = b''.join(cborutil.streamencode(result))
333
334 compressed = zlib.compress(encoded)
335 self.assertEqual(zlib.decompress(compressed), encoded)
336
337 action, meta = sendframe(reactor,
338 ffs(b'%d 2 encoded command-response eos %s' %
339 (request.requestid, compressed)))
340
341 self.assertEqual(action, b'responsedata')
342 self.assertEqual(meta[b'data'], encoded)
343
344 def testzlibencodingsinglebyteframes(self):
345 reactor = framing.clientreactor(globalui, buffersends=False)
346
347 request, action, meta = reactor.callcommand(b'foo', {})
348 for f in meta[b'framegen']:
349 pass
350
351 action, meta = sendframe(reactor,
352 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
353 request.requestid))
354
355 self.assertEqual(action, b'noop')
356 self.assertEqual(meta, {})
357
358 result = {
359 b'status': b'ok',
360 }
361 encoded = b''.join(cborutil.streamencode(result))
362
363 compressed = zlib.compress(encoded)
364 self.assertEqual(zlib.decompress(compressed), encoded)
365
366 chunks = []
367
368 for i in range(len(compressed)):
369 char = compressed[i:i + 1]
370 if char == b'\\':
371 char = b'\\\\'
372 action, meta = sendframe(reactor,
373 ffs(b'%d 2 encoded command-response continuation %s' %
374 (request.requestid, char)))
375
376 self.assertEqual(action, b'responsedata')
377 chunks.append(meta[b'data'])
378 self.assertTrue(meta[b'expectmore'])
379 self.assertFalse(meta[b'eos'])
380
381 # zlib will have the full data decoded at this point, even though
382 # we haven't flushed.
383 self.assertEqual(b''.join(chunks), encoded)
384
385 # End the stream for good measure.
386 action, meta = sendframe(reactor,
387 ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
388
389 self.assertEqual(action, b'responsedata')
390 self.assertEqual(meta[b'data'], b'')
391 self.assertFalse(meta[b'expectmore'])
392 self.assertTrue(meta[b'eos'])
393
394 def testzlibmultipleresponses(self):
395 # We feed in zlib compressed data on the same stream but belonging to
396 # 2 different requests. This tests our flushing behavior.
397 reactor = framing.clientreactor(globalui, buffersends=False,
398 hasmultiplesend=True)
399
400 request1, action, meta = reactor.callcommand(b'foo', {})
401 for f in meta[b'framegen']:
402 pass
403
404 request2, action, meta = reactor.callcommand(b'foo', {})
405 for f in meta[b'framegen']:
406 pass
407
408 outstream = framing.outputstream(2)
409 outstream.setencoder(globalui, b'zlib')
410
411 response1 = b''.join(cborutil.streamencode({
412 b'status': b'ok',
413 b'extra': b'response1' * 10,
414 }))
415
416 response2 = b''.join(cborutil.streamencode({
417 b'status': b'error',
418 b'extra': b'response2' * 10,
419 }))
420
421 action, meta = sendframe(reactor,
422 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zlib"' %
423 request1.requestid))
424
425 self.assertEqual(action, b'noop')
426 self.assertEqual(meta, {})
427
428 # Feeding partial data in won't get anything useful out.
429 action, meta = sendframe(reactor,
430 ffs(b'%d 2 encoded command-response continuation %s' % (
431 request1.requestid, outstream.encode(response1))))
432 self.assertEqual(action, b'responsedata')
433 self.assertEqual(meta[b'data'], b'')
434
435 # But flushing data at both ends will get our original data.
436 action, meta = sendframe(reactor,
437 ffs(b'%d 2 encoded command-response eos %s' % (
438 request1.requestid, outstream.flush())))
439 self.assertEqual(action, b'responsedata')
440 self.assertEqual(meta[b'data'], response1)
441
442 # We should be able to reuse the compressor/decompressor for the
443 # 2nd response.
444 action, meta = sendframe(reactor,
445 ffs(b'%d 2 encoded command-response continuation %s' % (
446 request2.requestid, outstream.encode(response2))))
447 self.assertEqual(action, b'responsedata')
448 self.assertEqual(meta[b'data'], b'')
449
450 action, meta = sendframe(reactor,
451 ffs(b'%d 2 encoded command-response eos %s' % (
452 request2.requestid, outstream.flush())))
453 self.assertEqual(action, b'responsedata')
454 self.assertEqual(meta[b'data'], response2)
455
456 @unittest.skipUnless(zstd, 'zstd not available')
457 def testzstd8mbencoding(self):
458 reactor = framing.clientreactor(globalui, buffersends=False)
459
460 request, action, meta = reactor.callcommand(b'foo', {})
461 for f in meta[b'framegen']:
462 pass
463
464 action, meta = sendframe(reactor,
465 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
466 request.requestid))
467
468 self.assertEqual(action, b'noop')
469 self.assertEqual(meta, {})
470
471 result = {
472 b'status': b'ok',
473 }
474 encoded = b''.join(cborutil.streamencode(result))
475
476 encoder = framing.zstd8mbencoder(globalui)
477 compressed = encoder.encode(encoded) + encoder.finish()
478 self.assertEqual(zstd.ZstdDecompressor().decompress(
479 compressed, max_output_size=len(encoded)), encoded)
480
481 action, meta = sendframe(reactor,
482 ffs(b'%d 2 encoded command-response eos %s' %
483 (request.requestid, compressed)))
484
485 self.assertEqual(action, b'responsedata')
486 self.assertEqual(meta[b'data'], encoded)
487
488 @unittest.skipUnless(zstd, 'zstd not available')
489 def testzstd8mbencodingsinglebyteframes(self):
490 reactor = framing.clientreactor(globalui, buffersends=False)
491
492 request, action, meta = reactor.callcommand(b'foo', {})
493 for f in meta[b'framegen']:
494 pass
495
496 action, meta = sendframe(reactor,
497 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
498 request.requestid))
499
500 self.assertEqual(action, b'noop')
501 self.assertEqual(meta, {})
502
503 result = {
504 b'status': b'ok',
505 }
506 encoded = b''.join(cborutil.streamencode(result))
507
508 compressed = zstd.ZstdCompressor().compress(encoded)
509 self.assertEqual(zstd.ZstdDecompressor().decompress(compressed),
510 encoded)
511
512 chunks = []
513
514 for i in range(len(compressed)):
515 char = compressed[i:i + 1]
516 if char == b'\\':
517 char = b'\\\\'
518 action, meta = sendframe(reactor,
519 ffs(b'%d 2 encoded command-response continuation %s' %
520 (request.requestid, char)))
521
522 self.assertEqual(action, b'responsedata')
523 chunks.append(meta[b'data'])
524 self.assertTrue(meta[b'expectmore'])
525 self.assertFalse(meta[b'eos'])
526
527 # zstd decompressor will flush at frame boundaries.
528 self.assertEqual(b''.join(chunks), encoded)
529
530 # End the stream for good measure.
531 action, meta = sendframe(reactor,
532 ffs(b'%d 2 stream-end command-response eos ' % request.requestid))
533
534 self.assertEqual(action, b'responsedata')
535 self.assertEqual(meta[b'data'], b'')
536 self.assertFalse(meta[b'expectmore'])
537 self.assertTrue(meta[b'eos'])
538
539 @unittest.skipUnless(zstd, 'zstd not available')
540 def testzstd8mbmultipleresponses(self):
541 # We feed in zstd compressed data on the same stream but belonging to
542 # 2 different requests. This tests our flushing behavior.
543 reactor = framing.clientreactor(globalui, buffersends=False,
544 hasmultiplesend=True)
545
546 request1, action, meta = reactor.callcommand(b'foo', {})
547 for f in meta[b'framegen']:
548 pass
549
550 request2, action, meta = reactor.callcommand(b'foo', {})
551 for f in meta[b'framegen']:
552 pass
553
554 outstream = framing.outputstream(2)
555 outstream.setencoder(globalui, b'zstd-8mb')
556
557 response1 = b''.join(cborutil.streamencode({
558 b'status': b'ok',
559 b'extra': b'response1' * 10,
560 }))
561
562 response2 = b''.join(cborutil.streamencode({
563 b'status': b'error',
564 b'extra': b'response2' * 10,
565 }))
566
567 action, meta = sendframe(reactor,
568 ffs(b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"' %
569 request1.requestid))
570
571 self.assertEqual(action, b'noop')
572 self.assertEqual(meta, {})
573
574 # Feeding partial data in won't get anything useful out.
575 action, meta = sendframe(reactor,
576 ffs(b'%d 2 encoded command-response continuation %s' % (
577 request1.requestid, outstream.encode(response1))))
578 self.assertEqual(action, b'responsedata')
579 self.assertEqual(meta[b'data'], b'')
580
581 # But flushing data at both ends will get our original data.
582 action, meta = sendframe(reactor,
583 ffs(b'%d 2 encoded command-response eos %s' % (
584 request1.requestid, outstream.flush())))
585 self.assertEqual(action, b'responsedata')
586 self.assertEqual(meta[b'data'], response1)
587
588 # We should be able to reuse the compressor/decompressor for the
589 # 2nd response.
590 action, meta = sendframe(reactor,
591 ffs(b'%d 2 encoded command-response continuation %s' % (
592 request2.requestid, outstream.encode(response2))))
593 self.assertEqual(action, b'responsedata')
594 self.assertEqual(meta[b'data'], b'')
595
596 action, meta = sendframe(reactor,
597 ffs(b'%d 2 encoded command-response eos %s' % (
598 request2.requestid, outstream.flush())))
599 self.assertEqual(action, b'responsedata')
600 self.assertEqual(meta[b'data'], response2)
601
289 if __name__ == '__main__':
602 if __name__ == '__main__':
290 import silenttestrunner
603 import silenttestrunner
291 silenttestrunner.main(__name__)
604 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now