##// END OF EJS Templates
wireprotov2: remove functions for creating response frames from bytes...
Gregory Szorc -
r40170:966b5f7f default
parent child Browse files
Show More
@@ -1,1827 +1,1738 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54
54
55 FRAME_TYPES = {
55 FRAME_TYPES = {
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 }
64 }
65
65
66 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70
70
71 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
72 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 }
76 }
77
77
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
80
80
81 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
84 }
84 }
85
85
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88
88
89 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 }
92 }
93
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
100 }
101
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
108 }
109
109
110 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
111 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
117 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 }
120 }
121
121
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123
123
124 def humanflags(mapping, value):
124 def humanflags(mapping, value):
125 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
126 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
127 flags = []
127 flags = []
128 val = 1
128 val = 1
129 while value >= val:
129 while value >= val:
130 if value & val:
130 if value & val:
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 val <<= 1
132 val <<= 1
133
133
134 return b'|'.join(flags)
134 return b'|'.join(flags)
135
135
136 @attr.s(slots=True)
136 @attr.s(slots=True)
137 class frameheader(object):
137 class frameheader(object):
138 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
139
139
140 length = attr.ib()
140 length = attr.ib()
141 requestid = attr.ib()
141 requestid = attr.ib()
142 streamid = attr.ib()
142 streamid = attr.ib()
143 streamflags = attr.ib()
143 streamflags = attr.ib()
144 typeid = attr.ib()
144 typeid = attr.ib()
145 flags = attr.ib()
145 flags = attr.ib()
146
146
147 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
148 class frame(object):
148 class frame(object):
149 """Represents a parsed frame."""
149 """Represents a parsed frame."""
150
150
151 requestid = attr.ib()
151 requestid = attr.ib()
152 streamid = attr.ib()
152 streamid = attr.ib()
153 streamflags = attr.ib()
153 streamflags = attr.ib()
154 typeid = attr.ib()
154 typeid = attr.ib()
155 flags = attr.ib()
155 flags = attr.ib()
156 payload = attr.ib()
156 payload = attr.ib()
157
157
158 @encoding.strmethod
158 @encoding.strmethod
159 def __repr__(self):
159 def __repr__(self):
160 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
161 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
162 if value == self.typeid:
162 if value == self.typeid:
163 typename = name
163 typename = name
164 break
164 break
165
165
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
168 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171
171
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
174 # TODO assert size of payload.
174 # TODO assert size of payload.
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176
176
177 # 24 bits length
177 # 24 bits length
178 # 16 bits request id
178 # 16 bits request id
179 # 8 bits stream id
179 # 8 bits stream id
180 # 8 bits stream flags
180 # 8 bits stream flags
181 # 4 bits type
181 # 4 bits type
182 # 4 bits flags
182 # 4 bits flags
183
183
184 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
185 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
188 frame[8:] = payload
188 frame[8:] = payload
189
189
190 return frame
190 return frame
191
191
192 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
193 """Create a frame from a human readable string
193 """Create a frame from a human readable string
194
194
195 Strings have the form:
195 Strings have the form:
196
196
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198
198
199 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
200 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
201
201
202 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
203
203
204 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
205 named constant.
205 named constant.
206
206
207 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
208
208
209 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
210 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 byte string literal.
212 byte string literal.
213 """
213 """
214 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216
216
217 requestid = int(requestid)
217 requestid = int(requestid)
218 streamid = int(streamid)
218 streamid = int(streamid)
219
219
220 finalstreamflags = 0
220 finalstreamflags = 0
221 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
222 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
223 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
224 else:
224 else:
225 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
226
226
227 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
228 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
229 else:
229 else:
230 frametype = int(frametype)
230 frametype = int(frametype)
231
231
232 finalflags = 0
232 finalflags = 0
233 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
234 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
235 if flag in validflags:
235 if flag in validflags:
236 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
237 else:
237 else:
238 finalflags |= int(flag)
238 finalflags |= int(flag)
239
239
240 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
241 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
242 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
243
243
244 else:
244 else:
245 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
246
246
247 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
248 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
249 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
250
250
251 def parseheader(data):
251 def parseheader(data):
252 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
253
253
254 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
255 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
256 """
256 """
257 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
258 # 16 bits request ID
258 # 16 bits request ID
259 # 8 bits stream ID
259 # 8 bits stream ID
260 # 8 bits stream flags
260 # 8 bits stream flags
261 # 4 bits frame type
261 # 4 bits frame type
262 # 4 bits frame flags
262 # 4 bits frame flags
263 # ... payload
263 # ... payload
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 typeflags = data[7]
266 typeflags = data[7]
267
267
268 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
269 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
270
270
271 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
272 frametype, frameflags)
272 frametype, frameflags)
273
273
274 def readframe(fh):
274 def readframe(fh):
275 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
276
276
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
279 seen.
279 seen.
280 """
280 """
281 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
282
282
283 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
284
284
285 if readcount == 0:
285 if readcount == 0:
286 return None
286 return None
287
287
288 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 (readcount, header))
290 (readcount, header))
291
291
292 h = parseheader(header)
292 h = parseheader(header)
293
293
294 payload = fh.read(h.length)
294 payload = fh.read(h.length)
295 if len(payload) != h.length:
295 if len(payload) != h.length:
296 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 (h.length, len(payload)))
297 (h.length, len(payload)))
298
298
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 payload)
300 payload)
301
301
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 redirect=None):
304 redirect=None):
305 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
306
306
307 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
308 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
309 """
309 """
310 data = {b'name': cmd}
310 data = {b'name': cmd}
311 if args:
311 if args:
312 data[b'args'] = args
312 data[b'args'] = args
313
313
314 if redirect:
314 if redirect:
315 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
316
316
317 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
318
318
319 offset = 0
319 offset = 0
320
320
321 while True:
321 while True:
322 flags = 0
322 flags = 0
323
323
324 # Must set new or continuation flag.
324 # Must set new or continuation flag.
325 if not offset:
325 if not offset:
326 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
327 else:
327 else:
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329
329
330 # Data frames is set on all frames.
330 # Data frames is set on all frames.
331 if datafh:
331 if datafh:
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333
333
334 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
335 offset += len(payload)
335 offset += len(payload)
336
336
337 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339
339
340 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 flags=flags,
342 flags=flags,
343 payload=payload)
343 payload=payload)
344
344
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 break
346 break
347
347
348 if datafh:
348 if datafh:
349 while True:
349 while True:
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351
351
352 done = False
352 done = False
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 else:
355 else:
356 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
357 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
358 done = True
358 done = True
359
359
360 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
361 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
362 flags=flags,
362 flags=flags,
363 payload=data)
363 payload=data)
364
364
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
371
372 Returns a generator of bytearrays.
373 """
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
378
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
385 payload=overall + data)
386 return
387
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
394
395 offset = 0
396 while True:
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
399 done = offset == len(data)
400
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
409 payload=chunk)
410
411 if done:
412 break
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
417
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
421 """
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
425 while True:
426 chunk = cb.read(maxframesize)
427 if not chunk:
428 break
429
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
433 payload=chunk)
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
437 def createcommandresponseokframe(stream, requestid):
368 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439
370
440 return stream.makeframe(requestid=requestid,
371 return stream.makeframe(requestid=requestid,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
372 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 payload=overall)
374 payload=overall)
444
375
445 def createcommandresponseeosframe(stream, requestid):
376 def createcommandresponseeosframe(stream, requestid):
446 """Create an empty payload frame representing command end-of-stream."""
377 """Create an empty payload frame representing command end-of-stream."""
447 return stream.makeframe(requestid=requestid,
378 return stream.makeframe(requestid=requestid,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
379 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
380 flags=FLAG_COMMAND_RESPONSE_EOS,
450 payload=b'')
381 payload=b'')
451
382
452 def createalternatelocationresponseframe(stream, requestid, location):
383 def createalternatelocationresponseframe(stream, requestid, location):
453 data = {
384 data = {
454 b'status': b'redirect',
385 b'status': b'redirect',
455 b'location': {
386 b'location': {
456 b'url': location.url,
387 b'url': location.url,
457 b'mediatype': location.mediatype,
388 b'mediatype': location.mediatype,
458 }
389 }
459 }
390 }
460
391
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
392 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 r'servercadercerts'):
393 r'servercadercerts'):
463 value = getattr(location, a)
394 value = getattr(location, a)
464 if value is not None:
395 if value is not None:
465 data[b'location'][pycompat.bytestr(a)] = value
396 data[b'location'][pycompat.bytestr(a)] = value
466
397
467 return stream.makeframe(requestid=requestid,
398 return stream.makeframe(requestid=requestid,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
399 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
400 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 payload=b''.join(cborutil.streamencode(data)))
401 payload=b''.join(cborutil.streamencode(data)))
471
402
472 def createcommanderrorresponse(stream, requestid, message, args=None):
403 def createcommanderrorresponse(stream, requestid, message, args=None):
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
404 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 # formatting works consistently?
405 # formatting works consistently?
475 m = {
406 m = {
476 b'status': b'error',
407 b'status': b'error',
477 b'error': {
408 b'error': {
478 b'message': message,
409 b'message': message,
479 }
410 }
480 }
411 }
481
412
482 if args:
413 if args:
483 m[b'error'][b'args'] = args
414 m[b'error'][b'args'] = args
484
415
485 overall = b''.join(cborutil.streamencode(m))
416 overall = b''.join(cborutil.streamencode(m))
486
417
487 yield stream.makeframe(requestid=requestid,
418 yield stream.makeframe(requestid=requestid,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
419 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
420 flags=FLAG_COMMAND_RESPONSE_EOS,
490 payload=overall)
421 payload=overall)
491
422
492 def createerrorframe(stream, requestid, msg, errtype):
423 def createerrorframe(stream, requestid, msg, errtype):
493 # TODO properly handle frame size limits.
424 # TODO properly handle frame size limits.
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
425 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495
426
496 payload = b''.join(cborutil.streamencode({
427 payload = b''.join(cborutil.streamencode({
497 b'type': errtype,
428 b'type': errtype,
498 b'message': [{b'msg': msg}],
429 b'message': [{b'msg': msg}],
499 }))
430 }))
500
431
501 yield stream.makeframe(requestid=requestid,
432 yield stream.makeframe(requestid=requestid,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
433 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 flags=0,
434 flags=0,
504 payload=payload)
435 payload=payload)
505
436
506 def createtextoutputframe(stream, requestid, atoms,
437 def createtextoutputframe(stream, requestid, atoms,
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
438 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 """Create a text output frame to render text to people.
439 """Create a text output frame to render text to people.
509
440
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
441 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511
442
512 The formatting string contains ``%s`` tokens to be replaced by the
443 The formatting string contains ``%s`` tokens to be replaced by the
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
444 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 formatters to be applied at rendering time. In terms of the ``ui``
445 formatters to be applied at rendering time. In terms of the ``ui``
515 class, each atom corresponds to a ``ui.write()``.
446 class, each atom corresponds to a ``ui.write()``.
516 """
447 """
517 atomdicts = []
448 atomdicts = []
518
449
519 for (formatting, args, labels) in atoms:
450 for (formatting, args, labels) in atoms:
520 # TODO look for localstr, other types here?
451 # TODO look for localstr, other types here?
521
452
522 if not isinstance(formatting, bytes):
453 if not isinstance(formatting, bytes):
523 raise ValueError('must use bytes formatting strings')
454 raise ValueError('must use bytes formatting strings')
524 for arg in args:
455 for arg in args:
525 if not isinstance(arg, bytes):
456 if not isinstance(arg, bytes):
526 raise ValueError('must use bytes for arguments')
457 raise ValueError('must use bytes for arguments')
527 for label in labels:
458 for label in labels:
528 if not isinstance(label, bytes):
459 if not isinstance(label, bytes):
529 raise ValueError('must use bytes for labels')
460 raise ValueError('must use bytes for labels')
530
461
531 # Formatting string must be ASCII.
462 # Formatting string must be ASCII.
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
463 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533
464
534 # Arguments must be UTF-8.
465 # Arguments must be UTF-8.
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
466 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536
467
537 # Labels must be ASCII.
468 # Labels must be ASCII.
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
469 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 for l in labels]
470 for l in labels]
540
471
541 atom = {b'msg': formatting}
472 atom = {b'msg': formatting}
542 if args:
473 if args:
543 atom[b'args'] = args
474 atom[b'args'] = args
544 if labels:
475 if labels:
545 atom[b'labels'] = labels
476 atom[b'labels'] = labels
546
477
547 atomdicts.append(atom)
478 atomdicts.append(atom)
548
479
549 payload = b''.join(cborutil.streamencode(atomdicts))
480 payload = b''.join(cborutil.streamencode(atomdicts))
550
481
551 if len(payload) > maxframesize:
482 if len(payload) > maxframesize:
552 raise ValueError('cannot encode data in a single frame')
483 raise ValueError('cannot encode data in a single frame')
553
484
554 yield stream.makeframe(requestid=requestid,
485 yield stream.makeframe(requestid=requestid,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
486 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 flags=0,
487 flags=0,
557 payload=payload)
488 payload=payload)
558
489
559 class bufferingcommandresponseemitter(object):
490 class bufferingcommandresponseemitter(object):
560 """Helper object to emit command response frames intelligently.
491 """Helper object to emit command response frames intelligently.
561
492
562 Raw command response data is likely emitted in chunks much smaller
493 Raw command response data is likely emitted in chunks much smaller
563 than what can fit in a single frame. This class exists to buffer
494 than what can fit in a single frame. This class exists to buffer
564 chunks until enough data is available to fit in a single frame.
495 chunks until enough data is available to fit in a single frame.
565
496
566 TODO we'll need something like this when compression is supported.
497 TODO we'll need something like this when compression is supported.
567 So it might make sense to implement this functionality at the stream
498 So it might make sense to implement this functionality at the stream
568 level.
499 level.
569 """
500 """
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
501 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 self._stream = stream
502 self._stream = stream
572 self._requestid = requestid
503 self._requestid = requestid
573 self._maxsize = maxframesize
504 self._maxsize = maxframesize
574 self._chunks = []
505 self._chunks = []
575 self._chunkssize = 0
506 self._chunkssize = 0
576
507
577 def send(self, data):
508 def send(self, data):
578 """Send new data for emission.
509 """Send new data for emission.
579
510
580 Is a generator of new frames that were derived from the new input.
511 Is a generator of new frames that were derived from the new input.
581
512
582 If the special input ``None`` is received, flushes all buffered
513 If the special input ``None`` is received, flushes all buffered
583 data to frames.
514 data to frames.
584 """
515 """
585
516
586 if data is None:
517 if data is None:
587 for frame in self._flush():
518 for frame in self._flush():
588 yield frame
519 yield frame
589 return
520 return
590
521
591 # There is a ton of potential to do more complicated things here.
522 # There is a ton of potential to do more complicated things here.
592 # Our immediate goal is to coalesce small chunks into big frames,
523 # Our immediate goal is to coalesce small chunks into big frames,
593 # not achieve the fewest number of frames possible. So we go with
524 # not achieve the fewest number of frames possible. So we go with
594 # a simple implementation:
525 # a simple implementation:
595 #
526 #
596 # * If a chunk is too large for a frame, we flush and emit frames
527 # * If a chunk is too large for a frame, we flush and emit frames
597 # for the new chunk.
528 # for the new chunk.
598 # * If a chunk can be buffered without total buffered size limits
529 # * If a chunk can be buffered without total buffered size limits
599 # being exceeded, we do that.
530 # being exceeded, we do that.
600 # * If a chunk causes us to go over our buffering limit, we flush
531 # * If a chunk causes us to go over our buffering limit, we flush
601 # and then buffer the new chunk.
532 # and then buffer the new chunk.
602
533
603 if len(data) > self._maxsize:
534 if len(data) > self._maxsize:
604 for frame in self._flush():
535 for frame in self._flush():
605 yield frame
536 yield frame
606
537
607 # Now emit frames for the big chunk.
538 # Now emit frames for the big chunk.
608 offset = 0
539 offset = 0
609 while True:
540 while True:
610 chunk = data[offset:offset + self._maxsize]
541 chunk = data[offset:offset + self._maxsize]
611 offset += len(chunk)
542 offset += len(chunk)
612
543
613 yield self._stream.makeframe(
544 yield self._stream.makeframe(
614 self._requestid,
545 self._requestid,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
546 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
547 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 payload=chunk)
548 payload=chunk)
618
549
619 if offset == len(data):
550 if offset == len(data):
620 return
551 return
621
552
622 # If we don't have enough to constitute a full frame, buffer and
553 # If we don't have enough to constitute a full frame, buffer and
623 # return.
554 # return.
624 if len(data) + self._chunkssize < self._maxsize:
555 if len(data) + self._chunkssize < self._maxsize:
625 self._chunks.append(data)
556 self._chunks.append(data)
626 self._chunkssize += len(data)
557 self._chunkssize += len(data)
627 return
558 return
628
559
629 # Else flush what we have and buffer the new chunk. We could do
560 # Else flush what we have and buffer the new chunk. We could do
630 # something more intelligent here, like break the chunk. Let's
561 # something more intelligent here, like break the chunk. Let's
631 # keep things simple for now.
562 # keep things simple for now.
632 for frame in self._flush():
563 for frame in self._flush():
633 yield frame
564 yield frame
634
565
635 self._chunks.append(data)
566 self._chunks.append(data)
636 self._chunkssize = len(data)
567 self._chunkssize = len(data)
637
568
638 def _flush(self):
569 def _flush(self):
639 payload = b''.join(self._chunks)
570 payload = b''.join(self._chunks)
640 assert len(payload) <= self._maxsize
571 assert len(payload) <= self._maxsize
641
572
642 self._chunks[:] = []
573 self._chunks[:] = []
643 self._chunkssize = 0
574 self._chunkssize = 0
644
575
645 yield self._stream.makeframe(
576 yield self._stream.makeframe(
646 self._requestid,
577 self._requestid,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
578 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
579 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 payload=payload)
580 payload=payload)
650
581
651 # TODO consider defining encoders/decoders using the util.compressionengine
582 # TODO consider defining encoders/decoders using the util.compressionengine
652 # mechanism.
583 # mechanism.
653
584
654 class identityencoder(object):
585 class identityencoder(object):
655 """Encoder for the "identity" stream encoding profile."""
586 """Encoder for the "identity" stream encoding profile."""
656 def __init__(self, ui):
587 def __init__(self, ui):
657 pass
588 pass
658
589
659 def encode(self, data):
590 def encode(self, data):
660 return data
591 return data
661
592
662 def flush(self):
593 def flush(self):
663 return b''
594 return b''
664
595
665 def finish(self):
596 def finish(self):
666 return b''
597 return b''
667
598
668 class identitydecoder(object):
599 class identitydecoder(object):
669 """Decoder for the "identity" stream encoding profile."""
600 """Decoder for the "identity" stream encoding profile."""
670
601
671 def __init__(self, ui, extraobjs):
602 def __init__(self, ui, extraobjs):
672 if extraobjs:
603 if extraobjs:
673 raise error.Abort(_('identity decoder received unexpected '
604 raise error.Abort(_('identity decoder received unexpected '
674 'additional values'))
605 'additional values'))
675
606
676 def decode(self, data):
607 def decode(self, data):
677 return data
608 return data
678
609
679 class zlibencoder(object):
610 class zlibencoder(object):
680 def __init__(self, ui):
611 def __init__(self, ui):
681 import zlib
612 import zlib
682 self._zlib = zlib
613 self._zlib = zlib
683 self._compressor = zlib.compressobj()
614 self._compressor = zlib.compressobj()
684
615
685 def encode(self, data):
616 def encode(self, data):
686 return self._compressor.compress(data)
617 return self._compressor.compress(data)
687
618
688 def flush(self):
619 def flush(self):
689 # Z_SYNC_FLUSH doesn't reset compression context, which is
620 # Z_SYNC_FLUSH doesn't reset compression context, which is
690 # what we want.
621 # what we want.
691 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
622 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
692
623
693 def finish(self):
624 def finish(self):
694 res = self._compressor.flush(self._zlib.Z_FINISH)
625 res = self._compressor.flush(self._zlib.Z_FINISH)
695 self._compressor = None
626 self._compressor = None
696 return res
627 return res
697
628
698 class zlibdecoder(object):
629 class zlibdecoder(object):
699 def __init__(self, ui, extraobjs):
630 def __init__(self, ui, extraobjs):
700 import zlib
631 import zlib
701
632
702 if extraobjs:
633 if extraobjs:
703 raise error.Abort(_('zlib decoder received unexpected '
634 raise error.Abort(_('zlib decoder received unexpected '
704 'additional values'))
635 'additional values'))
705
636
706 self._decompressor = zlib.decompressobj()
637 self._decompressor = zlib.decompressobj()
707
638
708 def decode(self, data):
639 def decode(self, data):
709 # Python 2's zlib module doesn't use the buffer protocol and can't
640 # Python 2's zlib module doesn't use the buffer protocol and can't
710 # handle all bytes-like types.
641 # handle all bytes-like types.
711 if not pycompat.ispy3 and isinstance(data, bytearray):
642 if not pycompat.ispy3 and isinstance(data, bytearray):
712 data = bytes(data)
643 data = bytes(data)
713
644
714 return self._decompressor.decompress(data)
645 return self._decompressor.decompress(data)
715
646
716 class zstdbaseencoder(object):
647 class zstdbaseencoder(object):
717 def __init__(self, level):
648 def __init__(self, level):
718 from . import zstd
649 from . import zstd
719
650
720 self._zstd = zstd
651 self._zstd = zstd
721 cctx = zstd.ZstdCompressor(level=level)
652 cctx = zstd.ZstdCompressor(level=level)
722 self._compressor = cctx.compressobj()
653 self._compressor = cctx.compressobj()
723
654
724 def encode(self, data):
655 def encode(self, data):
725 return self._compressor.compress(data)
656 return self._compressor.compress(data)
726
657
727 def flush(self):
658 def flush(self):
728 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
659 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
729 # compressor and allows a decompressor to access all encoded data
660 # compressor and allows a decompressor to access all encoded data
730 # up to this point.
661 # up to this point.
731 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
662 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
732
663
733 def finish(self):
664 def finish(self):
734 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
665 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
735 self._compressor = None
666 self._compressor = None
736 return res
667 return res
737
668
738 class zstd8mbencoder(zstdbaseencoder):
669 class zstd8mbencoder(zstdbaseencoder):
739 def __init__(self, ui):
670 def __init__(self, ui):
740 super(zstd8mbencoder, self).__init__(3)
671 super(zstd8mbencoder, self).__init__(3)
741
672
742 class zstdbasedecoder(object):
673 class zstdbasedecoder(object):
743 def __init__(self, maxwindowsize):
674 def __init__(self, maxwindowsize):
744 from . import zstd
675 from . import zstd
745 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
676 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
746 self._decompressor = dctx.decompressobj()
677 self._decompressor = dctx.decompressobj()
747
678
748 def decode(self, data):
679 def decode(self, data):
749 return self._decompressor.decompress(data)
680 return self._decompressor.decompress(data)
750
681
751 class zstd8mbdecoder(zstdbasedecoder):
682 class zstd8mbdecoder(zstdbasedecoder):
752 def __init__(self, ui, extraobjs):
683 def __init__(self, ui, extraobjs):
753 if extraobjs:
684 if extraobjs:
754 raise error.Abort(_('zstd8mb decoder received unexpected '
685 raise error.Abort(_('zstd8mb decoder received unexpected '
755 'additional values'))
686 'additional values'))
756
687
757 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
688 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
758
689
759 # We lazily populate this to avoid excessive module imports when importing
690 # We lazily populate this to avoid excessive module imports when importing
760 # this module.
691 # this module.
761 STREAM_ENCODERS = {}
692 STREAM_ENCODERS = {}
762 STREAM_ENCODERS_ORDER = []
693 STREAM_ENCODERS_ORDER = []
763
694
764 def populatestreamencoders():
695 def populatestreamencoders():
765 if STREAM_ENCODERS:
696 if STREAM_ENCODERS:
766 return
697 return
767
698
768 try:
699 try:
769 from . import zstd
700 from . import zstd
770 zstd.__version__
701 zstd.__version__
771 except ImportError:
702 except ImportError:
772 zstd = None
703 zstd = None
773
704
774 # zstandard is fastest and is preferred.
705 # zstandard is fastest and is preferred.
775 if zstd:
706 if zstd:
776 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
707 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
777 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
708 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
778
709
779 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
710 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
780 STREAM_ENCODERS_ORDER.append(b'zlib')
711 STREAM_ENCODERS_ORDER.append(b'zlib')
781
712
782 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
713 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
783 STREAM_ENCODERS_ORDER.append(b'identity')
714 STREAM_ENCODERS_ORDER.append(b'identity')
784
715
785 class stream(object):
716 class stream(object):
786 """Represents a logical unidirectional series of frames."""
717 """Represents a logical unidirectional series of frames."""
787
718
788 def __init__(self, streamid, active=False):
719 def __init__(self, streamid, active=False):
789 self.streamid = streamid
720 self.streamid = streamid
790 self._active = active
721 self._active = active
791
722
792 def makeframe(self, requestid, typeid, flags, payload):
723 def makeframe(self, requestid, typeid, flags, payload):
793 """Create a frame to be sent out over this stream.
724 """Create a frame to be sent out over this stream.
794
725
795 Only returns the frame instance. Does not actually send it.
726 Only returns the frame instance. Does not actually send it.
796 """
727 """
797 streamflags = 0
728 streamflags = 0
798 if not self._active:
729 if not self._active:
799 streamflags |= STREAM_FLAG_BEGIN_STREAM
730 streamflags |= STREAM_FLAG_BEGIN_STREAM
800 self._active = True
731 self._active = True
801
732
802 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
733 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
803 payload)
734 payload)
804
735
805 class inputstream(stream):
736 class inputstream(stream):
806 """Represents a stream used for receiving data."""
737 """Represents a stream used for receiving data."""
807
738
808 def __init__(self, streamid, active=False):
739 def __init__(self, streamid, active=False):
809 super(inputstream, self).__init__(streamid, active=active)
740 super(inputstream, self).__init__(streamid, active=active)
810 self._decoder = None
741 self._decoder = None
811
742
812 def setdecoder(self, ui, name, extraobjs):
743 def setdecoder(self, ui, name, extraobjs):
813 """Set the decoder for this stream.
744 """Set the decoder for this stream.
814
745
815 Receives the stream profile name and any additional CBOR objects
746 Receives the stream profile name and any additional CBOR objects
816 decoded from the stream encoding settings frame payloads.
747 decoded from the stream encoding settings frame payloads.
817 """
748 """
818 if name not in STREAM_ENCODERS:
749 if name not in STREAM_ENCODERS:
819 raise error.Abort(_('unknown stream decoder: %s') % name)
750 raise error.Abort(_('unknown stream decoder: %s') % name)
820
751
821 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
752 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
822
753
823 def decode(self, data):
754 def decode(self, data):
824 # Default is identity decoder. We don't bother instantiating one
755 # Default is identity decoder. We don't bother instantiating one
825 # because it is trivial.
756 # because it is trivial.
826 if not self._decoder:
757 if not self._decoder:
827 return data
758 return data
828
759
829 return self._decoder.decode(data)
760 return self._decoder.decode(data)
830
761
831 def flush(self):
762 def flush(self):
832 if not self._decoder:
763 if not self._decoder:
833 return b''
764 return b''
834
765
835 return self._decoder.flush()
766 return self._decoder.flush()
836
767
837 class outputstream(stream):
768 class outputstream(stream):
838 """Represents a stream used for sending data."""
769 """Represents a stream used for sending data."""
839
770
840 def __init__(self, streamid, active=False):
771 def __init__(self, streamid, active=False):
841 super(outputstream, self).__init__(streamid, active=active)
772 super(outputstream, self).__init__(streamid, active=active)
842 self._encoder = None
773 self._encoder = None
843
774
844 def setencoder(self, ui, name):
775 def setencoder(self, ui, name):
845 """Set the encoder for this stream.
776 """Set the encoder for this stream.
846
777
847 Receives the stream profile name.
778 Receives the stream profile name.
848 """
779 """
849 if name not in STREAM_ENCODERS:
780 if name not in STREAM_ENCODERS:
850 raise error.Abort(_('unknown stream encoder: %s') % name)
781 raise error.Abort(_('unknown stream encoder: %s') % name)
851
782
852 self._encoder = STREAM_ENCODERS[name][0](ui)
783 self._encoder = STREAM_ENCODERS[name][0](ui)
853
784
854 def encode(self, data):
785 def encode(self, data):
855 if not self._encoder:
786 if not self._encoder:
856 return data
787 return data
857
788
858 return self._encoder.encode(data)
789 return self._encoder.encode(data)
859
790
860 def flush(self):
791 def flush(self):
861 if not self._encoder:
792 if not self._encoder:
862 return b''
793 return b''
863
794
864 return self._encoder.flush()
795 return self._encoder.flush()
865
796
866 def finish(self):
797 def finish(self):
867 if not self._encoder:
798 if not self._encoder:
868 return b''
799 return b''
869
800
870 self._encoder.finish()
801 self._encoder.finish()
871
802
872 def ensureserverstream(stream):
803 def ensureserverstream(stream):
873 if stream.streamid % 2:
804 if stream.streamid % 2:
874 raise error.ProgrammingError('server should only write to even '
805 raise error.ProgrammingError('server should only write to even '
875 'numbered streams; %d is not even' %
806 'numbered streams; %d is not even' %
876 stream.streamid)
807 stream.streamid)
877
808
878 DEFAULT_PROTOCOL_SETTINGS = {
809 DEFAULT_PROTOCOL_SETTINGS = {
879 'contentencodings': [b'identity'],
810 'contentencodings': [b'identity'],
880 }
811 }
881
812
882 class serverreactor(object):
813 class serverreactor(object):
883 """Holds state of a server handling frame-based protocol requests.
814 """Holds state of a server handling frame-based protocol requests.
884
815
885 This class is the "brain" of the unified frame-based protocol server
816 This class is the "brain" of the unified frame-based protocol server
886 component. While the protocol is stateless from the perspective of
817 component. While the protocol is stateless from the perspective of
887 requests/commands, something needs to track which frames have been
818 requests/commands, something needs to track which frames have been
888 received, what frames to expect, etc. This class is that thing.
819 received, what frames to expect, etc. This class is that thing.
889
820
890 Instances are modeled as a state machine of sorts. Instances are also
821 Instances are modeled as a state machine of sorts. Instances are also
891 reactionary to external events. The point of this class is to encapsulate
822 reactionary to external events. The point of this class is to encapsulate
892 the state of the connection and the exchange of frames, not to perform
823 the state of the connection and the exchange of frames, not to perform
893 work. Instead, callers tell this class when something occurs, like a
824 work. Instead, callers tell this class when something occurs, like a
894 frame arriving. If that activity is worthy of a follow-up action (say
825 frame arriving. If that activity is worthy of a follow-up action (say
895 *run a command*), the return value of that handler will say so.
826 *run a command*), the return value of that handler will say so.
896
827
897 I/O and CPU intensive operations are purposefully delegated outside of
828 I/O and CPU intensive operations are purposefully delegated outside of
898 this class.
829 this class.
899
830
900 Consumers are expected to tell instances when events occur. They do so by
831 Consumers are expected to tell instances when events occur. They do so by
901 calling the various ``on*`` methods. These methods return a 2-tuple
832 calling the various ``on*`` methods. These methods return a 2-tuple
902 describing any follow-up action(s) to take. The first element is the
833 describing any follow-up action(s) to take. The first element is the
903 name of an action to perform. The second is a data structure (usually
834 name of an action to perform. The second is a data structure (usually
904 a dict) specific to that action that contains more information. e.g.
835 a dict) specific to that action that contains more information. e.g.
905 if the server wants to send frames back to the client, the data structure
836 if the server wants to send frames back to the client, the data structure
906 will contain a reference to those frames.
837 will contain a reference to those frames.
907
838
908 Valid actions that consumers can be instructed to take are:
839 Valid actions that consumers can be instructed to take are:
909
840
910 sendframes
841 sendframes
911 Indicates that frames should be sent to the client. The ``framegen``
842 Indicates that frames should be sent to the client. The ``framegen``
912 key contains a generator of frames that should be sent. The server
843 key contains a generator of frames that should be sent. The server
913 assumes that all frames are sent to the client.
844 assumes that all frames are sent to the client.
914
845
915 error
846 error
916 Indicates that an error occurred. Consumer should probably abort.
847 Indicates that an error occurred. Consumer should probably abort.
917
848
918 runcommand
849 runcommand
919 Indicates that the consumer should run a wire protocol command. Details
850 Indicates that the consumer should run a wire protocol command. Details
920 of the command to run are given in the data structure.
851 of the command to run are given in the data structure.
921
852
922 wantframe
853 wantframe
923 Indicates that nothing of interest happened and the server is waiting on
854 Indicates that nothing of interest happened and the server is waiting on
924 more frames from the client before anything interesting can be done.
855 more frames from the client before anything interesting can be done.
925
856
926 noop
857 noop
927 Indicates no additional action is required.
858 Indicates no additional action is required.
928
859
929 Known Issues
860 Known Issues
930 ------------
861 ------------
931
862
932 There are no limits to the number of partially received commands or their
863 There are no limits to the number of partially received commands or their
933 size. A malicious client could stream command request data and exhaust the
864 size. A malicious client could stream command request data and exhaust the
934 server's memory.
865 server's memory.
935
866
936 Partially received commands are not acted upon when end of input is
867 Partially received commands are not acted upon when end of input is
937 reached. Should the server error if it receives a partial request?
868 reached. Should the server error if it receives a partial request?
938 Should the client send a message to abort a partially transmitted request
869 Should the client send a message to abort a partially transmitted request
939 to facilitate graceful shutdown?
870 to facilitate graceful shutdown?
940
871
941 Active requests that haven't been responded to aren't tracked. This means
872 Active requests that haven't been responded to aren't tracked. This means
942 that if we receive a command and instruct its dispatch, another command
873 that if we receive a command and instruct its dispatch, another command
943 with its request ID can come in over the wire and there will be a race
874 with its request ID can come in over the wire and there will be a race
944 between who responds to what.
875 between who responds to what.
945 """
876 """
946
877
947 def __init__(self, ui, deferoutput=False):
878 def __init__(self, ui, deferoutput=False):
948 """Construct a new server reactor.
879 """Construct a new server reactor.
949
880
950 ``deferoutput`` can be used to indicate that no output frames should be
881 ``deferoutput`` can be used to indicate that no output frames should be
951 instructed to be sent until input has been exhausted. In this mode,
882 instructed to be sent until input has been exhausted. In this mode,
952 events that would normally generate output frames (such as a command
883 events that would normally generate output frames (such as a command
953 response being ready) will instead defer instructing the consumer to
884 response being ready) will instead defer instructing the consumer to
954 send those frames. This is useful for half-duplex transports where the
885 send those frames. This is useful for half-duplex transports where the
955 sender cannot receive until all data has been transmitted.
886 sender cannot receive until all data has been transmitted.
956 """
887 """
957 self._ui = ui
888 self._ui = ui
958 self._deferoutput = deferoutput
889 self._deferoutput = deferoutput
959 self._state = 'initial'
890 self._state = 'initial'
960 self._nextoutgoingstreamid = 2
891 self._nextoutgoingstreamid = 2
961 self._bufferedframegens = []
892 self._bufferedframegens = []
962 # stream id -> stream instance for all active streams from the client.
893 # stream id -> stream instance for all active streams from the client.
963 self._incomingstreams = {}
894 self._incomingstreams = {}
964 self._outgoingstreams = {}
895 self._outgoingstreams = {}
965 # request id -> dict of commands that are actively being received.
896 # request id -> dict of commands that are actively being received.
966 self._receivingcommands = {}
897 self._receivingcommands = {}
967 # Request IDs that have been received and are actively being processed.
898 # Request IDs that have been received and are actively being processed.
968 # Once all output for a request has been sent, it is removed from this
899 # Once all output for a request has been sent, it is removed from this
969 # set.
900 # set.
970 self._activecommands = set()
901 self._activecommands = set()
971
902
972 self._protocolsettingsdecoder = None
903 self._protocolsettingsdecoder = None
973
904
974 # Sender protocol settings are optional. Set implied default values.
905 # Sender protocol settings are optional. Set implied default values.
975 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
906 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
976
907
977 populatestreamencoders()
908 populatestreamencoders()
978
909
979 def onframerecv(self, frame):
910 def onframerecv(self, frame):
980 """Process a frame that has been received off the wire.
911 """Process a frame that has been received off the wire.
981
912
982 Returns a dict with an ``action`` key that details what action,
913 Returns a dict with an ``action`` key that details what action,
983 if any, the consumer should take next.
914 if any, the consumer should take next.
984 """
915 """
985 if not frame.streamid % 2:
916 if not frame.streamid % 2:
986 self._state = 'errored'
917 self._state = 'errored'
987 return self._makeerrorresult(
918 return self._makeerrorresult(
988 _('received frame with even numbered stream ID: %d') %
919 _('received frame with even numbered stream ID: %d') %
989 frame.streamid)
920 frame.streamid)
990
921
991 if frame.streamid not in self._incomingstreams:
922 if frame.streamid not in self._incomingstreams:
992 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
923 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
993 self._state = 'errored'
924 self._state = 'errored'
994 return self._makeerrorresult(
925 return self._makeerrorresult(
995 _('received frame on unknown inactive stream without '
926 _('received frame on unknown inactive stream without '
996 'beginning of stream flag set'))
927 'beginning of stream flag set'))
997
928
998 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
929 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
999
930
1000 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
931 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1001 # TODO handle decoding frames
932 # TODO handle decoding frames
1002 self._state = 'errored'
933 self._state = 'errored'
1003 raise error.ProgrammingError('support for decoding stream payloads '
934 raise error.ProgrammingError('support for decoding stream payloads '
1004 'not yet implemented')
935 'not yet implemented')
1005
936
1006 if frame.streamflags & STREAM_FLAG_END_STREAM:
937 if frame.streamflags & STREAM_FLAG_END_STREAM:
1007 del self._incomingstreams[frame.streamid]
938 del self._incomingstreams[frame.streamid]
1008
939
1009 handlers = {
940 handlers = {
1010 'initial': self._onframeinitial,
941 'initial': self._onframeinitial,
1011 'protocol-settings-receiving': self._onframeprotocolsettings,
942 'protocol-settings-receiving': self._onframeprotocolsettings,
1012 'idle': self._onframeidle,
943 'idle': self._onframeidle,
1013 'command-receiving': self._onframecommandreceiving,
944 'command-receiving': self._onframecommandreceiving,
1014 'errored': self._onframeerrored,
945 'errored': self._onframeerrored,
1015 }
946 }
1016
947
1017 meth = handlers.get(self._state)
948 meth = handlers.get(self._state)
1018 if not meth:
949 if not meth:
1019 raise error.ProgrammingError('unhandled state: %s' % self._state)
950 raise error.ProgrammingError('unhandled state: %s' % self._state)
1020
951
1021 return meth(frame)
952 return meth(frame)
1022
953
1023 def oncommandresponseready(self, stream, requestid, data):
1024 """Signal that a bytes response is ready to be sent to the client.
1025
1026 The raw bytes response is passed as an argument.
1027 """
1028 ensureserverstream(stream)
1029
1030 def sendframes():
1031 for frame in createcommandresponseframesfrombytes(stream, requestid,
1032 data):
1033 yield frame
1034
1035 self._activecommands.remove(requestid)
1036
1037 result = sendframes()
1038
1039 if self._deferoutput:
1040 self._bufferedframegens.append(result)
1041 return 'noop', {}
1042 else:
1043 return 'sendframes', {
1044 'framegen': result,
1045 }
1046
1047 def oncommandresponsereadyobjects(self, stream, requestid, objs):
954 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1048 """Signal that objects are ready to be sent to the client.
955 """Signal that objects are ready to be sent to the client.
1049
956
1050 ``objs`` is an iterable of objects (typically a generator) that will
957 ``objs`` is an iterable of objects (typically a generator) that will
1051 be encoded via CBOR and added to frames, which will be sent to the
958 be encoded via CBOR and added to frames, which will be sent to the
1052 client.
959 client.
1053 """
960 """
1054 ensureserverstream(stream)
961 ensureserverstream(stream)
1055
962
963 # A more robust solution would be to check for objs.{next,__next__}.
964 if isinstance(objs, list):
965 objs = iter(objs)
966
1056 # We need to take care over exception handling. Uncaught exceptions
967 # We need to take care over exception handling. Uncaught exceptions
1057 # when generating frames could lead to premature end of the frame
968 # when generating frames could lead to premature end of the frame
1058 # stream and the possibility of the server or client process getting
969 # stream and the possibility of the server or client process getting
1059 # in a bad state.
970 # in a bad state.
1060 #
971 #
1061 # Keep in mind that if ``objs`` is a generator, advancing it could
972 # Keep in mind that if ``objs`` is a generator, advancing it could
1062 # raise exceptions that originated in e.g. wire protocol command
973 # raise exceptions that originated in e.g. wire protocol command
1063 # functions. That is why we differentiate between exceptions raised
974 # functions. That is why we differentiate between exceptions raised
1064 # when iterating versus other exceptions that occur.
975 # when iterating versus other exceptions that occur.
1065 #
976 #
1066 # In all cases, when the function finishes, the request is fully
977 # In all cases, when the function finishes, the request is fully
1067 # handled and no new frames for it should be seen.
978 # handled and no new frames for it should be seen.
1068
979
1069 def sendframes():
980 def sendframes():
1070 emitted = False
981 emitted = False
1071 alternatelocationsent = False
982 alternatelocationsent = False
1072 emitter = bufferingcommandresponseemitter(stream, requestid)
983 emitter = bufferingcommandresponseemitter(stream, requestid)
1073 while True:
984 while True:
1074 try:
985 try:
1075 o = next(objs)
986 o = next(objs)
1076 except StopIteration:
987 except StopIteration:
1077 for frame in emitter.send(None):
988 for frame in emitter.send(None):
1078 yield frame
989 yield frame
1079
990
1080 if emitted:
991 if emitted:
1081 yield createcommandresponseeosframe(stream, requestid)
992 yield createcommandresponseeosframe(stream, requestid)
1082 break
993 break
1083
994
1084 except error.WireprotoCommandError as e:
995 except error.WireprotoCommandError as e:
1085 for frame in createcommanderrorresponse(
996 for frame in createcommanderrorresponse(
1086 stream, requestid, e.message, e.messageargs):
997 stream, requestid, e.message, e.messageargs):
1087 yield frame
998 yield frame
1088 break
999 break
1089
1000
1090 except Exception as e:
1001 except Exception as e:
1091 for frame in createerrorframe(
1002 for frame in createerrorframe(
1092 stream, requestid, '%s' % stringutil.forcebytestr(e),
1003 stream, requestid, '%s' % stringutil.forcebytestr(e),
1093 errtype='server'):
1004 errtype='server'):
1094
1005
1095 yield frame
1006 yield frame
1096
1007
1097 break
1008 break
1098
1009
1099 try:
1010 try:
1100 # Alternate location responses can only be the first and
1011 # Alternate location responses can only be the first and
1101 # only object in the output stream.
1012 # only object in the output stream.
1102 if isinstance(o, wireprototypes.alternatelocationresponse):
1013 if isinstance(o, wireprototypes.alternatelocationresponse):
1103 if emitted:
1014 if emitted:
1104 raise error.ProgrammingError(
1015 raise error.ProgrammingError(
1105 'alternatelocationresponse seen after initial '
1016 'alternatelocationresponse seen after initial '
1106 'output object')
1017 'output object')
1107
1018
1108 yield createalternatelocationresponseframe(
1019 yield createalternatelocationresponseframe(
1109 stream, requestid, o)
1020 stream, requestid, o)
1110
1021
1111 alternatelocationsent = True
1022 alternatelocationsent = True
1112 emitted = True
1023 emitted = True
1113 continue
1024 continue
1114
1025
1115 if alternatelocationsent:
1026 if alternatelocationsent:
1116 raise error.ProgrammingError(
1027 raise error.ProgrammingError(
1117 'object follows alternatelocationresponse')
1028 'object follows alternatelocationresponse')
1118
1029
1119 if not emitted:
1030 if not emitted:
1120 yield createcommandresponseokframe(stream, requestid)
1031 yield createcommandresponseokframe(stream, requestid)
1121 emitted = True
1032 emitted = True
1122
1033
1123 # Objects emitted by command functions can be serializable
1034 # Objects emitted by command functions can be serializable
1124 # data structures or special types.
1035 # data structures or special types.
1125 # TODO consider extracting the content normalization to a
1036 # TODO consider extracting the content normalization to a
1126 # standalone function, as it may be useful for e.g. cachers.
1037 # standalone function, as it may be useful for e.g. cachers.
1127
1038
1128 # A pre-encoded object is sent directly to the emitter.
1039 # A pre-encoded object is sent directly to the emitter.
1129 if isinstance(o, wireprototypes.encodedresponse):
1040 if isinstance(o, wireprototypes.encodedresponse):
1130 for frame in emitter.send(o.data):
1041 for frame in emitter.send(o.data):
1131 yield frame
1042 yield frame
1132
1043
1133 # A regular object is CBOR encoded.
1044 # A regular object is CBOR encoded.
1134 else:
1045 else:
1135 for chunk in cborutil.streamencode(o):
1046 for chunk in cborutil.streamencode(o):
1136 for frame in emitter.send(chunk):
1047 for frame in emitter.send(chunk):
1137 yield frame
1048 yield frame
1138
1049
1139 except Exception as e:
1050 except Exception as e:
1140 for frame in createerrorframe(stream, requestid,
1051 for frame in createerrorframe(stream, requestid,
1141 '%s' % e,
1052 '%s' % e,
1142 errtype='server'):
1053 errtype='server'):
1143 yield frame
1054 yield frame
1144
1055
1145 break
1056 break
1146
1057
1147 self._activecommands.remove(requestid)
1058 self._activecommands.remove(requestid)
1148
1059
1149 return self._handlesendframes(sendframes())
1060 return self._handlesendframes(sendframes())
1150
1061
1151 def oninputeof(self):
1062 def oninputeof(self):
1152 """Signals that end of input has been received.
1063 """Signals that end of input has been received.
1153
1064
1154 No more frames will be received. All pending activity should be
1065 No more frames will be received. All pending activity should be
1155 completed.
1066 completed.
1156 """
1067 """
1157 # TODO should we do anything about in-flight commands?
1068 # TODO should we do anything about in-flight commands?
1158
1069
1159 if not self._deferoutput or not self._bufferedframegens:
1070 if not self._deferoutput or not self._bufferedframegens:
1160 return 'noop', {}
1071 return 'noop', {}
1161
1072
1162 # If we buffered all our responses, emit those.
1073 # If we buffered all our responses, emit those.
1163 def makegen():
1074 def makegen():
1164 for gen in self._bufferedframegens:
1075 for gen in self._bufferedframegens:
1165 for frame in gen:
1076 for frame in gen:
1166 yield frame
1077 yield frame
1167
1078
1168 return 'sendframes', {
1079 return 'sendframes', {
1169 'framegen': makegen(),
1080 'framegen': makegen(),
1170 }
1081 }
1171
1082
1172 def _handlesendframes(self, framegen):
1083 def _handlesendframes(self, framegen):
1173 if self._deferoutput:
1084 if self._deferoutput:
1174 self._bufferedframegens.append(framegen)
1085 self._bufferedframegens.append(framegen)
1175 return 'noop', {}
1086 return 'noop', {}
1176 else:
1087 else:
1177 return 'sendframes', {
1088 return 'sendframes', {
1178 'framegen': framegen,
1089 'framegen': framegen,
1179 }
1090 }
1180
1091
1181 def onservererror(self, stream, requestid, msg):
1092 def onservererror(self, stream, requestid, msg):
1182 ensureserverstream(stream)
1093 ensureserverstream(stream)
1183
1094
1184 def sendframes():
1095 def sendframes():
1185 for frame in createerrorframe(stream, requestid, msg,
1096 for frame in createerrorframe(stream, requestid, msg,
1186 errtype='server'):
1097 errtype='server'):
1187 yield frame
1098 yield frame
1188
1099
1189 self._activecommands.remove(requestid)
1100 self._activecommands.remove(requestid)
1190
1101
1191 return self._handlesendframes(sendframes())
1102 return self._handlesendframes(sendframes())
1192
1103
1193 def oncommanderror(self, stream, requestid, message, args=None):
1104 def oncommanderror(self, stream, requestid, message, args=None):
1194 """Called when a command encountered an error before sending output."""
1105 """Called when a command encountered an error before sending output."""
1195 ensureserverstream(stream)
1106 ensureserverstream(stream)
1196
1107
1197 def sendframes():
1108 def sendframes():
1198 for frame in createcommanderrorresponse(stream, requestid, message,
1109 for frame in createcommanderrorresponse(stream, requestid, message,
1199 args):
1110 args):
1200 yield frame
1111 yield frame
1201
1112
1202 self._activecommands.remove(requestid)
1113 self._activecommands.remove(requestid)
1203
1114
1204 return self._handlesendframes(sendframes())
1115 return self._handlesendframes(sendframes())
1205
1116
1206 def makeoutputstream(self):
1117 def makeoutputstream(self):
1207 """Create a stream to be used for sending data to the client."""
1118 """Create a stream to be used for sending data to the client."""
1208 streamid = self._nextoutgoingstreamid
1119 streamid = self._nextoutgoingstreamid
1209 self._nextoutgoingstreamid += 2
1120 self._nextoutgoingstreamid += 2
1210
1121
1211 s = outputstream(streamid)
1122 s = outputstream(streamid)
1212 self._outgoingstreams[streamid] = s
1123 self._outgoingstreams[streamid] = s
1213
1124
1214 return s
1125 return s
1215
1126
1216 def _makeerrorresult(self, msg):
1127 def _makeerrorresult(self, msg):
1217 return 'error', {
1128 return 'error', {
1218 'message': msg,
1129 'message': msg,
1219 }
1130 }
1220
1131
1221 def _makeruncommandresult(self, requestid):
1132 def _makeruncommandresult(self, requestid):
1222 entry = self._receivingcommands[requestid]
1133 entry = self._receivingcommands[requestid]
1223
1134
1224 if not entry['requestdone']:
1135 if not entry['requestdone']:
1225 self._state = 'errored'
1136 self._state = 'errored'
1226 raise error.ProgrammingError('should not be called without '
1137 raise error.ProgrammingError('should not be called without '
1227 'requestdone set')
1138 'requestdone set')
1228
1139
1229 del self._receivingcommands[requestid]
1140 del self._receivingcommands[requestid]
1230
1141
1231 if self._receivingcommands:
1142 if self._receivingcommands:
1232 self._state = 'command-receiving'
1143 self._state = 'command-receiving'
1233 else:
1144 else:
1234 self._state = 'idle'
1145 self._state = 'idle'
1235
1146
1236 # Decode the payloads as CBOR.
1147 # Decode the payloads as CBOR.
1237 entry['payload'].seek(0)
1148 entry['payload'].seek(0)
1238 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1149 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1239
1150
1240 if b'name' not in request:
1151 if b'name' not in request:
1241 self._state = 'errored'
1152 self._state = 'errored'
1242 return self._makeerrorresult(
1153 return self._makeerrorresult(
1243 _('command request missing "name" field'))
1154 _('command request missing "name" field'))
1244
1155
1245 if b'args' not in request:
1156 if b'args' not in request:
1246 request[b'args'] = {}
1157 request[b'args'] = {}
1247
1158
1248 assert requestid not in self._activecommands
1159 assert requestid not in self._activecommands
1249 self._activecommands.add(requestid)
1160 self._activecommands.add(requestid)
1250
1161
1251 return 'runcommand', {
1162 return 'runcommand', {
1252 'requestid': requestid,
1163 'requestid': requestid,
1253 'command': request[b'name'],
1164 'command': request[b'name'],
1254 'args': request[b'args'],
1165 'args': request[b'args'],
1255 'redirect': request.get(b'redirect'),
1166 'redirect': request.get(b'redirect'),
1256 'data': entry['data'].getvalue() if entry['data'] else None,
1167 'data': entry['data'].getvalue() if entry['data'] else None,
1257 }
1168 }
1258
1169
1259 def _makewantframeresult(self):
1170 def _makewantframeresult(self):
1260 return 'wantframe', {
1171 return 'wantframe', {
1261 'state': self._state,
1172 'state': self._state,
1262 }
1173 }
1263
1174
1264 def _validatecommandrequestframe(self, frame):
1175 def _validatecommandrequestframe(self, frame):
1265 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1176 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1266 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1177 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1267
1178
1268 if new and continuation:
1179 if new and continuation:
1269 self._state = 'errored'
1180 self._state = 'errored'
1270 return self._makeerrorresult(
1181 return self._makeerrorresult(
1271 _('received command request frame with both new and '
1182 _('received command request frame with both new and '
1272 'continuation flags set'))
1183 'continuation flags set'))
1273
1184
1274 if not new and not continuation:
1185 if not new and not continuation:
1275 self._state = 'errored'
1186 self._state = 'errored'
1276 return self._makeerrorresult(
1187 return self._makeerrorresult(
1277 _('received command request frame with neither new nor '
1188 _('received command request frame with neither new nor '
1278 'continuation flags set'))
1189 'continuation flags set'))
1279
1190
1280 def _onframeinitial(self, frame):
1191 def _onframeinitial(self, frame):
1281 # Called when we receive a frame when in the "initial" state.
1192 # Called when we receive a frame when in the "initial" state.
1282 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1193 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1283 self._state = 'protocol-settings-receiving'
1194 self._state = 'protocol-settings-receiving'
1284 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1195 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1285 return self._onframeprotocolsettings(frame)
1196 return self._onframeprotocolsettings(frame)
1286
1197
1287 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1198 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1288 self._state = 'idle'
1199 self._state = 'idle'
1289 return self._onframeidle(frame)
1200 return self._onframeidle(frame)
1290
1201
1291 else:
1202 else:
1292 self._state = 'errored'
1203 self._state = 'errored'
1293 return self._makeerrorresult(
1204 return self._makeerrorresult(
1294 _('expected sender protocol settings or command request '
1205 _('expected sender protocol settings or command request '
1295 'frame; got %d') % frame.typeid)
1206 'frame; got %d') % frame.typeid)
1296
1207
1297 def _onframeprotocolsettings(self, frame):
1208 def _onframeprotocolsettings(self, frame):
1298 assert self._state == 'protocol-settings-receiving'
1209 assert self._state == 'protocol-settings-receiving'
1299 assert self._protocolsettingsdecoder is not None
1210 assert self._protocolsettingsdecoder is not None
1300
1211
1301 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1212 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1302 self._state = 'errored'
1213 self._state = 'errored'
1303 return self._makeerrorresult(
1214 return self._makeerrorresult(
1304 _('expected sender protocol settings frame; got %d') %
1215 _('expected sender protocol settings frame; got %d') %
1305 frame.typeid)
1216 frame.typeid)
1306
1217
1307 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1218 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1308 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1219 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1309
1220
1310 if more and eos:
1221 if more and eos:
1311 self._state = 'errored'
1222 self._state = 'errored'
1312 return self._makeerrorresult(
1223 return self._makeerrorresult(
1313 _('sender protocol settings frame cannot have both '
1224 _('sender protocol settings frame cannot have both '
1314 'continuation and end of stream flags set'))
1225 'continuation and end of stream flags set'))
1315
1226
1316 if not more and not eos:
1227 if not more and not eos:
1317 self._state = 'errored'
1228 self._state = 'errored'
1318 return self._makeerrorresult(
1229 return self._makeerrorresult(
1319 _('sender protocol settings frame must have continuation or '
1230 _('sender protocol settings frame must have continuation or '
1320 'end of stream flag set'))
1231 'end of stream flag set'))
1321
1232
1322 # TODO establish limits for maximum amount of data that can be
1233 # TODO establish limits for maximum amount of data that can be
1323 # buffered.
1234 # buffered.
1324 try:
1235 try:
1325 self._protocolsettingsdecoder.decode(frame.payload)
1236 self._protocolsettingsdecoder.decode(frame.payload)
1326 except Exception as e:
1237 except Exception as e:
1327 self._state = 'errored'
1238 self._state = 'errored'
1328 return self._makeerrorresult(
1239 return self._makeerrorresult(
1329 _('error decoding CBOR from sender protocol settings frame: %s')
1240 _('error decoding CBOR from sender protocol settings frame: %s')
1330 % stringutil.forcebytestr(e))
1241 % stringutil.forcebytestr(e))
1331
1242
1332 if more:
1243 if more:
1333 return self._makewantframeresult()
1244 return self._makewantframeresult()
1334
1245
1335 assert eos
1246 assert eos
1336
1247
1337 decoded = self._protocolsettingsdecoder.getavailable()
1248 decoded = self._protocolsettingsdecoder.getavailable()
1338 self._protocolsettingsdecoder = None
1249 self._protocolsettingsdecoder = None
1339
1250
1340 if not decoded:
1251 if not decoded:
1341 self._state = 'errored'
1252 self._state = 'errored'
1342 return self._makeerrorresult(
1253 return self._makeerrorresult(
1343 _('sender protocol settings frame did not contain CBOR data'))
1254 _('sender protocol settings frame did not contain CBOR data'))
1344 elif len(decoded) > 1:
1255 elif len(decoded) > 1:
1345 self._state = 'errored'
1256 self._state = 'errored'
1346 return self._makeerrorresult(
1257 return self._makeerrorresult(
1347 _('sender protocol settings frame contained multiple CBOR '
1258 _('sender protocol settings frame contained multiple CBOR '
1348 'values'))
1259 'values'))
1349
1260
1350 d = decoded[0]
1261 d = decoded[0]
1351
1262
1352 if b'contentencodings' in d:
1263 if b'contentencodings' in d:
1353 self._sendersettings['contentencodings'] = d[b'contentencodings']
1264 self._sendersettings['contentencodings'] = d[b'contentencodings']
1354
1265
1355 self._state = 'idle'
1266 self._state = 'idle'
1356
1267
1357 return self._makewantframeresult()
1268 return self._makewantframeresult()
1358
1269
1359 def _onframeidle(self, frame):
1270 def _onframeidle(self, frame):
1360 # The only frame type that should be received in this state is a
1271 # The only frame type that should be received in this state is a
1361 # command request.
1272 # command request.
1362 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1273 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1363 self._state = 'errored'
1274 self._state = 'errored'
1364 return self._makeerrorresult(
1275 return self._makeerrorresult(
1365 _('expected command request frame; got %d') % frame.typeid)
1276 _('expected command request frame; got %d') % frame.typeid)
1366
1277
1367 res = self._validatecommandrequestframe(frame)
1278 res = self._validatecommandrequestframe(frame)
1368 if res:
1279 if res:
1369 return res
1280 return res
1370
1281
1371 if frame.requestid in self._receivingcommands:
1282 if frame.requestid in self._receivingcommands:
1372 self._state = 'errored'
1283 self._state = 'errored'
1373 return self._makeerrorresult(
1284 return self._makeerrorresult(
1374 _('request with ID %d already received') % frame.requestid)
1285 _('request with ID %d already received') % frame.requestid)
1375
1286
1376 if frame.requestid in self._activecommands:
1287 if frame.requestid in self._activecommands:
1377 self._state = 'errored'
1288 self._state = 'errored'
1378 return self._makeerrorresult(
1289 return self._makeerrorresult(
1379 _('request with ID %d is already active') % frame.requestid)
1290 _('request with ID %d is already active') % frame.requestid)
1380
1291
1381 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1292 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1382 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1293 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1383 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1294 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1384
1295
1385 if not new:
1296 if not new:
1386 self._state = 'errored'
1297 self._state = 'errored'
1387 return self._makeerrorresult(
1298 return self._makeerrorresult(
1388 _('received command request frame without new flag set'))
1299 _('received command request frame without new flag set'))
1389
1300
1390 payload = util.bytesio()
1301 payload = util.bytesio()
1391 payload.write(frame.payload)
1302 payload.write(frame.payload)
1392
1303
1393 self._receivingcommands[frame.requestid] = {
1304 self._receivingcommands[frame.requestid] = {
1394 'payload': payload,
1305 'payload': payload,
1395 'data': None,
1306 'data': None,
1396 'requestdone': not moreframes,
1307 'requestdone': not moreframes,
1397 'expectingdata': bool(expectingdata),
1308 'expectingdata': bool(expectingdata),
1398 }
1309 }
1399
1310
1400 # This is the final frame for this request. Dispatch it.
1311 # This is the final frame for this request. Dispatch it.
1401 if not moreframes and not expectingdata:
1312 if not moreframes and not expectingdata:
1402 return self._makeruncommandresult(frame.requestid)
1313 return self._makeruncommandresult(frame.requestid)
1403
1314
1404 assert moreframes or expectingdata
1315 assert moreframes or expectingdata
1405 self._state = 'command-receiving'
1316 self._state = 'command-receiving'
1406 return self._makewantframeresult()
1317 return self._makewantframeresult()
1407
1318
1408 def _onframecommandreceiving(self, frame):
1319 def _onframecommandreceiving(self, frame):
1409 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1320 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1410 # Process new command requests as such.
1321 # Process new command requests as such.
1411 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1322 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1412 return self._onframeidle(frame)
1323 return self._onframeidle(frame)
1413
1324
1414 res = self._validatecommandrequestframe(frame)
1325 res = self._validatecommandrequestframe(frame)
1415 if res:
1326 if res:
1416 return res
1327 return res
1417
1328
1418 # All other frames should be related to a command that is currently
1329 # All other frames should be related to a command that is currently
1419 # receiving but is not active.
1330 # receiving but is not active.
1420 if frame.requestid in self._activecommands:
1331 if frame.requestid in self._activecommands:
1421 self._state = 'errored'
1332 self._state = 'errored'
1422 return self._makeerrorresult(
1333 return self._makeerrorresult(
1423 _('received frame for request that is still active: %d') %
1334 _('received frame for request that is still active: %d') %
1424 frame.requestid)
1335 frame.requestid)
1425
1336
1426 if frame.requestid not in self._receivingcommands:
1337 if frame.requestid not in self._receivingcommands:
1427 self._state = 'errored'
1338 self._state = 'errored'
1428 return self._makeerrorresult(
1339 return self._makeerrorresult(
1429 _('received frame for request that is not receiving: %d') %
1340 _('received frame for request that is not receiving: %d') %
1430 frame.requestid)
1341 frame.requestid)
1431
1342
1432 entry = self._receivingcommands[frame.requestid]
1343 entry = self._receivingcommands[frame.requestid]
1433
1344
1434 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1345 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1435 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1346 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1436 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1347 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1437
1348
1438 if entry['requestdone']:
1349 if entry['requestdone']:
1439 self._state = 'errored'
1350 self._state = 'errored'
1440 return self._makeerrorresult(
1351 return self._makeerrorresult(
1441 _('received command request frame when request frames '
1352 _('received command request frame when request frames '
1442 'were supposedly done'))
1353 'were supposedly done'))
1443
1354
1444 if expectingdata != entry['expectingdata']:
1355 if expectingdata != entry['expectingdata']:
1445 self._state = 'errored'
1356 self._state = 'errored'
1446 return self._makeerrorresult(
1357 return self._makeerrorresult(
1447 _('mismatch between expect data flag and previous frame'))
1358 _('mismatch between expect data flag and previous frame'))
1448
1359
1449 entry['payload'].write(frame.payload)
1360 entry['payload'].write(frame.payload)
1450
1361
1451 if not moreframes:
1362 if not moreframes:
1452 entry['requestdone'] = True
1363 entry['requestdone'] = True
1453
1364
1454 if not moreframes and not expectingdata:
1365 if not moreframes and not expectingdata:
1455 return self._makeruncommandresult(frame.requestid)
1366 return self._makeruncommandresult(frame.requestid)
1456
1367
1457 return self._makewantframeresult()
1368 return self._makewantframeresult()
1458
1369
1459 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1370 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1460 if not entry['expectingdata']:
1371 if not entry['expectingdata']:
1461 self._state = 'errored'
1372 self._state = 'errored'
1462 return self._makeerrorresult(_(
1373 return self._makeerrorresult(_(
1463 'received command data frame for request that is not '
1374 'received command data frame for request that is not '
1464 'expecting data: %d') % frame.requestid)
1375 'expecting data: %d') % frame.requestid)
1465
1376
1466 if entry['data'] is None:
1377 if entry['data'] is None:
1467 entry['data'] = util.bytesio()
1378 entry['data'] = util.bytesio()
1468
1379
1469 return self._handlecommanddataframe(frame, entry)
1380 return self._handlecommanddataframe(frame, entry)
1470 else:
1381 else:
1471 self._state = 'errored'
1382 self._state = 'errored'
1472 return self._makeerrorresult(_(
1383 return self._makeerrorresult(_(
1473 'received unexpected frame type: %d') % frame.typeid)
1384 'received unexpected frame type: %d') % frame.typeid)
1474
1385
1475 def _handlecommanddataframe(self, frame, entry):
1386 def _handlecommanddataframe(self, frame, entry):
1476 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1387 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1477
1388
1478 # TODO support streaming data instead of buffering it.
1389 # TODO support streaming data instead of buffering it.
1479 entry['data'].write(frame.payload)
1390 entry['data'].write(frame.payload)
1480
1391
1481 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1392 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1482 return self._makewantframeresult()
1393 return self._makewantframeresult()
1483 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1394 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1484 entry['data'].seek(0)
1395 entry['data'].seek(0)
1485 return self._makeruncommandresult(frame.requestid)
1396 return self._makeruncommandresult(frame.requestid)
1486 else:
1397 else:
1487 self._state = 'errored'
1398 self._state = 'errored'
1488 return self._makeerrorresult(_('command data frame without '
1399 return self._makeerrorresult(_('command data frame without '
1489 'flags'))
1400 'flags'))
1490
1401
1491 def _onframeerrored(self, frame):
1402 def _onframeerrored(self, frame):
1492 return self._makeerrorresult(_('server already errored'))
1403 return self._makeerrorresult(_('server already errored'))
1493
1404
1494 class commandrequest(object):
1405 class commandrequest(object):
1495 """Represents a request to run a command."""
1406 """Represents a request to run a command."""
1496
1407
1497 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1408 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1498 self.requestid = requestid
1409 self.requestid = requestid
1499 self.name = name
1410 self.name = name
1500 self.args = args
1411 self.args = args
1501 self.datafh = datafh
1412 self.datafh = datafh
1502 self.redirect = redirect
1413 self.redirect = redirect
1503 self.state = 'pending'
1414 self.state = 'pending'
1504
1415
1505 class clientreactor(object):
1416 class clientreactor(object):
1506 """Holds state of a client issuing frame-based protocol requests.
1417 """Holds state of a client issuing frame-based protocol requests.
1507
1418
1508 This is like ``serverreactor`` but for client-side state.
1419 This is like ``serverreactor`` but for client-side state.
1509
1420
1510 Each instance is bound to the lifetime of a connection. For persistent
1421 Each instance is bound to the lifetime of a connection. For persistent
1511 connection transports using e.g. TCP sockets and speaking the raw
1422 connection transports using e.g. TCP sockets and speaking the raw
1512 framing protocol, there will be a single instance for the lifetime of
1423 framing protocol, there will be a single instance for the lifetime of
1513 the TCP socket. For transports where there are multiple discrete
1424 the TCP socket. For transports where there are multiple discrete
1514 interactions (say tunneled within in HTTP request), there will be a
1425 interactions (say tunneled within in HTTP request), there will be a
1515 separate instance for each distinct interaction.
1426 separate instance for each distinct interaction.
1516
1427
1517 Consumers are expected to tell instances when events occur by calling
1428 Consumers are expected to tell instances when events occur by calling
1518 various methods. These methods return a 2-tuple describing any follow-up
1429 various methods. These methods return a 2-tuple describing any follow-up
1519 action(s) to take. The first element is the name of an action to
1430 action(s) to take. The first element is the name of an action to
1520 perform. The second is a data structure (usually a dict) specific to
1431 perform. The second is a data structure (usually a dict) specific to
1521 that action that contains more information. e.g. if the reactor wants
1432 that action that contains more information. e.g. if the reactor wants
1522 to send frames to the server, the data structure will contain a reference
1433 to send frames to the server, the data structure will contain a reference
1523 to those frames.
1434 to those frames.
1524
1435
1525 Valid actions that consumers can be instructed to take are:
1436 Valid actions that consumers can be instructed to take are:
1526
1437
1527 noop
1438 noop
1528 Indicates no additional action is required.
1439 Indicates no additional action is required.
1529
1440
1530 sendframes
1441 sendframes
1531 Indicates that frames should be sent to the server. The ``framegen``
1442 Indicates that frames should be sent to the server. The ``framegen``
1532 key contains a generator of frames that should be sent. The reactor
1443 key contains a generator of frames that should be sent. The reactor
1533 assumes that all frames in this generator are sent to the server.
1444 assumes that all frames in this generator are sent to the server.
1534
1445
1535 error
1446 error
1536 Indicates that an error occurred. The ``message`` key contains an
1447 Indicates that an error occurred. The ``message`` key contains an
1537 error message describing the failure.
1448 error message describing the failure.
1538
1449
1539 responsedata
1450 responsedata
1540 Indicates a response to a previously-issued command was received.
1451 Indicates a response to a previously-issued command was received.
1541
1452
1542 The ``request`` key contains the ``commandrequest`` instance that
1453 The ``request`` key contains the ``commandrequest`` instance that
1543 represents the request this data is for.
1454 represents the request this data is for.
1544
1455
1545 The ``data`` key contains the decoded data from the server.
1456 The ``data`` key contains the decoded data from the server.
1546
1457
1547 ``expectmore`` and ``eos`` evaluate to True when more response data
1458 ``expectmore`` and ``eos`` evaluate to True when more response data
1548 is expected to follow or we're at the end of the response stream,
1459 is expected to follow or we're at the end of the response stream,
1549 respectively.
1460 respectively.
1550 """
1461 """
1551 def __init__(self, ui, hasmultiplesend=False, buffersends=True,
1462 def __init__(self, ui, hasmultiplesend=False, buffersends=True,
1552 clientcontentencoders=None):
1463 clientcontentencoders=None):
1553 """Create a new instance.
1464 """Create a new instance.
1554
1465
1555 ``hasmultiplesend`` indicates whether multiple sends are supported
1466 ``hasmultiplesend`` indicates whether multiple sends are supported
1556 by the transport. When True, it is possible to send commands immediately
1467 by the transport. When True, it is possible to send commands immediately
1557 instead of buffering until the caller signals an intent to finish a
1468 instead of buffering until the caller signals an intent to finish a
1558 send operation.
1469 send operation.
1559
1470
1560 ``buffercommands`` indicates whether sends should be buffered until the
1471 ``buffercommands`` indicates whether sends should be buffered until the
1561 last request has been issued.
1472 last request has been issued.
1562
1473
1563 ``clientcontentencoders`` is an iterable of content encoders the client
1474 ``clientcontentencoders`` is an iterable of content encoders the client
1564 will advertise to the server and that the server can use for encoding
1475 will advertise to the server and that the server can use for encoding
1565 data. If not defined, the client will not advertise content encoders
1476 data. If not defined, the client will not advertise content encoders
1566 to the server.
1477 to the server.
1567 """
1478 """
1568 self._ui = ui
1479 self._ui = ui
1569 self._hasmultiplesend = hasmultiplesend
1480 self._hasmultiplesend = hasmultiplesend
1570 self._buffersends = buffersends
1481 self._buffersends = buffersends
1571 self._clientcontentencoders = clientcontentencoders
1482 self._clientcontentencoders = clientcontentencoders
1572
1483
1573 self._canissuecommands = True
1484 self._canissuecommands = True
1574 self._cansend = True
1485 self._cansend = True
1575 self._protocolsettingssent = False
1486 self._protocolsettingssent = False
1576
1487
1577 self._nextrequestid = 1
1488 self._nextrequestid = 1
1578 # We only support a single outgoing stream for now.
1489 # We only support a single outgoing stream for now.
1579 self._outgoingstream = outputstream(1)
1490 self._outgoingstream = outputstream(1)
1580 self._pendingrequests = collections.deque()
1491 self._pendingrequests = collections.deque()
1581 self._activerequests = {}
1492 self._activerequests = {}
1582 self._incomingstreams = {}
1493 self._incomingstreams = {}
1583 self._streamsettingsdecoders = {}
1494 self._streamsettingsdecoders = {}
1584
1495
1585 populatestreamencoders()
1496 populatestreamencoders()
1586
1497
1587 def callcommand(self, name, args, datafh=None, redirect=None):
1498 def callcommand(self, name, args, datafh=None, redirect=None):
1588 """Request that a command be executed.
1499 """Request that a command be executed.
1589
1500
1590 Receives the command name, a dict of arguments to pass to the command,
1501 Receives the command name, a dict of arguments to pass to the command,
1591 and an optional file object containing the raw data for the command.
1502 and an optional file object containing the raw data for the command.
1592
1503
1593 Returns a 3-tuple of (request, action, action data).
1504 Returns a 3-tuple of (request, action, action data).
1594 """
1505 """
1595 if not self._canissuecommands:
1506 if not self._canissuecommands:
1596 raise error.ProgrammingError('cannot issue new commands')
1507 raise error.ProgrammingError('cannot issue new commands')
1597
1508
1598 requestid = self._nextrequestid
1509 requestid = self._nextrequestid
1599 self._nextrequestid += 2
1510 self._nextrequestid += 2
1600
1511
1601 request = commandrequest(requestid, name, args, datafh=datafh,
1512 request = commandrequest(requestid, name, args, datafh=datafh,
1602 redirect=redirect)
1513 redirect=redirect)
1603
1514
1604 if self._buffersends:
1515 if self._buffersends:
1605 self._pendingrequests.append(request)
1516 self._pendingrequests.append(request)
1606 return request, 'noop', {}
1517 return request, 'noop', {}
1607 else:
1518 else:
1608 if not self._cansend:
1519 if not self._cansend:
1609 raise error.ProgrammingError('sends cannot be performed on '
1520 raise error.ProgrammingError('sends cannot be performed on '
1610 'this instance')
1521 'this instance')
1611
1522
1612 if not self._hasmultiplesend:
1523 if not self._hasmultiplesend:
1613 self._cansend = False
1524 self._cansend = False
1614 self._canissuecommands = False
1525 self._canissuecommands = False
1615
1526
1616 return request, 'sendframes', {
1527 return request, 'sendframes', {
1617 'framegen': self._makecommandframes(request),
1528 'framegen': self._makecommandframes(request),
1618 }
1529 }
1619
1530
1620 def flushcommands(self):
1531 def flushcommands(self):
1621 """Request that all queued commands be sent.
1532 """Request that all queued commands be sent.
1622
1533
1623 If any commands are buffered, this will instruct the caller to send
1534 If any commands are buffered, this will instruct the caller to send
1624 them over the wire. If no commands are buffered it instructs the client
1535 them over the wire. If no commands are buffered it instructs the client
1625 to no-op.
1536 to no-op.
1626
1537
1627 If instances aren't configured for multiple sends, no new command
1538 If instances aren't configured for multiple sends, no new command
1628 requests are allowed after this is called.
1539 requests are allowed after this is called.
1629 """
1540 """
1630 if not self._pendingrequests:
1541 if not self._pendingrequests:
1631 return 'noop', {}
1542 return 'noop', {}
1632
1543
1633 if not self._cansend:
1544 if not self._cansend:
1634 raise error.ProgrammingError('sends cannot be performed on this '
1545 raise error.ProgrammingError('sends cannot be performed on this '
1635 'instance')
1546 'instance')
1636
1547
1637 # If the instance only allows sending once, mark that we have fired
1548 # If the instance only allows sending once, mark that we have fired
1638 # our one shot.
1549 # our one shot.
1639 if not self._hasmultiplesend:
1550 if not self._hasmultiplesend:
1640 self._canissuecommands = False
1551 self._canissuecommands = False
1641 self._cansend = False
1552 self._cansend = False
1642
1553
1643 def makeframes():
1554 def makeframes():
1644 while self._pendingrequests:
1555 while self._pendingrequests:
1645 request = self._pendingrequests.popleft()
1556 request = self._pendingrequests.popleft()
1646 for frame in self._makecommandframes(request):
1557 for frame in self._makecommandframes(request):
1647 yield frame
1558 yield frame
1648
1559
1649 return 'sendframes', {
1560 return 'sendframes', {
1650 'framegen': makeframes(),
1561 'framegen': makeframes(),
1651 }
1562 }
1652
1563
1653 def _makecommandframes(self, request):
1564 def _makecommandframes(self, request):
1654 """Emit frames to issue a command request.
1565 """Emit frames to issue a command request.
1655
1566
1656 As a side-effect, update request accounting to reflect its changed
1567 As a side-effect, update request accounting to reflect its changed
1657 state.
1568 state.
1658 """
1569 """
1659 self._activerequests[request.requestid] = request
1570 self._activerequests[request.requestid] = request
1660 request.state = 'sending'
1571 request.state = 'sending'
1661
1572
1662 if not self._protocolsettingssent and self._clientcontentencoders:
1573 if not self._protocolsettingssent and self._clientcontentencoders:
1663 self._protocolsettingssent = True
1574 self._protocolsettingssent = True
1664
1575
1665 payload = b''.join(cborutil.streamencode({
1576 payload = b''.join(cborutil.streamencode({
1666 b'contentencodings': self._clientcontentencoders,
1577 b'contentencodings': self._clientcontentencoders,
1667 }))
1578 }))
1668
1579
1669 yield self._outgoingstream.makeframe(
1580 yield self._outgoingstream.makeframe(
1670 requestid=request.requestid,
1581 requestid=request.requestid,
1671 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1582 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1672 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1583 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1673 payload=payload)
1584 payload=payload)
1674
1585
1675 res = createcommandframes(self._outgoingstream,
1586 res = createcommandframes(self._outgoingstream,
1676 request.requestid,
1587 request.requestid,
1677 request.name,
1588 request.name,
1678 request.args,
1589 request.args,
1679 datafh=request.datafh,
1590 datafh=request.datafh,
1680 redirect=request.redirect)
1591 redirect=request.redirect)
1681
1592
1682 for frame in res:
1593 for frame in res:
1683 yield frame
1594 yield frame
1684
1595
1685 request.state = 'sent'
1596 request.state = 'sent'
1686
1597
1687 def onframerecv(self, frame):
1598 def onframerecv(self, frame):
1688 """Process a frame that has been received off the wire.
1599 """Process a frame that has been received off the wire.
1689
1600
1690 Returns a 2-tuple of (action, meta) describing further action the
1601 Returns a 2-tuple of (action, meta) describing further action the
1691 caller needs to take as a result of receiving this frame.
1602 caller needs to take as a result of receiving this frame.
1692 """
1603 """
1693 if frame.streamid % 2:
1604 if frame.streamid % 2:
1694 return 'error', {
1605 return 'error', {
1695 'message': (
1606 'message': (
1696 _('received frame with odd numbered stream ID: %d') %
1607 _('received frame with odd numbered stream ID: %d') %
1697 frame.streamid),
1608 frame.streamid),
1698 }
1609 }
1699
1610
1700 if frame.streamid not in self._incomingstreams:
1611 if frame.streamid not in self._incomingstreams:
1701 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1612 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1702 return 'error', {
1613 return 'error', {
1703 'message': _('received frame on unknown stream '
1614 'message': _('received frame on unknown stream '
1704 'without beginning of stream flag set'),
1615 'without beginning of stream flag set'),
1705 }
1616 }
1706
1617
1707 self._incomingstreams[frame.streamid] = inputstream(
1618 self._incomingstreams[frame.streamid] = inputstream(
1708 frame.streamid)
1619 frame.streamid)
1709
1620
1710 stream = self._incomingstreams[frame.streamid]
1621 stream = self._incomingstreams[frame.streamid]
1711
1622
1712 # If the payload is encoded, ask the stream to decode it. We
1623 # If the payload is encoded, ask the stream to decode it. We
1713 # merely substitute the decoded result into the frame payload as
1624 # merely substitute the decoded result into the frame payload as
1714 # if it had been transferred all along.
1625 # if it had been transferred all along.
1715 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1626 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1716 frame.payload = stream.decode(frame.payload)
1627 frame.payload = stream.decode(frame.payload)
1717
1628
1718 if frame.streamflags & STREAM_FLAG_END_STREAM:
1629 if frame.streamflags & STREAM_FLAG_END_STREAM:
1719 del self._incomingstreams[frame.streamid]
1630 del self._incomingstreams[frame.streamid]
1720
1631
1721 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1632 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1722 return self._onstreamsettingsframe(frame)
1633 return self._onstreamsettingsframe(frame)
1723
1634
1724 if frame.requestid not in self._activerequests:
1635 if frame.requestid not in self._activerequests:
1725 return 'error', {
1636 return 'error', {
1726 'message': (_('received frame for inactive request ID: %d') %
1637 'message': (_('received frame for inactive request ID: %d') %
1727 frame.requestid),
1638 frame.requestid),
1728 }
1639 }
1729
1640
1730 request = self._activerequests[frame.requestid]
1641 request = self._activerequests[frame.requestid]
1731 request.state = 'receiving'
1642 request.state = 'receiving'
1732
1643
1733 handlers = {
1644 handlers = {
1734 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1645 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1735 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1646 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1736 }
1647 }
1737
1648
1738 meth = handlers.get(frame.typeid)
1649 meth = handlers.get(frame.typeid)
1739 if not meth:
1650 if not meth:
1740 raise error.ProgrammingError('unhandled frame type: %d' %
1651 raise error.ProgrammingError('unhandled frame type: %d' %
1741 frame.typeid)
1652 frame.typeid)
1742
1653
1743 return meth(request, frame)
1654 return meth(request, frame)
1744
1655
1745 def _onstreamsettingsframe(self, frame):
1656 def _onstreamsettingsframe(self, frame):
1746 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1657 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1747
1658
1748 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1659 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1749 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1660 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1750
1661
1751 if more and eos:
1662 if more and eos:
1752 return 'error', {
1663 return 'error', {
1753 'message': (_('stream encoding settings frame cannot have both '
1664 'message': (_('stream encoding settings frame cannot have both '
1754 'continuation and end of stream flags set')),
1665 'continuation and end of stream flags set')),
1755 }
1666 }
1756
1667
1757 if not more and not eos:
1668 if not more and not eos:
1758 return 'error', {
1669 return 'error', {
1759 'message': _('stream encoding settings frame must have '
1670 'message': _('stream encoding settings frame must have '
1760 'continuation or end of stream flag set'),
1671 'continuation or end of stream flag set'),
1761 }
1672 }
1762
1673
1763 if frame.streamid not in self._streamsettingsdecoders:
1674 if frame.streamid not in self._streamsettingsdecoders:
1764 decoder = cborutil.bufferingdecoder()
1675 decoder = cborutil.bufferingdecoder()
1765 self._streamsettingsdecoders[frame.streamid] = decoder
1676 self._streamsettingsdecoders[frame.streamid] = decoder
1766
1677
1767 decoder = self._streamsettingsdecoders[frame.streamid]
1678 decoder = self._streamsettingsdecoders[frame.streamid]
1768
1679
1769 try:
1680 try:
1770 decoder.decode(frame.payload)
1681 decoder.decode(frame.payload)
1771 except Exception as e:
1682 except Exception as e:
1772 return 'error', {
1683 return 'error', {
1773 'message': (_('error decoding CBOR from stream encoding '
1684 'message': (_('error decoding CBOR from stream encoding '
1774 'settings frame: %s') %
1685 'settings frame: %s') %
1775 stringutil.forcebytestr(e)),
1686 stringutil.forcebytestr(e)),
1776 }
1687 }
1777
1688
1778 if more:
1689 if more:
1779 return 'noop', {}
1690 return 'noop', {}
1780
1691
1781 assert eos
1692 assert eos
1782
1693
1783 decoded = decoder.getavailable()
1694 decoded = decoder.getavailable()
1784 del self._streamsettingsdecoders[frame.streamid]
1695 del self._streamsettingsdecoders[frame.streamid]
1785
1696
1786 if not decoded:
1697 if not decoded:
1787 return 'error', {
1698 return 'error', {
1788 'message': _('stream encoding settings frame did not contain '
1699 'message': _('stream encoding settings frame did not contain '
1789 'CBOR data'),
1700 'CBOR data'),
1790 }
1701 }
1791
1702
1792 try:
1703 try:
1793 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1704 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1794 decoded[0],
1705 decoded[0],
1795 decoded[1:])
1706 decoded[1:])
1796 except Exception as e:
1707 except Exception as e:
1797 return 'error', {
1708 return 'error', {
1798 'message': (_('error setting stream decoder: %s') %
1709 'message': (_('error setting stream decoder: %s') %
1799 stringutil.forcebytestr(e)),
1710 stringutil.forcebytestr(e)),
1800 }
1711 }
1801
1712
1802 return 'noop', {}
1713 return 'noop', {}
1803
1714
1804 def _oncommandresponseframe(self, request, frame):
1715 def _oncommandresponseframe(self, request, frame):
1805 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1716 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1806 request.state = 'received'
1717 request.state = 'received'
1807 del self._activerequests[request.requestid]
1718 del self._activerequests[request.requestid]
1808
1719
1809 return 'responsedata', {
1720 return 'responsedata', {
1810 'request': request,
1721 'request': request,
1811 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1722 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1812 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1723 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1813 'data': frame.payload,
1724 'data': frame.payload,
1814 }
1725 }
1815
1726
1816 def _onerrorresponseframe(self, request, frame):
1727 def _onerrorresponseframe(self, request, frame):
1817 request.state = 'errored'
1728 request.state = 'errored'
1818 del self._activerequests[request.requestid]
1729 del self._activerequests[request.requestid]
1819
1730
1820 # The payload should be a CBOR map.
1731 # The payload should be a CBOR map.
1821 m = cborutil.decodeall(frame.payload)[0]
1732 m = cborutil.decodeall(frame.payload)[0]
1822
1733
1823 return 'error', {
1734 return 'error', {
1824 'request': request,
1735 'request': request,
1825 'type': m['type'],
1736 'type': m['type'],
1826 'message': m['message'],
1737 'message': m['message'],
1827 }
1738 }
@@ -1,604 +1,629 b''
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 import unittest
3 import unittest
4
4
5 from mercurial.thirdparty import (
5 from mercurial.thirdparty import (
6 cbor,
6 cbor,
7 )
7 )
8 from mercurial import (
8 from mercurial import (
9 ui as uimod,
9 ui as uimod,
10 util,
10 util,
11 wireprotoframing as framing,
11 wireprotoframing as framing,
12 )
12 )
13 from mercurial.utils import (
13 from mercurial.utils import (
14 cborutil,
14 cborutil,
15 )
15 )
16
16
17 ffs = framing.makeframefromhumanstring
17 ffs = framing.makeframefromhumanstring
18
18
19 OK = cbor.dumps({b'status': b'ok'})
19 OK = cbor.dumps({b'status': b'ok'})
20
20
21 def makereactor(deferoutput=False):
21 def makereactor(deferoutput=False):
22 ui = uimod.ui()
22 ui = uimod.ui()
23 return framing.serverreactor(ui, deferoutput=deferoutput)
23 return framing.serverreactor(ui, deferoutput=deferoutput)
24
24
25 def sendframes(reactor, gen):
25 def sendframes(reactor, gen):
26 """Send a generator of frame bytearray to a reactor.
26 """Send a generator of frame bytearray to a reactor.
27
27
28 Emits a generator of results from ``onframerecv()`` calls.
28 Emits a generator of results from ``onframerecv()`` calls.
29 """
29 """
30 for frame in gen:
30 for frame in gen:
31 header = framing.parseheader(frame)
31 header = framing.parseheader(frame)
32 payload = frame[framing.FRAME_HEADER_SIZE:]
32 payload = frame[framing.FRAME_HEADER_SIZE:]
33 assert len(payload) == header.length
33 assert len(payload) == header.length
34
34
35 yield reactor.onframerecv(framing.frame(header.requestid,
35 yield reactor.onframerecv(framing.frame(header.requestid,
36 header.streamid,
36 header.streamid,
37 header.streamflags,
37 header.streamflags,
38 header.typeid,
38 header.typeid,
39 header.flags,
39 header.flags,
40 payload))
40 payload))
41
41
42 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
42 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
43 """Generate frames to run a command and send them to a reactor."""
43 """Generate frames to run a command and send them to a reactor."""
44 return sendframes(reactor,
44 return sendframes(reactor,
45 framing.createcommandframes(stream, rid, cmd, args,
45 framing.createcommandframes(stream, rid, cmd, args,
46 datafh))
46 datafh))
47
47
48
48
49 class ServerReactorTests(unittest.TestCase):
49 class ServerReactorTests(unittest.TestCase):
50 def _sendsingleframe(self, reactor, f):
50 def _sendsingleframe(self, reactor, f):
51 results = list(sendframes(reactor, [f]))
51 results = list(sendframes(reactor, [f]))
52 self.assertEqual(len(results), 1)
52 self.assertEqual(len(results), 1)
53
53
54 return results[0]
54 return results[0]
55
55
56 def assertaction(self, res, expected):
56 def assertaction(self, res, expected):
57 self.assertIsInstance(res, tuple)
57 self.assertIsInstance(res, tuple)
58 self.assertEqual(len(res), 2)
58 self.assertEqual(len(res), 2)
59 self.assertIsInstance(res[1], dict)
59 self.assertIsInstance(res[1], dict)
60 self.assertEqual(res[0], expected)
60 self.assertEqual(res[0], expected)
61
61
62 def assertframesequal(self, frames, framestrings):
62 def assertframesequal(self, frames, framestrings):
63 expected = [ffs(s) for s in framestrings]
63 expected = [ffs(s) for s in framestrings]
64 self.assertEqual(list(frames), expected)
64 self.assertEqual(list(frames), expected)
65
65
66 def test1framecommand(self):
66 def test1framecommand(self):
67 """Receiving a command in a single frame yields request to run it."""
67 """Receiving a command in a single frame yields request to run it."""
68 reactor = makereactor()
68 reactor = makereactor()
69 stream = framing.stream(1)
69 stream = framing.stream(1)
70 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
70 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
71 self.assertEqual(len(results), 1)
71 self.assertEqual(len(results), 1)
72 self.assertaction(results[0], b'runcommand')
72 self.assertaction(results[0], b'runcommand')
73 self.assertEqual(results[0][1], {
73 self.assertEqual(results[0][1], {
74 b'requestid': 1,
74 b'requestid': 1,
75 b'command': b'mycommand',
75 b'command': b'mycommand',
76 b'args': {},
76 b'args': {},
77 b'redirect': None,
77 b'redirect': None,
78 b'data': None,
78 b'data': None,
79 })
79 })
80
80
81 result = reactor.oninputeof()
81 result = reactor.oninputeof()
82 self.assertaction(result, b'noop')
82 self.assertaction(result, b'noop')
83
83
84 def test1argument(self):
84 def test1argument(self):
85 reactor = makereactor()
85 reactor = makereactor()
86 stream = framing.stream(1)
86 stream = framing.stream(1)
87 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
87 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
88 {b'foo': b'bar'}))
88 {b'foo': b'bar'}))
89 self.assertEqual(len(results), 1)
89 self.assertEqual(len(results), 1)
90 self.assertaction(results[0], b'runcommand')
90 self.assertaction(results[0], b'runcommand')
91 self.assertEqual(results[0][1], {
91 self.assertEqual(results[0][1], {
92 b'requestid': 41,
92 b'requestid': 41,
93 b'command': b'mycommand',
93 b'command': b'mycommand',
94 b'args': {b'foo': b'bar'},
94 b'args': {b'foo': b'bar'},
95 b'redirect': None,
95 b'redirect': None,
96 b'data': None,
96 b'data': None,
97 })
97 })
98
98
99 def testmultiarguments(self):
99 def testmultiarguments(self):
100 reactor = makereactor()
100 reactor = makereactor()
101 stream = framing.stream(1)
101 stream = framing.stream(1)
102 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
102 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
103 {b'foo': b'bar', b'biz': b'baz'}))
103 {b'foo': b'bar', b'biz': b'baz'}))
104 self.assertEqual(len(results), 1)
104 self.assertEqual(len(results), 1)
105 self.assertaction(results[0], b'runcommand')
105 self.assertaction(results[0], b'runcommand')
106 self.assertEqual(results[0][1], {
106 self.assertEqual(results[0][1], {
107 b'requestid': 1,
107 b'requestid': 1,
108 b'command': b'mycommand',
108 b'command': b'mycommand',
109 b'args': {b'foo': b'bar', b'biz': b'baz'},
109 b'args': {b'foo': b'bar', b'biz': b'baz'},
110 b'redirect': None,
110 b'redirect': None,
111 b'data': None,
111 b'data': None,
112 })
112 })
113
113
114 def testsimplecommanddata(self):
114 def testsimplecommanddata(self):
115 reactor = makereactor()
115 reactor = makereactor()
116 stream = framing.stream(1)
116 stream = framing.stream(1)
117 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
117 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
118 util.bytesio(b'data!')))
118 util.bytesio(b'data!')))
119 self.assertEqual(len(results), 2)
119 self.assertEqual(len(results), 2)
120 self.assertaction(results[0], b'wantframe')
120 self.assertaction(results[0], b'wantframe')
121 self.assertaction(results[1], b'runcommand')
121 self.assertaction(results[1], b'runcommand')
122 self.assertEqual(results[1][1], {
122 self.assertEqual(results[1][1], {
123 b'requestid': 1,
123 b'requestid': 1,
124 b'command': b'mycommand',
124 b'command': b'mycommand',
125 b'args': {},
125 b'args': {},
126 b'redirect': None,
126 b'redirect': None,
127 b'data': b'data!',
127 b'data': b'data!',
128 })
128 })
129
129
130 def testmultipledataframes(self):
130 def testmultipledataframes(self):
131 frames = [
131 frames = [
132 ffs(b'1 1 stream-begin command-request new|have-data '
132 ffs(b'1 1 stream-begin command-request new|have-data '
133 b"cbor:{b'name': b'mycommand'}"),
133 b"cbor:{b'name': b'mycommand'}"),
134 ffs(b'1 1 0 command-data continuation data1'),
134 ffs(b'1 1 0 command-data continuation data1'),
135 ffs(b'1 1 0 command-data continuation data2'),
135 ffs(b'1 1 0 command-data continuation data2'),
136 ffs(b'1 1 0 command-data eos data3'),
136 ffs(b'1 1 0 command-data eos data3'),
137 ]
137 ]
138
138
139 reactor = makereactor()
139 reactor = makereactor()
140 results = list(sendframes(reactor, frames))
140 results = list(sendframes(reactor, frames))
141 self.assertEqual(len(results), 4)
141 self.assertEqual(len(results), 4)
142 for i in range(3):
142 for i in range(3):
143 self.assertaction(results[i], b'wantframe')
143 self.assertaction(results[i], b'wantframe')
144 self.assertaction(results[3], b'runcommand')
144 self.assertaction(results[3], b'runcommand')
145 self.assertEqual(results[3][1], {
145 self.assertEqual(results[3][1], {
146 b'requestid': 1,
146 b'requestid': 1,
147 b'command': b'mycommand',
147 b'command': b'mycommand',
148 b'args': {},
148 b'args': {},
149 b'redirect': None,
149 b'redirect': None,
150 b'data': b'data1data2data3',
150 b'data': b'data1data2data3',
151 })
151 })
152
152
153 def testargumentanddata(self):
153 def testargumentanddata(self):
154 frames = [
154 frames = [
155 ffs(b'1 1 stream-begin command-request new|have-data '
155 ffs(b'1 1 stream-begin command-request new|have-data '
156 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
156 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
157 b"b'foo': b'bar'}}"),
157 b"b'foo': b'bar'}}"),
158 ffs(b'1 1 0 command-data continuation value1'),
158 ffs(b'1 1 0 command-data continuation value1'),
159 ffs(b'1 1 0 command-data eos value2'),
159 ffs(b'1 1 0 command-data eos value2'),
160 ]
160 ]
161
161
162 reactor = makereactor()
162 reactor = makereactor()
163 results = list(sendframes(reactor, frames))
163 results = list(sendframes(reactor, frames))
164
164
165 self.assertaction(results[-1], b'runcommand')
165 self.assertaction(results[-1], b'runcommand')
166 self.assertEqual(results[-1][1], {
166 self.assertEqual(results[-1][1], {
167 b'requestid': 1,
167 b'requestid': 1,
168 b'command': b'command',
168 b'command': b'command',
169 b'args': {
169 b'args': {
170 b'key': b'val',
170 b'key': b'val',
171 b'foo': b'bar',
171 b'foo': b'bar',
172 },
172 },
173 b'redirect': None,
173 b'redirect': None,
174 b'data': b'value1value2',
174 b'data': b'value1value2',
175 })
175 })
176
176
177 def testnewandcontinuation(self):
177 def testnewandcontinuation(self):
178 result = self._sendsingleframe(makereactor(),
178 result = self._sendsingleframe(makereactor(),
179 ffs(b'1 1 stream-begin command-request new|continuation '))
179 ffs(b'1 1 stream-begin command-request new|continuation '))
180 self.assertaction(result, b'error')
180 self.assertaction(result, b'error')
181 self.assertEqual(result[1], {
181 self.assertEqual(result[1], {
182 b'message': b'received command request frame with both new and '
182 b'message': b'received command request frame with both new and '
183 b'continuation flags set',
183 b'continuation flags set',
184 })
184 })
185
185
186 def testneithernewnorcontinuation(self):
186 def testneithernewnorcontinuation(self):
187 result = self._sendsingleframe(makereactor(),
187 result = self._sendsingleframe(makereactor(),
188 ffs(b'1 1 stream-begin command-request 0 '))
188 ffs(b'1 1 stream-begin command-request 0 '))
189 self.assertaction(result, b'error')
189 self.assertaction(result, b'error')
190 self.assertEqual(result[1], {
190 self.assertEqual(result[1], {
191 b'message': b'received command request frame with neither new nor '
191 b'message': b'received command request frame with neither new nor '
192 b'continuation flags set',
192 b'continuation flags set',
193 })
193 })
194
194
195 def testunexpectedcommanddata(self):
195 def testunexpectedcommanddata(self):
196 """Command data frame when not running a command is an error."""
196 """Command data frame when not running a command is an error."""
197 result = self._sendsingleframe(makereactor(),
197 result = self._sendsingleframe(makereactor(),
198 ffs(b'1 1 stream-begin command-data 0 ignored'))
198 ffs(b'1 1 stream-begin command-data 0 ignored'))
199 self.assertaction(result, b'error')
199 self.assertaction(result, b'error')
200 self.assertEqual(result[1], {
200 self.assertEqual(result[1], {
201 b'message': b'expected sender protocol settings or command request '
201 b'message': b'expected sender protocol settings or command request '
202 b'frame; got 2',
202 b'frame; got 2',
203 })
203 })
204
204
205 def testunexpectedcommanddatareceiving(self):
205 def testunexpectedcommanddatareceiving(self):
206 """Same as above except the command is receiving."""
206 """Same as above except the command is receiving."""
207 results = list(sendframes(makereactor(), [
207 results = list(sendframes(makereactor(), [
208 ffs(b'1 1 stream-begin command-request new|more '
208 ffs(b'1 1 stream-begin command-request new|more '
209 b"cbor:{b'name': b'ignored'}"),
209 b"cbor:{b'name': b'ignored'}"),
210 ffs(b'1 1 0 command-data eos ignored'),
210 ffs(b'1 1 0 command-data eos ignored'),
211 ]))
211 ]))
212
212
213 self.assertaction(results[0], b'wantframe')
213 self.assertaction(results[0], b'wantframe')
214 self.assertaction(results[1], b'error')
214 self.assertaction(results[1], b'error')
215 self.assertEqual(results[1][1], {
215 self.assertEqual(results[1][1], {
216 b'message': b'received command data frame for request that is not '
216 b'message': b'received command data frame for request that is not '
217 b'expecting data: 1',
217 b'expecting data: 1',
218 })
218 })
219
219
220 def testconflictingrequestidallowed(self):
220 def testconflictingrequestidallowed(self):
221 """Multiple fully serviced commands with same request ID is allowed."""
221 """Multiple fully serviced commands with same request ID is allowed."""
222 reactor = makereactor()
222 reactor = makereactor()
223 results = []
223 results = []
224 outstream = reactor.makeoutputstream()
224 outstream = reactor.makeoutputstream()
225 results.append(self._sendsingleframe(
225 results.append(self._sendsingleframe(
226 reactor, ffs(b'1 1 stream-begin command-request new '
226 reactor, ffs(b'1 1 stream-begin command-request new '
227 b"cbor:{b'name': b'command'}")))
227 b"cbor:{b'name': b'command'}")))
228 result = reactor.oncommandresponseready(outstream, 1, b'response1')
228 result = reactor.oncommandresponsereadyobjects(
229 outstream, 1, [b'response1'])
229 self.assertaction(result, b'sendframes')
230 self.assertaction(result, b'sendframes')
230 list(result[1][b'framegen'])
231 list(result[1][b'framegen'])
231 results.append(self._sendsingleframe(
232 results.append(self._sendsingleframe(
232 reactor, ffs(b'1 1 stream-begin command-request new '
233 reactor, ffs(b'1 1 stream-begin command-request new '
233 b"cbor:{b'name': b'command'}")))
234 b"cbor:{b'name': b'command'}")))
234 result = reactor.oncommandresponseready(outstream, 1, b'response2')
235 result = reactor.oncommandresponsereadyobjects(
236 outstream, 1, [b'response2'])
235 self.assertaction(result, b'sendframes')
237 self.assertaction(result, b'sendframes')
236 list(result[1][b'framegen'])
238 list(result[1][b'framegen'])
237 results.append(self._sendsingleframe(
239 results.append(self._sendsingleframe(
238 reactor, ffs(b'1 1 stream-begin command-request new '
240 reactor, ffs(b'1 1 stream-begin command-request new '
239 b"cbor:{b'name': b'command'}")))
241 b"cbor:{b'name': b'command'}")))
240 result = reactor.oncommandresponseready(outstream, 1, b'response3')
242 result = reactor.oncommandresponsereadyobjects(
243 outstream, 1, [b'response3'])
241 self.assertaction(result, b'sendframes')
244 self.assertaction(result, b'sendframes')
242 list(result[1][b'framegen'])
245 list(result[1][b'framegen'])
243
246
244 for i in range(3):
247 for i in range(3):
245 self.assertaction(results[i], b'runcommand')
248 self.assertaction(results[i], b'runcommand')
246 self.assertEqual(results[i][1], {
249 self.assertEqual(results[i][1], {
247 b'requestid': 1,
250 b'requestid': 1,
248 b'command': b'command',
251 b'command': b'command',
249 b'args': {},
252 b'args': {},
250 b'redirect': None,
253 b'redirect': None,
251 b'data': None,
254 b'data': None,
252 })
255 })
253
256
254 def testconflictingrequestid(self):
257 def testconflictingrequestid(self):
255 """Request ID for new command matching in-flight command is illegal."""
258 """Request ID for new command matching in-flight command is illegal."""
256 results = list(sendframes(makereactor(), [
259 results = list(sendframes(makereactor(), [
257 ffs(b'1 1 stream-begin command-request new|more '
260 ffs(b'1 1 stream-begin command-request new|more '
258 b"cbor:{b'name': b'command'}"),
261 b"cbor:{b'name': b'command'}"),
259 ffs(b'1 1 0 command-request new '
262 ffs(b'1 1 0 command-request new '
260 b"cbor:{b'name': b'command1'}"),
263 b"cbor:{b'name': b'command1'}"),
261 ]))
264 ]))
262
265
263 self.assertaction(results[0], b'wantframe')
266 self.assertaction(results[0], b'wantframe')
264 self.assertaction(results[1], b'error')
267 self.assertaction(results[1], b'error')
265 self.assertEqual(results[1][1], {
268 self.assertEqual(results[1][1], {
266 b'message': b'request with ID 1 already received',
269 b'message': b'request with ID 1 already received',
267 })
270 })
268
271
269 def testinterleavedcommands(self):
272 def testinterleavedcommands(self):
270 cbor1 = cbor.dumps({
273 cbor1 = cbor.dumps({
271 b'name': b'command1',
274 b'name': b'command1',
272 b'args': {
275 b'args': {
273 b'foo': b'bar',
276 b'foo': b'bar',
274 b'key1': b'val',
277 b'key1': b'val',
275 }
278 }
276 }, canonical=True)
279 }, canonical=True)
277 cbor3 = cbor.dumps({
280 cbor3 = cbor.dumps({
278 b'name': b'command3',
281 b'name': b'command3',
279 b'args': {
282 b'args': {
280 b'biz': b'baz',
283 b'biz': b'baz',
281 b'key': b'val',
284 b'key': b'val',
282 },
285 },
283 }, canonical=True)
286 }, canonical=True)
284
287
285 results = list(sendframes(makereactor(), [
288 results = list(sendframes(makereactor(), [
286 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
289 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
287 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
290 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
288 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
291 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
289 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
292 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
290 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
293 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
291 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
294 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
292 ]))
295 ]))
293
296
294 self.assertEqual([t[0] for t in results], [
297 self.assertEqual([t[0] for t in results], [
295 b'wantframe',
298 b'wantframe',
296 b'wantframe',
299 b'wantframe',
297 b'wantframe',
300 b'wantframe',
298 b'wantframe',
301 b'wantframe',
299 b'runcommand',
302 b'runcommand',
300 b'runcommand',
303 b'runcommand',
301 ])
304 ])
302
305
303 self.assertEqual(results[4][1], {
306 self.assertEqual(results[4][1], {
304 b'requestid': 3,
307 b'requestid': 3,
305 b'command': b'command3',
308 b'command': b'command3',
306 b'args': {b'biz': b'baz', b'key': b'val'},
309 b'args': {b'biz': b'baz', b'key': b'val'},
307 b'redirect': None,
310 b'redirect': None,
308 b'data': None,
311 b'data': None,
309 })
312 })
310 self.assertEqual(results[5][1], {
313 self.assertEqual(results[5][1], {
311 b'requestid': 1,
314 b'requestid': 1,
312 b'command': b'command1',
315 b'command': b'command1',
313 b'args': {b'foo': b'bar', b'key1': b'val'},
316 b'args': {b'foo': b'bar', b'key1': b'val'},
314 b'redirect': None,
317 b'redirect': None,
315 b'data': None,
318 b'data': None,
316 })
319 })
317
320
318 def testmissingcommanddataframe(self):
321 def testmissingcommanddataframe(self):
319 # The reactor doesn't currently handle partially received commands.
322 # The reactor doesn't currently handle partially received commands.
320 # So this test is failing to do anything with request 1.
323 # So this test is failing to do anything with request 1.
321 frames = [
324 frames = [
322 ffs(b'1 1 stream-begin command-request new|have-data '
325 ffs(b'1 1 stream-begin command-request new|have-data '
323 b"cbor:{b'name': b'command1'}"),
326 b"cbor:{b'name': b'command1'}"),
324 ffs(b'3 1 0 command-request new '
327 ffs(b'3 1 0 command-request new '
325 b"cbor:{b'name': b'command2'}"),
328 b"cbor:{b'name': b'command2'}"),
326 ]
329 ]
327 results = list(sendframes(makereactor(), frames))
330 results = list(sendframes(makereactor(), frames))
328 self.assertEqual(len(results), 2)
331 self.assertEqual(len(results), 2)
329 self.assertaction(results[0], b'wantframe')
332 self.assertaction(results[0], b'wantframe')
330 self.assertaction(results[1], b'runcommand')
333 self.assertaction(results[1], b'runcommand')
331
334
332 def testmissingcommanddataframeflags(self):
335 def testmissingcommanddataframeflags(self):
333 frames = [
336 frames = [
334 ffs(b'1 1 stream-begin command-request new|have-data '
337 ffs(b'1 1 stream-begin command-request new|have-data '
335 b"cbor:{b'name': b'command1'}"),
338 b"cbor:{b'name': b'command1'}"),
336 ffs(b'1 1 0 command-data 0 data'),
339 ffs(b'1 1 0 command-data 0 data'),
337 ]
340 ]
338 results = list(sendframes(makereactor(), frames))
341 results = list(sendframes(makereactor(), frames))
339 self.assertEqual(len(results), 2)
342 self.assertEqual(len(results), 2)
340 self.assertaction(results[0], b'wantframe')
343 self.assertaction(results[0], b'wantframe')
341 self.assertaction(results[1], b'error')
344 self.assertaction(results[1], b'error')
342 self.assertEqual(results[1][1], {
345 self.assertEqual(results[1][1], {
343 b'message': b'command data frame without flags',
346 b'message': b'command data frame without flags',
344 })
347 })
345
348
346 def testframefornonreceivingrequest(self):
349 def testframefornonreceivingrequest(self):
347 """Receiving a frame for a command that is not receiving is illegal."""
350 """Receiving a frame for a command that is not receiving is illegal."""
348 results = list(sendframes(makereactor(), [
351 results = list(sendframes(makereactor(), [
349 ffs(b'1 1 stream-begin command-request new '
352 ffs(b'1 1 stream-begin command-request new '
350 b"cbor:{b'name': b'command1'}"),
353 b"cbor:{b'name': b'command1'}"),
351 ffs(b'3 1 0 command-request new|have-data '
354 ffs(b'3 1 0 command-request new|have-data '
352 b"cbor:{b'name': b'command3'}"),
355 b"cbor:{b'name': b'command3'}"),
353 ffs(b'5 1 0 command-data eos ignored'),
356 ffs(b'5 1 0 command-data eos ignored'),
354 ]))
357 ]))
355 self.assertaction(results[2], b'error')
358 self.assertaction(results[2], b'error')
356 self.assertEqual(results[2][1], {
359 self.assertEqual(results[2][1], {
357 b'message': b'received frame for request that is not receiving: 5',
360 b'message': b'received frame for request that is not receiving: 5',
358 })
361 })
359
362
360 def testsimpleresponse(self):
363 def testsimpleresponse(self):
361 """Bytes response to command sends result frames."""
364 """Bytes response to command sends result frames."""
362 reactor = makereactor()
365 reactor = makereactor()
363 instream = framing.stream(1)
366 instream = framing.stream(1)
364 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
367 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
365
368
366 outstream = reactor.makeoutputstream()
369 outstream = reactor.makeoutputstream()
367 result = reactor.oncommandresponseready(outstream, 1, b'response')
370 result = reactor.oncommandresponsereadyobjects(
371 outstream, 1, [b'response'])
368 self.assertaction(result, b'sendframes')
372 self.assertaction(result, b'sendframes')
369 self.assertframesequal(result[1][b'framegen'], [
373 self.assertframesequal(result[1][b'framegen'], [
370 b'1 2 stream-begin command-response eos %sresponse' % OK,
374 b'1 2 stream-begin command-response continuation %s' % OK,
375 b'1 2 0 command-response continuation cbor:b"response"',
376 b'1 2 0 command-response eos ',
371 ])
377 ])
372
378
373 def testmultiframeresponse(self):
379 def testmultiframeresponse(self):
374 """Bytes response spanning multiple frames is handled."""
380 """Bytes response spanning multiple frames is handled."""
375 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
381 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
376 second = b'y' * 100
382 second = b'y' * 100
377
383
378 reactor = makereactor()
384 reactor = makereactor()
379 instream = framing.stream(1)
385 instream = framing.stream(1)
380 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
386 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
381
387
382 outstream = reactor.makeoutputstream()
388 outstream = reactor.makeoutputstream()
383 result = reactor.oncommandresponseready(outstream, 1, first + second)
389 result = reactor.oncommandresponsereadyobjects(
390 outstream, 1, [first + second])
384 self.assertaction(result, b'sendframes')
391 self.assertaction(result, b'sendframes')
385 self.assertframesequal(result[1][b'framegen'], [
392 self.assertframesequal(result[1][b'framegen'], [
386 b'1 2 stream-begin command-response continuation %s' % OK,
393 b'1 2 stream-begin command-response continuation %s' % OK,
394 b'1 2 0 command-response continuation Y\x80d',
387 b'1 2 0 command-response continuation %s' % first,
395 b'1 2 0 command-response continuation %s' % first,
388 b'1 2 0 command-response eos %s' % second,
396 b'1 2 0 command-response continuation %s' % second,
397 b'1 2 0 command-response continuation ',
398 b'1 2 0 command-response eos '
389 ])
399 ])
390
400
391 def testservererror(self):
401 def testservererror(self):
392 reactor = makereactor()
402 reactor = makereactor()
393 instream = framing.stream(1)
403 instream = framing.stream(1)
394 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
404 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
395
405
396 outstream = reactor.makeoutputstream()
406 outstream = reactor.makeoutputstream()
397 result = reactor.onservererror(outstream, 1, b'some message')
407 result = reactor.onservererror(outstream, 1, b'some message')
398 self.assertaction(result, b'sendframes')
408 self.assertaction(result, b'sendframes')
399 self.assertframesequal(result[1][b'framegen'], [
409 self.assertframesequal(result[1][b'framegen'], [
400 b"1 2 stream-begin error-response 0 "
410 b"1 2 stream-begin error-response 0 "
401 b"cbor:{b'type': b'server', "
411 b"cbor:{b'type': b'server', "
402 b"b'message': [{b'msg': b'some message'}]}",
412 b"b'message': [{b'msg': b'some message'}]}",
403 ])
413 ])
404
414
405 def test1commanddeferresponse(self):
415 def test1commanddeferresponse(self):
406 """Responses when in deferred output mode are delayed until EOF."""
416 """Responses when in deferred output mode are delayed until EOF."""
407 reactor = makereactor(deferoutput=True)
417 reactor = makereactor(deferoutput=True)
408 instream = framing.stream(1)
418 instream = framing.stream(1)
409 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
419 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
410 {}))
420 {}))
411 self.assertEqual(len(results), 1)
421 self.assertEqual(len(results), 1)
412 self.assertaction(results[0], b'runcommand')
422 self.assertaction(results[0], b'runcommand')
413
423
414 outstream = reactor.makeoutputstream()
424 outstream = reactor.makeoutputstream()
415 result = reactor.oncommandresponseready(outstream, 1, b'response')
425 result = reactor.oncommandresponsereadyobjects(
426 outstream, 1, [b'response'])
416 self.assertaction(result, b'noop')
427 self.assertaction(result, b'noop')
417 result = reactor.oninputeof()
428 result = reactor.oninputeof()
418 self.assertaction(result, b'sendframes')
429 self.assertaction(result, b'sendframes')
419 self.assertframesequal(result[1][b'framegen'], [
430 self.assertframesequal(result[1][b'framegen'], [
420 b'1 2 stream-begin command-response eos %sresponse' % OK,
431 b'1 2 stream-begin command-response continuation %s' % OK,
432 b'1 2 0 command-response continuation cbor:b"response"',
433 b'1 2 0 command-response eos ',
421 ])
434 ])
422
435
423 def testmultiplecommanddeferresponse(self):
436 def testmultiplecommanddeferresponse(self):
424 reactor = makereactor(deferoutput=True)
437 reactor = makereactor(deferoutput=True)
425 instream = framing.stream(1)
438 instream = framing.stream(1)
426 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
439 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
427 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
440 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
428
441
429 outstream = reactor.makeoutputstream()
442 outstream = reactor.makeoutputstream()
430 result = reactor.oncommandresponseready(outstream, 1, b'response1')
443 result = reactor.oncommandresponsereadyobjects(
444 outstream, 1, [b'response1'])
431 self.assertaction(result, b'noop')
445 self.assertaction(result, b'noop')
432 result = reactor.oncommandresponseready(outstream, 3, b'response2')
446 result = reactor.oncommandresponsereadyobjects(
447 outstream, 3, [b'response2'])
433 self.assertaction(result, b'noop')
448 self.assertaction(result, b'noop')
434 result = reactor.oninputeof()
449 result = reactor.oninputeof()
435 self.assertaction(result, b'sendframes')
450 self.assertaction(result, b'sendframes')
436 self.assertframesequal(result[1][b'framegen'], [
451 self.assertframesequal(result[1][b'framegen'], [
437 b'1 2 stream-begin command-response eos %sresponse1' % OK,
452 b'1 2 stream-begin command-response continuation %s' % OK,
438 b'3 2 0 command-response eos %sresponse2' % OK,
453 b'1 2 0 command-response continuation cbor:b"response1"',
454 b'1 2 0 command-response eos ',
455 b'3 2 0 command-response continuation %s' % OK,
456 b'3 2 0 command-response continuation cbor:b"response2"',
457 b'3 2 0 command-response eos ',
439 ])
458 ])
440
459
441 def testrequestidtracking(self):
460 def testrequestidtracking(self):
442 reactor = makereactor(deferoutput=True)
461 reactor = makereactor(deferoutput=True)
443 instream = framing.stream(1)
462 instream = framing.stream(1)
444 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
463 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
445 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
464 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
446 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
465 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
447
466
448 # Register results for commands out of order.
467 # Register results for commands out of order.
449 outstream = reactor.makeoutputstream()
468 outstream = reactor.makeoutputstream()
450 reactor.oncommandresponseready(outstream, 3, b'response3')
469 reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
451 reactor.oncommandresponseready(outstream, 1, b'response1')
470 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
452 reactor.oncommandresponseready(outstream, 5, b'response5')
471 reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
453
472
454 result = reactor.oninputeof()
473 result = reactor.oninputeof()
455 self.assertaction(result, b'sendframes')
474 self.assertaction(result, b'sendframes')
456 self.assertframesequal(result[1][b'framegen'], [
475 self.assertframesequal(result[1][b'framegen'], [
457 b'3 2 stream-begin command-response eos %sresponse3' % OK,
476 b'3 2 stream-begin command-response continuation %s' % OK,
458 b'1 2 0 command-response eos %sresponse1' % OK,
477 b'3 2 0 command-response continuation cbor:b"response3"',
459 b'5 2 0 command-response eos %sresponse5' % OK,
478 b'3 2 0 command-response eos ',
479 b'1 2 0 command-response continuation %s' % OK,
480 b'1 2 0 command-response continuation cbor:b"response1"',
481 b'1 2 0 command-response eos ',
482 b'5 2 0 command-response continuation %s' % OK,
483 b'5 2 0 command-response continuation cbor:b"response5"',
484 b'5 2 0 command-response eos ',
460 ])
485 ])
461
486
462 def testduplicaterequestonactivecommand(self):
487 def testduplicaterequestonactivecommand(self):
463 """Receiving a request ID that matches a request that isn't finished."""
488 """Receiving a request ID that matches a request that isn't finished."""
464 reactor = makereactor()
489 reactor = makereactor()
465 stream = framing.stream(1)
490 stream = framing.stream(1)
466 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
491 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
467 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
492 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
468
493
469 self.assertaction(results[0], b'error')
494 self.assertaction(results[0], b'error')
470 self.assertEqual(results[0][1], {
495 self.assertEqual(results[0][1], {
471 b'message': b'request with ID 1 is already active',
496 b'message': b'request with ID 1 is already active',
472 })
497 })
473
498
474 def testduplicaterequestonactivecommandnosend(self):
499 def testduplicaterequestonactivecommandnosend(self):
475 """Same as above but we've registered a response but haven't sent it."""
500 """Same as above but we've registered a response but haven't sent it."""
476 reactor = makereactor()
501 reactor = makereactor()
477 instream = framing.stream(1)
502 instream = framing.stream(1)
478 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
503 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
479 outstream = reactor.makeoutputstream()
504 outstream = reactor.makeoutputstream()
480 reactor.oncommandresponseready(outstream, 1, b'response')
505 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
481
506
482 # We've registered the response but haven't sent it. From the
507 # We've registered the response but haven't sent it. From the
483 # perspective of the reactor, the command is still active.
508 # perspective of the reactor, the command is still active.
484
509
485 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
510 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
486 self.assertaction(results[0], b'error')
511 self.assertaction(results[0], b'error')
487 self.assertEqual(results[0][1], {
512 self.assertEqual(results[0][1], {
488 b'message': b'request with ID 1 is already active',
513 b'message': b'request with ID 1 is already active',
489 })
514 })
490
515
491 def testduplicaterequestaftersend(self):
516 def testduplicaterequestaftersend(self):
492 """We can use a duplicate request ID after we've sent the response."""
517 """We can use a duplicate request ID after we've sent the response."""
493 reactor = makereactor()
518 reactor = makereactor()
494 instream = framing.stream(1)
519 instream = framing.stream(1)
495 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
520 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
496 outstream = reactor.makeoutputstream()
521 outstream = reactor.makeoutputstream()
497 res = reactor.oncommandresponseready(outstream, 1, b'response')
522 res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
498 list(res[1][b'framegen'])
523 list(res[1][b'framegen'])
499
524
500 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
525 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
501 self.assertaction(results[0], b'runcommand')
526 self.assertaction(results[0], b'runcommand')
502
527
503 def testprotocolsettingsnoflags(self):
528 def testprotocolsettingsnoflags(self):
504 result = self._sendsingleframe(
529 result = self._sendsingleframe(
505 makereactor(),
530 makereactor(),
506 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
531 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
507 self.assertaction(result, b'error')
532 self.assertaction(result, b'error')
508 self.assertEqual(result[1], {
533 self.assertEqual(result[1], {
509 b'message': b'sender protocol settings frame must have '
534 b'message': b'sender protocol settings frame must have '
510 b'continuation or end of stream flag set',
535 b'continuation or end of stream flag set',
511 })
536 })
512
537
513 def testprotocolsettingsconflictflags(self):
538 def testprotocolsettingsconflictflags(self):
514 result = self._sendsingleframe(
539 result = self._sendsingleframe(
515 makereactor(),
540 makereactor(),
516 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
541 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
517 self.assertaction(result, b'error')
542 self.assertaction(result, b'error')
518 self.assertEqual(result[1], {
543 self.assertEqual(result[1], {
519 b'message': b'sender protocol settings frame cannot have both '
544 b'message': b'sender protocol settings frame cannot have both '
520 b'continuation and end of stream flags set',
545 b'continuation and end of stream flags set',
521 })
546 })
522
547
523 def testprotocolsettingsemptypayload(self):
548 def testprotocolsettingsemptypayload(self):
524 result = self._sendsingleframe(
549 result = self._sendsingleframe(
525 makereactor(),
550 makereactor(),
526 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
551 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
527 self.assertaction(result, b'error')
552 self.assertaction(result, b'error')
528 self.assertEqual(result[1], {
553 self.assertEqual(result[1], {
529 b'message': b'sender protocol settings frame did not contain CBOR '
554 b'message': b'sender protocol settings frame did not contain CBOR '
530 b'data',
555 b'data',
531 })
556 })
532
557
533 def testprotocolsettingsmultipleobjects(self):
558 def testprotocolsettingsmultipleobjects(self):
534 result = self._sendsingleframe(
559 result = self._sendsingleframe(
535 makereactor(),
560 makereactor(),
536 ffs(b'0 1 stream-begin sender-protocol-settings eos '
561 ffs(b'0 1 stream-begin sender-protocol-settings eos '
537 b'\x46foobar\x43foo'))
562 b'\x46foobar\x43foo'))
538 self.assertaction(result, b'error')
563 self.assertaction(result, b'error')
539 self.assertEqual(result[1], {
564 self.assertEqual(result[1], {
540 b'message': b'sender protocol settings frame contained multiple '
565 b'message': b'sender protocol settings frame contained multiple '
541 b'CBOR values',
566 b'CBOR values',
542 })
567 })
543
568
544 def testprotocolsettingscontentencodings(self):
569 def testprotocolsettingscontentencodings(self):
545 reactor = makereactor()
570 reactor = makereactor()
546
571
547 result = self._sendsingleframe(
572 result = self._sendsingleframe(
548 reactor,
573 reactor,
549 ffs(b'0 1 stream-begin sender-protocol-settings eos '
574 ffs(b'0 1 stream-begin sender-protocol-settings eos '
550 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
575 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
551 self.assertaction(result, b'wantframe')
576 self.assertaction(result, b'wantframe')
552
577
553 self.assertEqual(reactor._state, b'idle')
578 self.assertEqual(reactor._state, b'idle')
554 self.assertEqual(reactor._sendersettings[b'contentencodings'],
579 self.assertEqual(reactor._sendersettings[b'contentencodings'],
555 [b'a', b'b'])
580 [b'a', b'b'])
556
581
557 def testprotocolsettingsmultipleframes(self):
582 def testprotocolsettingsmultipleframes(self):
558 reactor = makereactor()
583 reactor = makereactor()
559
584
560 data = b''.join(cborutil.streamencode({
585 data = b''.join(cborutil.streamencode({
561 b'contentencodings': [b'value1', b'value2'],
586 b'contentencodings': [b'value1', b'value2'],
562 }))
587 }))
563
588
564 results = list(sendframes(reactor, [
589 results = list(sendframes(reactor, [
565 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
590 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
566 data[0:5]),
591 data[0:5]),
567 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
592 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
568 ]))
593 ]))
569
594
570 self.assertEqual(len(results), 2)
595 self.assertEqual(len(results), 2)
571
596
572 self.assertaction(results[0], b'wantframe')
597 self.assertaction(results[0], b'wantframe')
573 self.assertaction(results[1], b'wantframe')
598 self.assertaction(results[1], b'wantframe')
574
599
575 self.assertEqual(reactor._state, b'idle')
600 self.assertEqual(reactor._state, b'idle')
576 self.assertEqual(reactor._sendersettings[b'contentencodings'],
601 self.assertEqual(reactor._sendersettings[b'contentencodings'],
577 [b'value1', b'value2'])
602 [b'value1', b'value2'])
578
603
579 def testprotocolsettingsbadcbor(self):
604 def testprotocolsettingsbadcbor(self):
580 result = self._sendsingleframe(
605 result = self._sendsingleframe(
581 makereactor(),
606 makereactor(),
582 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
607 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
583 self.assertaction(result, b'error')
608 self.assertaction(result, b'error')
584
609
585 def testprotocolsettingsnoninitial(self):
610 def testprotocolsettingsnoninitial(self):
586 # Cannot have protocol settings frames as non-initial frames.
611 # Cannot have protocol settings frames as non-initial frames.
587 reactor = makereactor()
612 reactor = makereactor()
588
613
589 stream = framing.stream(1)
614 stream = framing.stream(1)
590 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
615 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
591 self.assertEqual(len(results), 1)
616 self.assertEqual(len(results), 1)
592 self.assertaction(results[0], b'runcommand')
617 self.assertaction(results[0], b'runcommand')
593
618
594 result = self._sendsingleframe(
619 result = self._sendsingleframe(
595 reactor,
620 reactor,
596 ffs(b'0 1 0 sender-protocol-settings eos '))
621 ffs(b'0 1 0 sender-protocol-settings eos '))
597 self.assertaction(result, b'error')
622 self.assertaction(result, b'error')
598 self.assertEqual(result[1], {
623 self.assertEqual(result[1], {
599 b'message': b'expected command request frame; got 8',
624 b'message': b'expected command request frame; got 8',
600 })
625 })
601
626
602 if __name__ == '__main__':
627 if __name__ == '__main__':
603 import silenttestrunner
628 import silenttestrunner
604 silenttestrunner.main(__name__)
629 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now