##// END OF EJS Templates
wireprotov2: don't emit empty frames...
Gregory Szorc -
r40171:3a6d6c54 default
parent child Browse files
Show More
@@ -1,1738 +1,1744 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54
54
55 FRAME_TYPES = {
55 FRAME_TYPES = {
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 }
64 }
65
65
66 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70
70
71 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
72 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 }
76 }
77
77
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
80
80
81 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
84 }
84 }
85
85
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88
88
89 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 }
92 }
93
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
100 }
101
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
108 }
109
109
110 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
111 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
117 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 }
120 }
121
121
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123
123
124 def humanflags(mapping, value):
124 def humanflags(mapping, value):
125 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
126 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
127 flags = []
127 flags = []
128 val = 1
128 val = 1
129 while value >= val:
129 while value >= val:
130 if value & val:
130 if value & val:
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 val <<= 1
132 val <<= 1
133
133
134 return b'|'.join(flags)
134 return b'|'.join(flags)
135
135
136 @attr.s(slots=True)
136 @attr.s(slots=True)
137 class frameheader(object):
137 class frameheader(object):
138 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
139
139
140 length = attr.ib()
140 length = attr.ib()
141 requestid = attr.ib()
141 requestid = attr.ib()
142 streamid = attr.ib()
142 streamid = attr.ib()
143 streamflags = attr.ib()
143 streamflags = attr.ib()
144 typeid = attr.ib()
144 typeid = attr.ib()
145 flags = attr.ib()
145 flags = attr.ib()
146
146
147 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
148 class frame(object):
148 class frame(object):
149 """Represents a parsed frame."""
149 """Represents a parsed frame."""
150
150
151 requestid = attr.ib()
151 requestid = attr.ib()
152 streamid = attr.ib()
152 streamid = attr.ib()
153 streamflags = attr.ib()
153 streamflags = attr.ib()
154 typeid = attr.ib()
154 typeid = attr.ib()
155 flags = attr.ib()
155 flags = attr.ib()
156 payload = attr.ib()
156 payload = attr.ib()
157
157
158 @encoding.strmethod
158 @encoding.strmethod
159 def __repr__(self):
159 def __repr__(self):
160 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
161 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
162 if value == self.typeid:
162 if value == self.typeid:
163 typename = name
163 typename = name
164 break
164 break
165
165
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
168 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171
171
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
174 # TODO assert size of payload.
174 # TODO assert size of payload.
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176
176
177 # 24 bits length
177 # 24 bits length
178 # 16 bits request id
178 # 16 bits request id
179 # 8 bits stream id
179 # 8 bits stream id
180 # 8 bits stream flags
180 # 8 bits stream flags
181 # 4 bits type
181 # 4 bits type
182 # 4 bits flags
182 # 4 bits flags
183
183
184 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
185 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
188 frame[8:] = payload
188 frame[8:] = payload
189
189
190 return frame
190 return frame
191
191
192 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
193 """Create a frame from a human readable string
193 """Create a frame from a human readable string
194
194
195 Strings have the form:
195 Strings have the form:
196
196
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198
198
199 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
200 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
201
201
202 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
203
203
204 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
205 named constant.
205 named constant.
206
206
207 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
208
208
209 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
210 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 byte string literal.
212 byte string literal.
213 """
213 """
214 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216
216
217 requestid = int(requestid)
217 requestid = int(requestid)
218 streamid = int(streamid)
218 streamid = int(streamid)
219
219
220 finalstreamflags = 0
220 finalstreamflags = 0
221 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
222 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
223 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
224 else:
224 else:
225 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
226
226
227 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
228 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
229 else:
229 else:
230 frametype = int(frametype)
230 frametype = int(frametype)
231
231
232 finalflags = 0
232 finalflags = 0
233 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
234 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
235 if flag in validflags:
235 if flag in validflags:
236 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
237 else:
237 else:
238 finalflags |= int(flag)
238 finalflags |= int(flag)
239
239
240 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
241 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
242 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
243
243
244 else:
244 else:
245 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
246
246
247 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
248 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
249 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
250
250
251 def parseheader(data):
251 def parseheader(data):
252 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
253
253
254 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
255 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
256 """
256 """
257 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
258 # 16 bits request ID
258 # 16 bits request ID
259 # 8 bits stream ID
259 # 8 bits stream ID
260 # 8 bits stream flags
260 # 8 bits stream flags
261 # 4 bits frame type
261 # 4 bits frame type
262 # 4 bits frame flags
262 # 4 bits frame flags
263 # ... payload
263 # ... payload
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 typeflags = data[7]
266 typeflags = data[7]
267
267
268 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
269 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
270
270
271 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
272 frametype, frameflags)
272 frametype, frameflags)
273
273
274 def readframe(fh):
274 def readframe(fh):
275 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
276
276
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
279 seen.
279 seen.
280 """
280 """
281 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
282
282
283 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
284
284
285 if readcount == 0:
285 if readcount == 0:
286 return None
286 return None
287
287
288 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 (readcount, header))
290 (readcount, header))
291
291
292 h = parseheader(header)
292 h = parseheader(header)
293
293
294 payload = fh.read(h.length)
294 payload = fh.read(h.length)
295 if len(payload) != h.length:
295 if len(payload) != h.length:
296 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 (h.length, len(payload)))
297 (h.length, len(payload)))
298
298
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 payload)
300 payload)
301
301
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 redirect=None):
304 redirect=None):
305 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
306
306
307 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
308 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
309 """
309 """
310 data = {b'name': cmd}
310 data = {b'name': cmd}
311 if args:
311 if args:
312 data[b'args'] = args
312 data[b'args'] = args
313
313
314 if redirect:
314 if redirect:
315 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
316
316
317 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
318
318
319 offset = 0
319 offset = 0
320
320
321 while True:
321 while True:
322 flags = 0
322 flags = 0
323
323
324 # Must set new or continuation flag.
324 # Must set new or continuation flag.
325 if not offset:
325 if not offset:
326 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
327 else:
327 else:
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329
329
330 # Data frames is set on all frames.
330 # Data frames is set on all frames.
331 if datafh:
331 if datafh:
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333
333
334 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
335 offset += len(payload)
335 offset += len(payload)
336
336
337 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339
339
340 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 flags=flags,
342 flags=flags,
343 payload=payload)
343 payload=payload)
344
344
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 break
346 break
347
347
348 if datafh:
348 if datafh:
349 while True:
349 while True:
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351
351
352 done = False
352 done = False
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 else:
355 else:
356 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
357 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
358 done = True
358 done = True
359
359
360 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
361 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
362 flags=flags,
362 flags=flags,
363 payload=data)
363 payload=data)
364
364
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseokframe(stream, requestid):
368 def createcommandresponseokframe(stream, requestid):
369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
370
370
371 return stream.makeframe(requestid=requestid,
371 return stream.makeframe(requestid=requestid,
372 typeid=FRAME_TYPE_COMMAND_RESPONSE,
372 typeid=FRAME_TYPE_COMMAND_RESPONSE,
373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
374 payload=overall)
374 payload=overall)
375
375
376 def createcommandresponseeosframe(stream, requestid):
376 def createcommandresponseeosframe(stream, requestid):
377 """Create an empty payload frame representing command end-of-stream."""
377 """Create an empty payload frame representing command end-of-stream."""
378 return stream.makeframe(requestid=requestid,
378 return stream.makeframe(requestid=requestid,
379 typeid=FRAME_TYPE_COMMAND_RESPONSE,
379 typeid=FRAME_TYPE_COMMAND_RESPONSE,
380 flags=FLAG_COMMAND_RESPONSE_EOS,
380 flags=FLAG_COMMAND_RESPONSE_EOS,
381 payload=b'')
381 payload=b'')
382
382
383 def createalternatelocationresponseframe(stream, requestid, location):
383 def createalternatelocationresponseframe(stream, requestid, location):
384 data = {
384 data = {
385 b'status': b'redirect',
385 b'status': b'redirect',
386 b'location': {
386 b'location': {
387 b'url': location.url,
387 b'url': location.url,
388 b'mediatype': location.mediatype,
388 b'mediatype': location.mediatype,
389 }
389 }
390 }
390 }
391
391
392 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
392 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
393 r'servercadercerts'):
393 r'servercadercerts'):
394 value = getattr(location, a)
394 value = getattr(location, a)
395 if value is not None:
395 if value is not None:
396 data[b'location'][pycompat.bytestr(a)] = value
396 data[b'location'][pycompat.bytestr(a)] = value
397
397
398 return stream.makeframe(requestid=requestid,
398 return stream.makeframe(requestid=requestid,
399 typeid=FRAME_TYPE_COMMAND_RESPONSE,
399 typeid=FRAME_TYPE_COMMAND_RESPONSE,
400 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
400 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
401 payload=b''.join(cborutil.streamencode(data)))
401 payload=b''.join(cborutil.streamencode(data)))
402
402
403 def createcommanderrorresponse(stream, requestid, message, args=None):
403 def createcommanderrorresponse(stream, requestid, message, args=None):
404 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
404 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
405 # formatting works consistently?
405 # formatting works consistently?
406 m = {
406 m = {
407 b'status': b'error',
407 b'status': b'error',
408 b'error': {
408 b'error': {
409 b'message': message,
409 b'message': message,
410 }
410 }
411 }
411 }
412
412
413 if args:
413 if args:
414 m[b'error'][b'args'] = args
414 m[b'error'][b'args'] = args
415
415
416 overall = b''.join(cborutil.streamencode(m))
416 overall = b''.join(cborutil.streamencode(m))
417
417
418 yield stream.makeframe(requestid=requestid,
418 yield stream.makeframe(requestid=requestid,
419 typeid=FRAME_TYPE_COMMAND_RESPONSE,
419 typeid=FRAME_TYPE_COMMAND_RESPONSE,
420 flags=FLAG_COMMAND_RESPONSE_EOS,
420 flags=FLAG_COMMAND_RESPONSE_EOS,
421 payload=overall)
421 payload=overall)
422
422
423 def createerrorframe(stream, requestid, msg, errtype):
423 def createerrorframe(stream, requestid, msg, errtype):
424 # TODO properly handle frame size limits.
424 # TODO properly handle frame size limits.
425 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
425 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
426
426
427 payload = b''.join(cborutil.streamencode({
427 payload = b''.join(cborutil.streamencode({
428 b'type': errtype,
428 b'type': errtype,
429 b'message': [{b'msg': msg}],
429 b'message': [{b'msg': msg}],
430 }))
430 }))
431
431
432 yield stream.makeframe(requestid=requestid,
432 yield stream.makeframe(requestid=requestid,
433 typeid=FRAME_TYPE_ERROR_RESPONSE,
433 typeid=FRAME_TYPE_ERROR_RESPONSE,
434 flags=0,
434 flags=0,
435 payload=payload)
435 payload=payload)
436
436
437 def createtextoutputframe(stream, requestid, atoms,
437 def createtextoutputframe(stream, requestid, atoms,
438 maxframesize=DEFAULT_MAX_FRAME_SIZE):
438 maxframesize=DEFAULT_MAX_FRAME_SIZE):
439 """Create a text output frame to render text to people.
439 """Create a text output frame to render text to people.
440
440
441 ``atoms`` is a 3-tuple of (formatting string, args, labels).
441 ``atoms`` is a 3-tuple of (formatting string, args, labels).
442
442
443 The formatting string contains ``%s`` tokens to be replaced by the
443 The formatting string contains ``%s`` tokens to be replaced by the
444 corresponding indexed entry in ``args``. ``labels`` is an iterable of
444 corresponding indexed entry in ``args``. ``labels`` is an iterable of
445 formatters to be applied at rendering time. In terms of the ``ui``
445 formatters to be applied at rendering time. In terms of the ``ui``
446 class, each atom corresponds to a ``ui.write()``.
446 class, each atom corresponds to a ``ui.write()``.
447 """
447 """
448 atomdicts = []
448 atomdicts = []
449
449
450 for (formatting, args, labels) in atoms:
450 for (formatting, args, labels) in atoms:
451 # TODO look for localstr, other types here?
451 # TODO look for localstr, other types here?
452
452
453 if not isinstance(formatting, bytes):
453 if not isinstance(formatting, bytes):
454 raise ValueError('must use bytes formatting strings')
454 raise ValueError('must use bytes formatting strings')
455 for arg in args:
455 for arg in args:
456 if not isinstance(arg, bytes):
456 if not isinstance(arg, bytes):
457 raise ValueError('must use bytes for arguments')
457 raise ValueError('must use bytes for arguments')
458 for label in labels:
458 for label in labels:
459 if not isinstance(label, bytes):
459 if not isinstance(label, bytes):
460 raise ValueError('must use bytes for labels')
460 raise ValueError('must use bytes for labels')
461
461
462 # Formatting string must be ASCII.
462 # Formatting string must be ASCII.
463 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
463 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
464
464
465 # Arguments must be UTF-8.
465 # Arguments must be UTF-8.
466 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
466 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
467
467
468 # Labels must be ASCII.
468 # Labels must be ASCII.
469 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
469 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
470 for l in labels]
470 for l in labels]
471
471
472 atom = {b'msg': formatting}
472 atom = {b'msg': formatting}
473 if args:
473 if args:
474 atom[b'args'] = args
474 atom[b'args'] = args
475 if labels:
475 if labels:
476 atom[b'labels'] = labels
476 atom[b'labels'] = labels
477
477
478 atomdicts.append(atom)
478 atomdicts.append(atom)
479
479
480 payload = b''.join(cborutil.streamencode(atomdicts))
480 payload = b''.join(cborutil.streamencode(atomdicts))
481
481
482 if len(payload) > maxframesize:
482 if len(payload) > maxframesize:
483 raise ValueError('cannot encode data in a single frame')
483 raise ValueError('cannot encode data in a single frame')
484
484
485 yield stream.makeframe(requestid=requestid,
485 yield stream.makeframe(requestid=requestid,
486 typeid=FRAME_TYPE_TEXT_OUTPUT,
486 typeid=FRAME_TYPE_TEXT_OUTPUT,
487 flags=0,
487 flags=0,
488 payload=payload)
488 payload=payload)
489
489
490 class bufferingcommandresponseemitter(object):
490 class bufferingcommandresponseemitter(object):
491 """Helper object to emit command response frames intelligently.
491 """Helper object to emit command response frames intelligently.
492
492
493 Raw command response data is likely emitted in chunks much smaller
493 Raw command response data is likely emitted in chunks much smaller
494 than what can fit in a single frame. This class exists to buffer
494 than what can fit in a single frame. This class exists to buffer
495 chunks until enough data is available to fit in a single frame.
495 chunks until enough data is available to fit in a single frame.
496
496
497 TODO we'll need something like this when compression is supported.
497 TODO we'll need something like this when compression is supported.
498 So it might make sense to implement this functionality at the stream
498 So it might make sense to implement this functionality at the stream
499 level.
499 level.
500 """
500 """
501 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
501 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
502 self._stream = stream
502 self._stream = stream
503 self._requestid = requestid
503 self._requestid = requestid
504 self._maxsize = maxframesize
504 self._maxsize = maxframesize
505 self._chunks = []
505 self._chunks = []
506 self._chunkssize = 0
506 self._chunkssize = 0
507
507
508 def send(self, data):
508 def send(self, data):
509 """Send new data for emission.
509 """Send new data for emission.
510
510
511 Is a generator of new frames that were derived from the new input.
511 Is a generator of new frames that were derived from the new input.
512
512
513 If the special input ``None`` is received, flushes all buffered
513 If the special input ``None`` is received, flushes all buffered
514 data to frames.
514 data to frames.
515 """
515 """
516
516
517 if data is None:
517 if data is None:
518 for frame in self._flush():
518 for frame in self._flush():
519 yield frame
519 yield frame
520 return
520 return
521
521
522 # There is a ton of potential to do more complicated things here.
522 # There is a ton of potential to do more complicated things here.
523 # Our immediate goal is to coalesce small chunks into big frames,
523 # Our immediate goal is to coalesce small chunks into big frames,
524 # not achieve the fewest number of frames possible. So we go with
524 # not achieve the fewest number of frames possible. So we go with
525 # a simple implementation:
525 # a simple implementation:
526 #
526 #
527 # * If a chunk is too large for a frame, we flush and emit frames
527 # * If a chunk is too large for a frame, we flush and emit frames
528 # for the new chunk.
528 # for the new chunk.
529 # * If a chunk can be buffered without total buffered size limits
529 # * If a chunk can be buffered without total buffered size limits
530 # being exceeded, we do that.
530 # being exceeded, we do that.
531 # * If a chunk causes us to go over our buffering limit, we flush
531 # * If a chunk causes us to go over our buffering limit, we flush
532 # and then buffer the new chunk.
532 # and then buffer the new chunk.
533
533
534 if not data:
535 return
536
534 if len(data) > self._maxsize:
537 if len(data) > self._maxsize:
535 for frame in self._flush():
538 for frame in self._flush():
536 yield frame
539 yield frame
537
540
538 # Now emit frames for the big chunk.
541 # Now emit frames for the big chunk.
539 offset = 0
542 offset = 0
540 while True:
543 while True:
541 chunk = data[offset:offset + self._maxsize]
544 chunk = data[offset:offset + self._maxsize]
542 offset += len(chunk)
545 offset += len(chunk)
543
546
544 yield self._stream.makeframe(
547 yield self._stream.makeframe(
545 self._requestid,
548 self._requestid,
546 typeid=FRAME_TYPE_COMMAND_RESPONSE,
549 typeid=FRAME_TYPE_COMMAND_RESPONSE,
547 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
550 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
548 payload=chunk)
551 payload=chunk)
549
552
550 if offset == len(data):
553 if offset == len(data):
551 return
554 return
552
555
553 # If we don't have enough to constitute a full frame, buffer and
556 # If we don't have enough to constitute a full frame, buffer and
554 # return.
557 # return.
555 if len(data) + self._chunkssize < self._maxsize:
558 if len(data) + self._chunkssize < self._maxsize:
556 self._chunks.append(data)
559 self._chunks.append(data)
557 self._chunkssize += len(data)
560 self._chunkssize += len(data)
558 return
561 return
559
562
560 # Else flush what we have and buffer the new chunk. We could do
563 # Else flush what we have and buffer the new chunk. We could do
561 # something more intelligent here, like break the chunk. Let's
564 # something more intelligent here, like break the chunk. Let's
562 # keep things simple for now.
565 # keep things simple for now.
563 for frame in self._flush():
566 for frame in self._flush():
564 yield frame
567 yield frame
565
568
566 self._chunks.append(data)
569 self._chunks.append(data)
567 self._chunkssize = len(data)
570 self._chunkssize = len(data)
568
571
569 def _flush(self):
572 def _flush(self):
570 payload = b''.join(self._chunks)
573 payload = b''.join(self._chunks)
571 assert len(payload) <= self._maxsize
574 assert len(payload) <= self._maxsize
572
575
573 self._chunks[:] = []
576 self._chunks[:] = []
574 self._chunkssize = 0
577 self._chunkssize = 0
575
578
579 if not payload:
580 return
581
576 yield self._stream.makeframe(
582 yield self._stream.makeframe(
577 self._requestid,
583 self._requestid,
578 typeid=FRAME_TYPE_COMMAND_RESPONSE,
584 typeid=FRAME_TYPE_COMMAND_RESPONSE,
579 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
585 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
580 payload=payload)
586 payload=payload)
581
587
582 # TODO consider defining encoders/decoders using the util.compressionengine
588 # TODO consider defining encoders/decoders using the util.compressionengine
583 # mechanism.
589 # mechanism.
584
590
585 class identityencoder(object):
591 class identityencoder(object):
586 """Encoder for the "identity" stream encoding profile."""
592 """Encoder for the "identity" stream encoding profile."""
587 def __init__(self, ui):
593 def __init__(self, ui):
588 pass
594 pass
589
595
590 def encode(self, data):
596 def encode(self, data):
591 return data
597 return data
592
598
593 def flush(self):
599 def flush(self):
594 return b''
600 return b''
595
601
596 def finish(self):
602 def finish(self):
597 return b''
603 return b''
598
604
599 class identitydecoder(object):
605 class identitydecoder(object):
600 """Decoder for the "identity" stream encoding profile."""
606 """Decoder for the "identity" stream encoding profile."""
601
607
602 def __init__(self, ui, extraobjs):
608 def __init__(self, ui, extraobjs):
603 if extraobjs:
609 if extraobjs:
604 raise error.Abort(_('identity decoder received unexpected '
610 raise error.Abort(_('identity decoder received unexpected '
605 'additional values'))
611 'additional values'))
606
612
607 def decode(self, data):
613 def decode(self, data):
608 return data
614 return data
609
615
610 class zlibencoder(object):
616 class zlibencoder(object):
611 def __init__(self, ui):
617 def __init__(self, ui):
612 import zlib
618 import zlib
613 self._zlib = zlib
619 self._zlib = zlib
614 self._compressor = zlib.compressobj()
620 self._compressor = zlib.compressobj()
615
621
616 def encode(self, data):
622 def encode(self, data):
617 return self._compressor.compress(data)
623 return self._compressor.compress(data)
618
624
619 def flush(self):
625 def flush(self):
620 # Z_SYNC_FLUSH doesn't reset compression context, which is
626 # Z_SYNC_FLUSH doesn't reset compression context, which is
621 # what we want.
627 # what we want.
622 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
628 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
623
629
624 def finish(self):
630 def finish(self):
625 res = self._compressor.flush(self._zlib.Z_FINISH)
631 res = self._compressor.flush(self._zlib.Z_FINISH)
626 self._compressor = None
632 self._compressor = None
627 return res
633 return res
628
634
629 class zlibdecoder(object):
635 class zlibdecoder(object):
630 def __init__(self, ui, extraobjs):
636 def __init__(self, ui, extraobjs):
631 import zlib
637 import zlib
632
638
633 if extraobjs:
639 if extraobjs:
634 raise error.Abort(_('zlib decoder received unexpected '
640 raise error.Abort(_('zlib decoder received unexpected '
635 'additional values'))
641 'additional values'))
636
642
637 self._decompressor = zlib.decompressobj()
643 self._decompressor = zlib.decompressobj()
638
644
639 def decode(self, data):
645 def decode(self, data):
640 # Python 2's zlib module doesn't use the buffer protocol and can't
646 # Python 2's zlib module doesn't use the buffer protocol and can't
641 # handle all bytes-like types.
647 # handle all bytes-like types.
642 if not pycompat.ispy3 and isinstance(data, bytearray):
648 if not pycompat.ispy3 and isinstance(data, bytearray):
643 data = bytes(data)
649 data = bytes(data)
644
650
645 return self._decompressor.decompress(data)
651 return self._decompressor.decompress(data)
646
652
647 class zstdbaseencoder(object):
653 class zstdbaseencoder(object):
648 def __init__(self, level):
654 def __init__(self, level):
649 from . import zstd
655 from . import zstd
650
656
651 self._zstd = zstd
657 self._zstd = zstd
652 cctx = zstd.ZstdCompressor(level=level)
658 cctx = zstd.ZstdCompressor(level=level)
653 self._compressor = cctx.compressobj()
659 self._compressor = cctx.compressobj()
654
660
655 def encode(self, data):
661 def encode(self, data):
656 return self._compressor.compress(data)
662 return self._compressor.compress(data)
657
663
658 def flush(self):
664 def flush(self):
659 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
665 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
660 # compressor and allows a decompressor to access all encoded data
666 # compressor and allows a decompressor to access all encoded data
661 # up to this point.
667 # up to this point.
662 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
668 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
663
669
664 def finish(self):
670 def finish(self):
665 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
671 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
666 self._compressor = None
672 self._compressor = None
667 return res
673 return res
668
674
669 class zstd8mbencoder(zstdbaseencoder):
675 class zstd8mbencoder(zstdbaseencoder):
670 def __init__(self, ui):
676 def __init__(self, ui):
671 super(zstd8mbencoder, self).__init__(3)
677 super(zstd8mbencoder, self).__init__(3)
672
678
673 class zstdbasedecoder(object):
679 class zstdbasedecoder(object):
674 def __init__(self, maxwindowsize):
680 def __init__(self, maxwindowsize):
675 from . import zstd
681 from . import zstd
676 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
682 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
677 self._decompressor = dctx.decompressobj()
683 self._decompressor = dctx.decompressobj()
678
684
679 def decode(self, data):
685 def decode(self, data):
680 return self._decompressor.decompress(data)
686 return self._decompressor.decompress(data)
681
687
682 class zstd8mbdecoder(zstdbasedecoder):
688 class zstd8mbdecoder(zstdbasedecoder):
683 def __init__(self, ui, extraobjs):
689 def __init__(self, ui, extraobjs):
684 if extraobjs:
690 if extraobjs:
685 raise error.Abort(_('zstd8mb decoder received unexpected '
691 raise error.Abort(_('zstd8mb decoder received unexpected '
686 'additional values'))
692 'additional values'))
687
693
688 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
694 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
689
695
690 # We lazily populate this to avoid excessive module imports when importing
696 # We lazily populate this to avoid excessive module imports when importing
691 # this module.
697 # this module.
692 STREAM_ENCODERS = {}
698 STREAM_ENCODERS = {}
693 STREAM_ENCODERS_ORDER = []
699 STREAM_ENCODERS_ORDER = []
694
700
695 def populatestreamencoders():
701 def populatestreamencoders():
696 if STREAM_ENCODERS:
702 if STREAM_ENCODERS:
697 return
703 return
698
704
699 try:
705 try:
700 from . import zstd
706 from . import zstd
701 zstd.__version__
707 zstd.__version__
702 except ImportError:
708 except ImportError:
703 zstd = None
709 zstd = None
704
710
705 # zstandard is fastest and is preferred.
711 # zstandard is fastest and is preferred.
706 if zstd:
712 if zstd:
707 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
713 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
708 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
714 STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
709
715
710 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
716 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
711 STREAM_ENCODERS_ORDER.append(b'zlib')
717 STREAM_ENCODERS_ORDER.append(b'zlib')
712
718
713 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
719 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
714 STREAM_ENCODERS_ORDER.append(b'identity')
720 STREAM_ENCODERS_ORDER.append(b'identity')
715
721
716 class stream(object):
722 class stream(object):
717 """Represents a logical unidirectional series of frames."""
723 """Represents a logical unidirectional series of frames."""
718
724
719 def __init__(self, streamid, active=False):
725 def __init__(self, streamid, active=False):
720 self.streamid = streamid
726 self.streamid = streamid
721 self._active = active
727 self._active = active
722
728
723 def makeframe(self, requestid, typeid, flags, payload):
729 def makeframe(self, requestid, typeid, flags, payload):
724 """Create a frame to be sent out over this stream.
730 """Create a frame to be sent out over this stream.
725
731
726 Only returns the frame instance. Does not actually send it.
732 Only returns the frame instance. Does not actually send it.
727 """
733 """
728 streamflags = 0
734 streamflags = 0
729 if not self._active:
735 if not self._active:
730 streamflags |= STREAM_FLAG_BEGIN_STREAM
736 streamflags |= STREAM_FLAG_BEGIN_STREAM
731 self._active = True
737 self._active = True
732
738
733 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
739 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
734 payload)
740 payload)
735
741
736 class inputstream(stream):
742 class inputstream(stream):
737 """Represents a stream used for receiving data."""
743 """Represents a stream used for receiving data."""
738
744
739 def __init__(self, streamid, active=False):
745 def __init__(self, streamid, active=False):
740 super(inputstream, self).__init__(streamid, active=active)
746 super(inputstream, self).__init__(streamid, active=active)
741 self._decoder = None
747 self._decoder = None
742
748
743 def setdecoder(self, ui, name, extraobjs):
749 def setdecoder(self, ui, name, extraobjs):
744 """Set the decoder for this stream.
750 """Set the decoder for this stream.
745
751
746 Receives the stream profile name and any additional CBOR objects
752 Receives the stream profile name and any additional CBOR objects
747 decoded from the stream encoding settings frame payloads.
753 decoded from the stream encoding settings frame payloads.
748 """
754 """
749 if name not in STREAM_ENCODERS:
755 if name not in STREAM_ENCODERS:
750 raise error.Abort(_('unknown stream decoder: %s') % name)
756 raise error.Abort(_('unknown stream decoder: %s') % name)
751
757
752 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
758 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
753
759
754 def decode(self, data):
760 def decode(self, data):
755 # Default is identity decoder. We don't bother instantiating one
761 # Default is identity decoder. We don't bother instantiating one
756 # because it is trivial.
762 # because it is trivial.
757 if not self._decoder:
763 if not self._decoder:
758 return data
764 return data
759
765
760 return self._decoder.decode(data)
766 return self._decoder.decode(data)
761
767
762 def flush(self):
768 def flush(self):
763 if not self._decoder:
769 if not self._decoder:
764 return b''
770 return b''
765
771
766 return self._decoder.flush()
772 return self._decoder.flush()
767
773
768 class outputstream(stream):
774 class outputstream(stream):
769 """Represents a stream used for sending data."""
775 """Represents a stream used for sending data."""
770
776
771 def __init__(self, streamid, active=False):
777 def __init__(self, streamid, active=False):
772 super(outputstream, self).__init__(streamid, active=active)
778 super(outputstream, self).__init__(streamid, active=active)
773 self._encoder = None
779 self._encoder = None
774
780
775 def setencoder(self, ui, name):
781 def setencoder(self, ui, name):
776 """Set the encoder for this stream.
782 """Set the encoder for this stream.
777
783
778 Receives the stream profile name.
784 Receives the stream profile name.
779 """
785 """
780 if name not in STREAM_ENCODERS:
786 if name not in STREAM_ENCODERS:
781 raise error.Abort(_('unknown stream encoder: %s') % name)
787 raise error.Abort(_('unknown stream encoder: %s') % name)
782
788
783 self._encoder = STREAM_ENCODERS[name][0](ui)
789 self._encoder = STREAM_ENCODERS[name][0](ui)
784
790
785 def encode(self, data):
791 def encode(self, data):
786 if not self._encoder:
792 if not self._encoder:
787 return data
793 return data
788
794
789 return self._encoder.encode(data)
795 return self._encoder.encode(data)
790
796
791 def flush(self):
797 def flush(self):
792 if not self._encoder:
798 if not self._encoder:
793 return b''
799 return b''
794
800
795 return self._encoder.flush()
801 return self._encoder.flush()
796
802
797 def finish(self):
803 def finish(self):
798 if not self._encoder:
804 if not self._encoder:
799 return b''
805 return b''
800
806
801 self._encoder.finish()
807 self._encoder.finish()
802
808
803 def ensureserverstream(stream):
809 def ensureserverstream(stream):
804 if stream.streamid % 2:
810 if stream.streamid % 2:
805 raise error.ProgrammingError('server should only write to even '
811 raise error.ProgrammingError('server should only write to even '
806 'numbered streams; %d is not even' %
812 'numbered streams; %d is not even' %
807 stream.streamid)
813 stream.streamid)
808
814
809 DEFAULT_PROTOCOL_SETTINGS = {
815 DEFAULT_PROTOCOL_SETTINGS = {
810 'contentencodings': [b'identity'],
816 'contentencodings': [b'identity'],
811 }
817 }
812
818
813 class serverreactor(object):
819 class serverreactor(object):
814 """Holds state of a server handling frame-based protocol requests.
820 """Holds state of a server handling frame-based protocol requests.
815
821
816 This class is the "brain" of the unified frame-based protocol server
822 This class is the "brain" of the unified frame-based protocol server
817 component. While the protocol is stateless from the perspective of
823 component. While the protocol is stateless from the perspective of
818 requests/commands, something needs to track which frames have been
824 requests/commands, something needs to track which frames have been
819 received, what frames to expect, etc. This class is that thing.
825 received, what frames to expect, etc. This class is that thing.
820
826
821 Instances are modeled as a state machine of sorts. Instances are also
827 Instances are modeled as a state machine of sorts. Instances are also
822 reactionary to external events. The point of this class is to encapsulate
828 reactionary to external events. The point of this class is to encapsulate
823 the state of the connection and the exchange of frames, not to perform
829 the state of the connection and the exchange of frames, not to perform
824 work. Instead, callers tell this class when something occurs, like a
830 work. Instead, callers tell this class when something occurs, like a
825 frame arriving. If that activity is worthy of a follow-up action (say
831 frame arriving. If that activity is worthy of a follow-up action (say
826 *run a command*), the return value of that handler will say so.
832 *run a command*), the return value of that handler will say so.
827
833
828 I/O and CPU intensive operations are purposefully delegated outside of
834 I/O and CPU intensive operations are purposefully delegated outside of
829 this class.
835 this class.
830
836
831 Consumers are expected to tell instances when events occur. They do so by
837 Consumers are expected to tell instances when events occur. They do so by
832 calling the various ``on*`` methods. These methods return a 2-tuple
838 calling the various ``on*`` methods. These methods return a 2-tuple
833 describing any follow-up action(s) to take. The first element is the
839 describing any follow-up action(s) to take. The first element is the
834 name of an action to perform. The second is a data structure (usually
840 name of an action to perform. The second is a data structure (usually
835 a dict) specific to that action that contains more information. e.g.
841 a dict) specific to that action that contains more information. e.g.
836 if the server wants to send frames back to the client, the data structure
842 if the server wants to send frames back to the client, the data structure
837 will contain a reference to those frames.
843 will contain a reference to those frames.
838
844
839 Valid actions that consumers can be instructed to take are:
845 Valid actions that consumers can be instructed to take are:
840
846
841 sendframes
847 sendframes
842 Indicates that frames should be sent to the client. The ``framegen``
848 Indicates that frames should be sent to the client. The ``framegen``
843 key contains a generator of frames that should be sent. The server
849 key contains a generator of frames that should be sent. The server
844 assumes that all frames are sent to the client.
850 assumes that all frames are sent to the client.
845
851
846 error
852 error
847 Indicates that an error occurred. Consumer should probably abort.
853 Indicates that an error occurred. Consumer should probably abort.
848
854
849 runcommand
855 runcommand
850 Indicates that the consumer should run a wire protocol command. Details
856 Indicates that the consumer should run a wire protocol command. Details
851 of the command to run are given in the data structure.
857 of the command to run are given in the data structure.
852
858
853 wantframe
859 wantframe
854 Indicates that nothing of interest happened and the server is waiting on
860 Indicates that nothing of interest happened and the server is waiting on
855 more frames from the client before anything interesting can be done.
861 more frames from the client before anything interesting can be done.
856
862
857 noop
863 noop
858 Indicates no additional action is required.
864 Indicates no additional action is required.
859
865
860 Known Issues
866 Known Issues
861 ------------
867 ------------
862
868
863 There are no limits to the number of partially received commands or their
869 There are no limits to the number of partially received commands or their
864 size. A malicious client could stream command request data and exhaust the
870 size. A malicious client could stream command request data and exhaust the
865 server's memory.
871 server's memory.
866
872
867 Partially received commands are not acted upon when end of input is
873 Partially received commands are not acted upon when end of input is
868 reached. Should the server error if it receives a partial request?
874 reached. Should the server error if it receives a partial request?
869 Should the client send a message to abort a partially transmitted request
875 Should the client send a message to abort a partially transmitted request
870 to facilitate graceful shutdown?
876 to facilitate graceful shutdown?
871
877
872 Active requests that haven't been responded to aren't tracked. This means
878 Active requests that haven't been responded to aren't tracked. This means
873 that if we receive a command and instruct its dispatch, another command
879 that if we receive a command and instruct its dispatch, another command
874 with its request ID can come in over the wire and there will be a race
880 with its request ID can come in over the wire and there will be a race
875 between who responds to what.
881 between who responds to what.
876 """
882 """
877
883
878 def __init__(self, ui, deferoutput=False):
884 def __init__(self, ui, deferoutput=False):
879 """Construct a new server reactor.
885 """Construct a new server reactor.
880
886
881 ``deferoutput`` can be used to indicate that no output frames should be
887 ``deferoutput`` can be used to indicate that no output frames should be
882 instructed to be sent until input has been exhausted. In this mode,
888 instructed to be sent until input has been exhausted. In this mode,
883 events that would normally generate output frames (such as a command
889 events that would normally generate output frames (such as a command
884 response being ready) will instead defer instructing the consumer to
890 response being ready) will instead defer instructing the consumer to
885 send those frames. This is useful for half-duplex transports where the
891 send those frames. This is useful for half-duplex transports where the
886 sender cannot receive until all data has been transmitted.
892 sender cannot receive until all data has been transmitted.
887 """
893 """
888 self._ui = ui
894 self._ui = ui
889 self._deferoutput = deferoutput
895 self._deferoutput = deferoutput
890 self._state = 'initial'
896 self._state = 'initial'
891 self._nextoutgoingstreamid = 2
897 self._nextoutgoingstreamid = 2
892 self._bufferedframegens = []
898 self._bufferedframegens = []
893 # stream id -> stream instance for all active streams from the client.
899 # stream id -> stream instance for all active streams from the client.
894 self._incomingstreams = {}
900 self._incomingstreams = {}
895 self._outgoingstreams = {}
901 self._outgoingstreams = {}
896 # request id -> dict of commands that are actively being received.
902 # request id -> dict of commands that are actively being received.
897 self._receivingcommands = {}
903 self._receivingcommands = {}
898 # Request IDs that have been received and are actively being processed.
904 # Request IDs that have been received and are actively being processed.
899 # Once all output for a request has been sent, it is removed from this
905 # Once all output for a request has been sent, it is removed from this
900 # set.
906 # set.
901 self._activecommands = set()
907 self._activecommands = set()
902
908
903 self._protocolsettingsdecoder = None
909 self._protocolsettingsdecoder = None
904
910
905 # Sender protocol settings are optional. Set implied default values.
911 # Sender protocol settings are optional. Set implied default values.
906 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
912 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
907
913
908 populatestreamencoders()
914 populatestreamencoders()
909
915
910 def onframerecv(self, frame):
916 def onframerecv(self, frame):
911 """Process a frame that has been received off the wire.
917 """Process a frame that has been received off the wire.
912
918
913 Returns a dict with an ``action`` key that details what action,
919 Returns a dict with an ``action`` key that details what action,
914 if any, the consumer should take next.
920 if any, the consumer should take next.
915 """
921 """
916 if not frame.streamid % 2:
922 if not frame.streamid % 2:
917 self._state = 'errored'
923 self._state = 'errored'
918 return self._makeerrorresult(
924 return self._makeerrorresult(
919 _('received frame with even numbered stream ID: %d') %
925 _('received frame with even numbered stream ID: %d') %
920 frame.streamid)
926 frame.streamid)
921
927
922 if frame.streamid not in self._incomingstreams:
928 if frame.streamid not in self._incomingstreams:
923 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
929 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
924 self._state = 'errored'
930 self._state = 'errored'
925 return self._makeerrorresult(
931 return self._makeerrorresult(
926 _('received frame on unknown inactive stream without '
932 _('received frame on unknown inactive stream without '
927 'beginning of stream flag set'))
933 'beginning of stream flag set'))
928
934
929 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
935 self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
930
936
931 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
937 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
932 # TODO handle decoding frames
938 # TODO handle decoding frames
933 self._state = 'errored'
939 self._state = 'errored'
934 raise error.ProgrammingError('support for decoding stream payloads '
940 raise error.ProgrammingError('support for decoding stream payloads '
935 'not yet implemented')
941 'not yet implemented')
936
942
937 if frame.streamflags & STREAM_FLAG_END_STREAM:
943 if frame.streamflags & STREAM_FLAG_END_STREAM:
938 del self._incomingstreams[frame.streamid]
944 del self._incomingstreams[frame.streamid]
939
945
940 handlers = {
946 handlers = {
941 'initial': self._onframeinitial,
947 'initial': self._onframeinitial,
942 'protocol-settings-receiving': self._onframeprotocolsettings,
948 'protocol-settings-receiving': self._onframeprotocolsettings,
943 'idle': self._onframeidle,
949 'idle': self._onframeidle,
944 'command-receiving': self._onframecommandreceiving,
950 'command-receiving': self._onframecommandreceiving,
945 'errored': self._onframeerrored,
951 'errored': self._onframeerrored,
946 }
952 }
947
953
948 meth = handlers.get(self._state)
954 meth = handlers.get(self._state)
949 if not meth:
955 if not meth:
950 raise error.ProgrammingError('unhandled state: %s' % self._state)
956 raise error.ProgrammingError('unhandled state: %s' % self._state)
951
957
952 return meth(frame)
958 return meth(frame)
953
959
954 def oncommandresponsereadyobjects(self, stream, requestid, objs):
960 def oncommandresponsereadyobjects(self, stream, requestid, objs):
955 """Signal that objects are ready to be sent to the client.
961 """Signal that objects are ready to be sent to the client.
956
962
957 ``objs`` is an iterable of objects (typically a generator) that will
963 ``objs`` is an iterable of objects (typically a generator) that will
958 be encoded via CBOR and added to frames, which will be sent to the
964 be encoded via CBOR and added to frames, which will be sent to the
959 client.
965 client.
960 """
966 """
961 ensureserverstream(stream)
967 ensureserverstream(stream)
962
968
963 # A more robust solution would be to check for objs.{next,__next__}.
969 # A more robust solution would be to check for objs.{next,__next__}.
964 if isinstance(objs, list):
970 if isinstance(objs, list):
965 objs = iter(objs)
971 objs = iter(objs)
966
972
967 # We need to take care over exception handling. Uncaught exceptions
973 # We need to take care over exception handling. Uncaught exceptions
968 # when generating frames could lead to premature end of the frame
974 # when generating frames could lead to premature end of the frame
969 # stream and the possibility of the server or client process getting
975 # stream and the possibility of the server or client process getting
970 # in a bad state.
976 # in a bad state.
971 #
977 #
972 # Keep in mind that if ``objs`` is a generator, advancing it could
978 # Keep in mind that if ``objs`` is a generator, advancing it could
973 # raise exceptions that originated in e.g. wire protocol command
979 # raise exceptions that originated in e.g. wire protocol command
974 # functions. That is why we differentiate between exceptions raised
980 # functions. That is why we differentiate between exceptions raised
975 # when iterating versus other exceptions that occur.
981 # when iterating versus other exceptions that occur.
976 #
982 #
977 # In all cases, when the function finishes, the request is fully
983 # In all cases, when the function finishes, the request is fully
978 # handled and no new frames for it should be seen.
984 # handled and no new frames for it should be seen.
979
985
980 def sendframes():
986 def sendframes():
981 emitted = False
987 emitted = False
982 alternatelocationsent = False
988 alternatelocationsent = False
983 emitter = bufferingcommandresponseemitter(stream, requestid)
989 emitter = bufferingcommandresponseemitter(stream, requestid)
984 while True:
990 while True:
985 try:
991 try:
986 o = next(objs)
992 o = next(objs)
987 except StopIteration:
993 except StopIteration:
988 for frame in emitter.send(None):
994 for frame in emitter.send(None):
989 yield frame
995 yield frame
990
996
991 if emitted:
997 if emitted:
992 yield createcommandresponseeosframe(stream, requestid)
998 yield createcommandresponseeosframe(stream, requestid)
993 break
999 break
994
1000
995 except error.WireprotoCommandError as e:
1001 except error.WireprotoCommandError as e:
996 for frame in createcommanderrorresponse(
1002 for frame in createcommanderrorresponse(
997 stream, requestid, e.message, e.messageargs):
1003 stream, requestid, e.message, e.messageargs):
998 yield frame
1004 yield frame
999 break
1005 break
1000
1006
1001 except Exception as e:
1007 except Exception as e:
1002 for frame in createerrorframe(
1008 for frame in createerrorframe(
1003 stream, requestid, '%s' % stringutil.forcebytestr(e),
1009 stream, requestid, '%s' % stringutil.forcebytestr(e),
1004 errtype='server'):
1010 errtype='server'):
1005
1011
1006 yield frame
1012 yield frame
1007
1013
1008 break
1014 break
1009
1015
1010 try:
1016 try:
1011 # Alternate location responses can only be the first and
1017 # Alternate location responses can only be the first and
1012 # only object in the output stream.
1018 # only object in the output stream.
1013 if isinstance(o, wireprototypes.alternatelocationresponse):
1019 if isinstance(o, wireprototypes.alternatelocationresponse):
1014 if emitted:
1020 if emitted:
1015 raise error.ProgrammingError(
1021 raise error.ProgrammingError(
1016 'alternatelocationresponse seen after initial '
1022 'alternatelocationresponse seen after initial '
1017 'output object')
1023 'output object')
1018
1024
1019 yield createalternatelocationresponseframe(
1025 yield createalternatelocationresponseframe(
1020 stream, requestid, o)
1026 stream, requestid, o)
1021
1027
1022 alternatelocationsent = True
1028 alternatelocationsent = True
1023 emitted = True
1029 emitted = True
1024 continue
1030 continue
1025
1031
1026 if alternatelocationsent:
1032 if alternatelocationsent:
1027 raise error.ProgrammingError(
1033 raise error.ProgrammingError(
1028 'object follows alternatelocationresponse')
1034 'object follows alternatelocationresponse')
1029
1035
1030 if not emitted:
1036 if not emitted:
1031 yield createcommandresponseokframe(stream, requestid)
1037 yield createcommandresponseokframe(stream, requestid)
1032 emitted = True
1038 emitted = True
1033
1039
1034 # Objects emitted by command functions can be serializable
1040 # Objects emitted by command functions can be serializable
1035 # data structures or special types.
1041 # data structures or special types.
1036 # TODO consider extracting the content normalization to a
1042 # TODO consider extracting the content normalization to a
1037 # standalone function, as it may be useful for e.g. cachers.
1043 # standalone function, as it may be useful for e.g. cachers.
1038
1044
1039 # A pre-encoded object is sent directly to the emitter.
1045 # A pre-encoded object is sent directly to the emitter.
1040 if isinstance(o, wireprototypes.encodedresponse):
1046 if isinstance(o, wireprototypes.encodedresponse):
1041 for frame in emitter.send(o.data):
1047 for frame in emitter.send(o.data):
1042 yield frame
1048 yield frame
1043
1049
1044 # A regular object is CBOR encoded.
1050 # A regular object is CBOR encoded.
1045 else:
1051 else:
1046 for chunk in cborutil.streamencode(o):
1052 for chunk in cborutil.streamencode(o):
1047 for frame in emitter.send(chunk):
1053 for frame in emitter.send(chunk):
1048 yield frame
1054 yield frame
1049
1055
1050 except Exception as e:
1056 except Exception as e:
1051 for frame in createerrorframe(stream, requestid,
1057 for frame in createerrorframe(stream, requestid,
1052 '%s' % e,
1058 '%s' % e,
1053 errtype='server'):
1059 errtype='server'):
1054 yield frame
1060 yield frame
1055
1061
1056 break
1062 break
1057
1063
1058 self._activecommands.remove(requestid)
1064 self._activecommands.remove(requestid)
1059
1065
1060 return self._handlesendframes(sendframes())
1066 return self._handlesendframes(sendframes())
1061
1067
1062 def oninputeof(self):
1068 def oninputeof(self):
1063 """Signals that end of input has been received.
1069 """Signals that end of input has been received.
1064
1070
1065 No more frames will be received. All pending activity should be
1071 No more frames will be received. All pending activity should be
1066 completed.
1072 completed.
1067 """
1073 """
1068 # TODO should we do anything about in-flight commands?
1074 # TODO should we do anything about in-flight commands?
1069
1075
1070 if not self._deferoutput or not self._bufferedframegens:
1076 if not self._deferoutput or not self._bufferedframegens:
1071 return 'noop', {}
1077 return 'noop', {}
1072
1078
1073 # If we buffered all our responses, emit those.
1079 # If we buffered all our responses, emit those.
1074 def makegen():
1080 def makegen():
1075 for gen in self._bufferedframegens:
1081 for gen in self._bufferedframegens:
1076 for frame in gen:
1082 for frame in gen:
1077 yield frame
1083 yield frame
1078
1084
1079 return 'sendframes', {
1085 return 'sendframes', {
1080 'framegen': makegen(),
1086 'framegen': makegen(),
1081 }
1087 }
1082
1088
1083 def _handlesendframes(self, framegen):
1089 def _handlesendframes(self, framegen):
1084 if self._deferoutput:
1090 if self._deferoutput:
1085 self._bufferedframegens.append(framegen)
1091 self._bufferedframegens.append(framegen)
1086 return 'noop', {}
1092 return 'noop', {}
1087 else:
1093 else:
1088 return 'sendframes', {
1094 return 'sendframes', {
1089 'framegen': framegen,
1095 'framegen': framegen,
1090 }
1096 }
1091
1097
1092 def onservererror(self, stream, requestid, msg):
1098 def onservererror(self, stream, requestid, msg):
1093 ensureserverstream(stream)
1099 ensureserverstream(stream)
1094
1100
1095 def sendframes():
1101 def sendframes():
1096 for frame in createerrorframe(stream, requestid, msg,
1102 for frame in createerrorframe(stream, requestid, msg,
1097 errtype='server'):
1103 errtype='server'):
1098 yield frame
1104 yield frame
1099
1105
1100 self._activecommands.remove(requestid)
1106 self._activecommands.remove(requestid)
1101
1107
1102 return self._handlesendframes(sendframes())
1108 return self._handlesendframes(sendframes())
1103
1109
1104 def oncommanderror(self, stream, requestid, message, args=None):
1110 def oncommanderror(self, stream, requestid, message, args=None):
1105 """Called when a command encountered an error before sending output."""
1111 """Called when a command encountered an error before sending output."""
1106 ensureserverstream(stream)
1112 ensureserverstream(stream)
1107
1113
1108 def sendframes():
1114 def sendframes():
1109 for frame in createcommanderrorresponse(stream, requestid, message,
1115 for frame in createcommanderrorresponse(stream, requestid, message,
1110 args):
1116 args):
1111 yield frame
1117 yield frame
1112
1118
1113 self._activecommands.remove(requestid)
1119 self._activecommands.remove(requestid)
1114
1120
1115 return self._handlesendframes(sendframes())
1121 return self._handlesendframes(sendframes())
1116
1122
1117 def makeoutputstream(self):
1123 def makeoutputstream(self):
1118 """Create a stream to be used for sending data to the client."""
1124 """Create a stream to be used for sending data to the client."""
1119 streamid = self._nextoutgoingstreamid
1125 streamid = self._nextoutgoingstreamid
1120 self._nextoutgoingstreamid += 2
1126 self._nextoutgoingstreamid += 2
1121
1127
1122 s = outputstream(streamid)
1128 s = outputstream(streamid)
1123 self._outgoingstreams[streamid] = s
1129 self._outgoingstreams[streamid] = s
1124
1130
1125 return s
1131 return s
1126
1132
1127 def _makeerrorresult(self, msg):
1133 def _makeerrorresult(self, msg):
1128 return 'error', {
1134 return 'error', {
1129 'message': msg,
1135 'message': msg,
1130 }
1136 }
1131
1137
1132 def _makeruncommandresult(self, requestid):
1138 def _makeruncommandresult(self, requestid):
1133 entry = self._receivingcommands[requestid]
1139 entry = self._receivingcommands[requestid]
1134
1140
1135 if not entry['requestdone']:
1141 if not entry['requestdone']:
1136 self._state = 'errored'
1142 self._state = 'errored'
1137 raise error.ProgrammingError('should not be called without '
1143 raise error.ProgrammingError('should not be called without '
1138 'requestdone set')
1144 'requestdone set')
1139
1145
1140 del self._receivingcommands[requestid]
1146 del self._receivingcommands[requestid]
1141
1147
1142 if self._receivingcommands:
1148 if self._receivingcommands:
1143 self._state = 'command-receiving'
1149 self._state = 'command-receiving'
1144 else:
1150 else:
1145 self._state = 'idle'
1151 self._state = 'idle'
1146
1152
1147 # Decode the payloads as CBOR.
1153 # Decode the payloads as CBOR.
1148 entry['payload'].seek(0)
1154 entry['payload'].seek(0)
1149 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1155 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1150
1156
1151 if b'name' not in request:
1157 if b'name' not in request:
1152 self._state = 'errored'
1158 self._state = 'errored'
1153 return self._makeerrorresult(
1159 return self._makeerrorresult(
1154 _('command request missing "name" field'))
1160 _('command request missing "name" field'))
1155
1161
1156 if b'args' not in request:
1162 if b'args' not in request:
1157 request[b'args'] = {}
1163 request[b'args'] = {}
1158
1164
1159 assert requestid not in self._activecommands
1165 assert requestid not in self._activecommands
1160 self._activecommands.add(requestid)
1166 self._activecommands.add(requestid)
1161
1167
1162 return 'runcommand', {
1168 return 'runcommand', {
1163 'requestid': requestid,
1169 'requestid': requestid,
1164 'command': request[b'name'],
1170 'command': request[b'name'],
1165 'args': request[b'args'],
1171 'args': request[b'args'],
1166 'redirect': request.get(b'redirect'),
1172 'redirect': request.get(b'redirect'),
1167 'data': entry['data'].getvalue() if entry['data'] else None,
1173 'data': entry['data'].getvalue() if entry['data'] else None,
1168 }
1174 }
1169
1175
1170 def _makewantframeresult(self):
1176 def _makewantframeresult(self):
1171 return 'wantframe', {
1177 return 'wantframe', {
1172 'state': self._state,
1178 'state': self._state,
1173 }
1179 }
1174
1180
1175 def _validatecommandrequestframe(self, frame):
1181 def _validatecommandrequestframe(self, frame):
1176 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1182 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1177 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1183 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1178
1184
1179 if new and continuation:
1185 if new and continuation:
1180 self._state = 'errored'
1186 self._state = 'errored'
1181 return self._makeerrorresult(
1187 return self._makeerrorresult(
1182 _('received command request frame with both new and '
1188 _('received command request frame with both new and '
1183 'continuation flags set'))
1189 'continuation flags set'))
1184
1190
1185 if not new and not continuation:
1191 if not new and not continuation:
1186 self._state = 'errored'
1192 self._state = 'errored'
1187 return self._makeerrorresult(
1193 return self._makeerrorresult(
1188 _('received command request frame with neither new nor '
1194 _('received command request frame with neither new nor '
1189 'continuation flags set'))
1195 'continuation flags set'))
1190
1196
1191 def _onframeinitial(self, frame):
1197 def _onframeinitial(self, frame):
1192 # Called when we receive a frame when in the "initial" state.
1198 # Called when we receive a frame when in the "initial" state.
1193 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1199 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1194 self._state = 'protocol-settings-receiving'
1200 self._state = 'protocol-settings-receiving'
1195 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1201 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1196 return self._onframeprotocolsettings(frame)
1202 return self._onframeprotocolsettings(frame)
1197
1203
1198 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1204 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1199 self._state = 'idle'
1205 self._state = 'idle'
1200 return self._onframeidle(frame)
1206 return self._onframeidle(frame)
1201
1207
1202 else:
1208 else:
1203 self._state = 'errored'
1209 self._state = 'errored'
1204 return self._makeerrorresult(
1210 return self._makeerrorresult(
1205 _('expected sender protocol settings or command request '
1211 _('expected sender protocol settings or command request '
1206 'frame; got %d') % frame.typeid)
1212 'frame; got %d') % frame.typeid)
1207
1213
1208 def _onframeprotocolsettings(self, frame):
1214 def _onframeprotocolsettings(self, frame):
1209 assert self._state == 'protocol-settings-receiving'
1215 assert self._state == 'protocol-settings-receiving'
1210 assert self._protocolsettingsdecoder is not None
1216 assert self._protocolsettingsdecoder is not None
1211
1217
1212 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1218 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1213 self._state = 'errored'
1219 self._state = 'errored'
1214 return self._makeerrorresult(
1220 return self._makeerrorresult(
1215 _('expected sender protocol settings frame; got %d') %
1221 _('expected sender protocol settings frame; got %d') %
1216 frame.typeid)
1222 frame.typeid)
1217
1223
1218 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1224 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1219 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1225 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1220
1226
1221 if more and eos:
1227 if more and eos:
1222 self._state = 'errored'
1228 self._state = 'errored'
1223 return self._makeerrorresult(
1229 return self._makeerrorresult(
1224 _('sender protocol settings frame cannot have both '
1230 _('sender protocol settings frame cannot have both '
1225 'continuation and end of stream flags set'))
1231 'continuation and end of stream flags set'))
1226
1232
1227 if not more and not eos:
1233 if not more and not eos:
1228 self._state = 'errored'
1234 self._state = 'errored'
1229 return self._makeerrorresult(
1235 return self._makeerrorresult(
1230 _('sender protocol settings frame must have continuation or '
1236 _('sender protocol settings frame must have continuation or '
1231 'end of stream flag set'))
1237 'end of stream flag set'))
1232
1238
1233 # TODO establish limits for maximum amount of data that can be
1239 # TODO establish limits for maximum amount of data that can be
1234 # buffered.
1240 # buffered.
1235 try:
1241 try:
1236 self._protocolsettingsdecoder.decode(frame.payload)
1242 self._protocolsettingsdecoder.decode(frame.payload)
1237 except Exception as e:
1243 except Exception as e:
1238 self._state = 'errored'
1244 self._state = 'errored'
1239 return self._makeerrorresult(
1245 return self._makeerrorresult(
1240 _('error decoding CBOR from sender protocol settings frame: %s')
1246 _('error decoding CBOR from sender protocol settings frame: %s')
1241 % stringutil.forcebytestr(e))
1247 % stringutil.forcebytestr(e))
1242
1248
1243 if more:
1249 if more:
1244 return self._makewantframeresult()
1250 return self._makewantframeresult()
1245
1251
1246 assert eos
1252 assert eos
1247
1253
1248 decoded = self._protocolsettingsdecoder.getavailable()
1254 decoded = self._protocolsettingsdecoder.getavailable()
1249 self._protocolsettingsdecoder = None
1255 self._protocolsettingsdecoder = None
1250
1256
1251 if not decoded:
1257 if not decoded:
1252 self._state = 'errored'
1258 self._state = 'errored'
1253 return self._makeerrorresult(
1259 return self._makeerrorresult(
1254 _('sender protocol settings frame did not contain CBOR data'))
1260 _('sender protocol settings frame did not contain CBOR data'))
1255 elif len(decoded) > 1:
1261 elif len(decoded) > 1:
1256 self._state = 'errored'
1262 self._state = 'errored'
1257 return self._makeerrorresult(
1263 return self._makeerrorresult(
1258 _('sender protocol settings frame contained multiple CBOR '
1264 _('sender protocol settings frame contained multiple CBOR '
1259 'values'))
1265 'values'))
1260
1266
1261 d = decoded[0]
1267 d = decoded[0]
1262
1268
1263 if b'contentencodings' in d:
1269 if b'contentencodings' in d:
1264 self._sendersettings['contentencodings'] = d[b'contentencodings']
1270 self._sendersettings['contentencodings'] = d[b'contentencodings']
1265
1271
1266 self._state = 'idle'
1272 self._state = 'idle'
1267
1273
1268 return self._makewantframeresult()
1274 return self._makewantframeresult()
1269
1275
1270 def _onframeidle(self, frame):
1276 def _onframeidle(self, frame):
1271 # The only frame type that should be received in this state is a
1277 # The only frame type that should be received in this state is a
1272 # command request.
1278 # command request.
1273 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1279 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1274 self._state = 'errored'
1280 self._state = 'errored'
1275 return self._makeerrorresult(
1281 return self._makeerrorresult(
1276 _('expected command request frame; got %d') % frame.typeid)
1282 _('expected command request frame; got %d') % frame.typeid)
1277
1283
1278 res = self._validatecommandrequestframe(frame)
1284 res = self._validatecommandrequestframe(frame)
1279 if res:
1285 if res:
1280 return res
1286 return res
1281
1287
1282 if frame.requestid in self._receivingcommands:
1288 if frame.requestid in self._receivingcommands:
1283 self._state = 'errored'
1289 self._state = 'errored'
1284 return self._makeerrorresult(
1290 return self._makeerrorresult(
1285 _('request with ID %d already received') % frame.requestid)
1291 _('request with ID %d already received') % frame.requestid)
1286
1292
1287 if frame.requestid in self._activecommands:
1293 if frame.requestid in self._activecommands:
1288 self._state = 'errored'
1294 self._state = 'errored'
1289 return self._makeerrorresult(
1295 return self._makeerrorresult(
1290 _('request with ID %d is already active') % frame.requestid)
1296 _('request with ID %d is already active') % frame.requestid)
1291
1297
1292 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1298 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1293 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1299 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1294 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1300 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1295
1301
1296 if not new:
1302 if not new:
1297 self._state = 'errored'
1303 self._state = 'errored'
1298 return self._makeerrorresult(
1304 return self._makeerrorresult(
1299 _('received command request frame without new flag set'))
1305 _('received command request frame without new flag set'))
1300
1306
1301 payload = util.bytesio()
1307 payload = util.bytesio()
1302 payload.write(frame.payload)
1308 payload.write(frame.payload)
1303
1309
1304 self._receivingcommands[frame.requestid] = {
1310 self._receivingcommands[frame.requestid] = {
1305 'payload': payload,
1311 'payload': payload,
1306 'data': None,
1312 'data': None,
1307 'requestdone': not moreframes,
1313 'requestdone': not moreframes,
1308 'expectingdata': bool(expectingdata),
1314 'expectingdata': bool(expectingdata),
1309 }
1315 }
1310
1316
1311 # This is the final frame for this request. Dispatch it.
1317 # This is the final frame for this request. Dispatch it.
1312 if not moreframes and not expectingdata:
1318 if not moreframes and not expectingdata:
1313 return self._makeruncommandresult(frame.requestid)
1319 return self._makeruncommandresult(frame.requestid)
1314
1320
1315 assert moreframes or expectingdata
1321 assert moreframes or expectingdata
1316 self._state = 'command-receiving'
1322 self._state = 'command-receiving'
1317 return self._makewantframeresult()
1323 return self._makewantframeresult()
1318
1324
1319 def _onframecommandreceiving(self, frame):
1325 def _onframecommandreceiving(self, frame):
1320 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1326 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1321 # Process new command requests as such.
1327 # Process new command requests as such.
1322 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1328 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1323 return self._onframeidle(frame)
1329 return self._onframeidle(frame)
1324
1330
1325 res = self._validatecommandrequestframe(frame)
1331 res = self._validatecommandrequestframe(frame)
1326 if res:
1332 if res:
1327 return res
1333 return res
1328
1334
1329 # All other frames should be related to a command that is currently
1335 # All other frames should be related to a command that is currently
1330 # receiving but is not active.
1336 # receiving but is not active.
1331 if frame.requestid in self._activecommands:
1337 if frame.requestid in self._activecommands:
1332 self._state = 'errored'
1338 self._state = 'errored'
1333 return self._makeerrorresult(
1339 return self._makeerrorresult(
1334 _('received frame for request that is still active: %d') %
1340 _('received frame for request that is still active: %d') %
1335 frame.requestid)
1341 frame.requestid)
1336
1342
1337 if frame.requestid not in self._receivingcommands:
1343 if frame.requestid not in self._receivingcommands:
1338 self._state = 'errored'
1344 self._state = 'errored'
1339 return self._makeerrorresult(
1345 return self._makeerrorresult(
1340 _('received frame for request that is not receiving: %d') %
1346 _('received frame for request that is not receiving: %d') %
1341 frame.requestid)
1347 frame.requestid)
1342
1348
1343 entry = self._receivingcommands[frame.requestid]
1349 entry = self._receivingcommands[frame.requestid]
1344
1350
1345 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1351 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1346 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1352 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1347 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1353 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1348
1354
1349 if entry['requestdone']:
1355 if entry['requestdone']:
1350 self._state = 'errored'
1356 self._state = 'errored'
1351 return self._makeerrorresult(
1357 return self._makeerrorresult(
1352 _('received command request frame when request frames '
1358 _('received command request frame when request frames '
1353 'were supposedly done'))
1359 'were supposedly done'))
1354
1360
1355 if expectingdata != entry['expectingdata']:
1361 if expectingdata != entry['expectingdata']:
1356 self._state = 'errored'
1362 self._state = 'errored'
1357 return self._makeerrorresult(
1363 return self._makeerrorresult(
1358 _('mismatch between expect data flag and previous frame'))
1364 _('mismatch between expect data flag and previous frame'))
1359
1365
1360 entry['payload'].write(frame.payload)
1366 entry['payload'].write(frame.payload)
1361
1367
1362 if not moreframes:
1368 if not moreframes:
1363 entry['requestdone'] = True
1369 entry['requestdone'] = True
1364
1370
1365 if not moreframes and not expectingdata:
1371 if not moreframes and not expectingdata:
1366 return self._makeruncommandresult(frame.requestid)
1372 return self._makeruncommandresult(frame.requestid)
1367
1373
1368 return self._makewantframeresult()
1374 return self._makewantframeresult()
1369
1375
1370 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1376 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1371 if not entry['expectingdata']:
1377 if not entry['expectingdata']:
1372 self._state = 'errored'
1378 self._state = 'errored'
1373 return self._makeerrorresult(_(
1379 return self._makeerrorresult(_(
1374 'received command data frame for request that is not '
1380 'received command data frame for request that is not '
1375 'expecting data: %d') % frame.requestid)
1381 'expecting data: %d') % frame.requestid)
1376
1382
1377 if entry['data'] is None:
1383 if entry['data'] is None:
1378 entry['data'] = util.bytesio()
1384 entry['data'] = util.bytesio()
1379
1385
1380 return self._handlecommanddataframe(frame, entry)
1386 return self._handlecommanddataframe(frame, entry)
1381 else:
1387 else:
1382 self._state = 'errored'
1388 self._state = 'errored'
1383 return self._makeerrorresult(_(
1389 return self._makeerrorresult(_(
1384 'received unexpected frame type: %d') % frame.typeid)
1390 'received unexpected frame type: %d') % frame.typeid)
1385
1391
1386 def _handlecommanddataframe(self, frame, entry):
1392 def _handlecommanddataframe(self, frame, entry):
1387 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1393 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1388
1394
1389 # TODO support streaming data instead of buffering it.
1395 # TODO support streaming data instead of buffering it.
1390 entry['data'].write(frame.payload)
1396 entry['data'].write(frame.payload)
1391
1397
1392 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1398 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1393 return self._makewantframeresult()
1399 return self._makewantframeresult()
1394 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1400 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1395 entry['data'].seek(0)
1401 entry['data'].seek(0)
1396 return self._makeruncommandresult(frame.requestid)
1402 return self._makeruncommandresult(frame.requestid)
1397 else:
1403 else:
1398 self._state = 'errored'
1404 self._state = 'errored'
1399 return self._makeerrorresult(_('command data frame without '
1405 return self._makeerrorresult(_('command data frame without '
1400 'flags'))
1406 'flags'))
1401
1407
1402 def _onframeerrored(self, frame):
1408 def _onframeerrored(self, frame):
1403 return self._makeerrorresult(_('server already errored'))
1409 return self._makeerrorresult(_('server already errored'))
1404
1410
1405 class commandrequest(object):
1411 class commandrequest(object):
1406 """Represents a request to run a command."""
1412 """Represents a request to run a command."""
1407
1413
1408 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1414 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1409 self.requestid = requestid
1415 self.requestid = requestid
1410 self.name = name
1416 self.name = name
1411 self.args = args
1417 self.args = args
1412 self.datafh = datafh
1418 self.datafh = datafh
1413 self.redirect = redirect
1419 self.redirect = redirect
1414 self.state = 'pending'
1420 self.state = 'pending'
1415
1421
1416 class clientreactor(object):
1422 class clientreactor(object):
1417 """Holds state of a client issuing frame-based protocol requests.
1423 """Holds state of a client issuing frame-based protocol requests.
1418
1424
1419 This is like ``serverreactor`` but for client-side state.
1425 This is like ``serverreactor`` but for client-side state.
1420
1426
1421 Each instance is bound to the lifetime of a connection. For persistent
1427 Each instance is bound to the lifetime of a connection. For persistent
1422 connection transports using e.g. TCP sockets and speaking the raw
1428 connection transports using e.g. TCP sockets and speaking the raw
1423 framing protocol, there will be a single instance for the lifetime of
1429 framing protocol, there will be a single instance for the lifetime of
1424 the TCP socket. For transports where there are multiple discrete
1430 the TCP socket. For transports where there are multiple discrete
1425 interactions (say tunneled within in HTTP request), there will be a
1431 interactions (say tunneled within in HTTP request), there will be a
1426 separate instance for each distinct interaction.
1432 separate instance for each distinct interaction.
1427
1433
1428 Consumers are expected to tell instances when events occur by calling
1434 Consumers are expected to tell instances when events occur by calling
1429 various methods. These methods return a 2-tuple describing any follow-up
1435 various methods. These methods return a 2-tuple describing any follow-up
1430 action(s) to take. The first element is the name of an action to
1436 action(s) to take. The first element is the name of an action to
1431 perform. The second is a data structure (usually a dict) specific to
1437 perform. The second is a data structure (usually a dict) specific to
1432 that action that contains more information. e.g. if the reactor wants
1438 that action that contains more information. e.g. if the reactor wants
1433 to send frames to the server, the data structure will contain a reference
1439 to send frames to the server, the data structure will contain a reference
1434 to those frames.
1440 to those frames.
1435
1441
1436 Valid actions that consumers can be instructed to take are:
1442 Valid actions that consumers can be instructed to take are:
1437
1443
1438 noop
1444 noop
1439 Indicates no additional action is required.
1445 Indicates no additional action is required.
1440
1446
1441 sendframes
1447 sendframes
1442 Indicates that frames should be sent to the server. The ``framegen``
1448 Indicates that frames should be sent to the server. The ``framegen``
1443 key contains a generator of frames that should be sent. The reactor
1449 key contains a generator of frames that should be sent. The reactor
1444 assumes that all frames in this generator are sent to the server.
1450 assumes that all frames in this generator are sent to the server.
1445
1451
1446 error
1452 error
1447 Indicates that an error occurred. The ``message`` key contains an
1453 Indicates that an error occurred. The ``message`` key contains an
1448 error message describing the failure.
1454 error message describing the failure.
1449
1455
1450 responsedata
1456 responsedata
1451 Indicates a response to a previously-issued command was received.
1457 Indicates a response to a previously-issued command was received.
1452
1458
1453 The ``request`` key contains the ``commandrequest`` instance that
1459 The ``request`` key contains the ``commandrequest`` instance that
1454 represents the request this data is for.
1460 represents the request this data is for.
1455
1461
1456 The ``data`` key contains the decoded data from the server.
1462 The ``data`` key contains the decoded data from the server.
1457
1463
1458 ``expectmore`` and ``eos`` evaluate to True when more response data
1464 ``expectmore`` and ``eos`` evaluate to True when more response data
1459 is expected to follow or we're at the end of the response stream,
1465 is expected to follow or we're at the end of the response stream,
1460 respectively.
1466 respectively.
1461 """
1467 """
1462 def __init__(self, ui, hasmultiplesend=False, buffersends=True,
1468 def __init__(self, ui, hasmultiplesend=False, buffersends=True,
1463 clientcontentencoders=None):
1469 clientcontentencoders=None):
1464 """Create a new instance.
1470 """Create a new instance.
1465
1471
1466 ``hasmultiplesend`` indicates whether multiple sends are supported
1472 ``hasmultiplesend`` indicates whether multiple sends are supported
1467 by the transport. When True, it is possible to send commands immediately
1473 by the transport. When True, it is possible to send commands immediately
1468 instead of buffering until the caller signals an intent to finish a
1474 instead of buffering until the caller signals an intent to finish a
1469 send operation.
1475 send operation.
1470
1476
1471 ``buffercommands`` indicates whether sends should be buffered until the
1477 ``buffercommands`` indicates whether sends should be buffered until the
1472 last request has been issued.
1478 last request has been issued.
1473
1479
1474 ``clientcontentencoders`` is an iterable of content encoders the client
1480 ``clientcontentencoders`` is an iterable of content encoders the client
1475 will advertise to the server and that the server can use for encoding
1481 will advertise to the server and that the server can use for encoding
1476 data. If not defined, the client will not advertise content encoders
1482 data. If not defined, the client will not advertise content encoders
1477 to the server.
1483 to the server.
1478 """
1484 """
1479 self._ui = ui
1485 self._ui = ui
1480 self._hasmultiplesend = hasmultiplesend
1486 self._hasmultiplesend = hasmultiplesend
1481 self._buffersends = buffersends
1487 self._buffersends = buffersends
1482 self._clientcontentencoders = clientcontentencoders
1488 self._clientcontentencoders = clientcontentencoders
1483
1489
1484 self._canissuecommands = True
1490 self._canissuecommands = True
1485 self._cansend = True
1491 self._cansend = True
1486 self._protocolsettingssent = False
1492 self._protocolsettingssent = False
1487
1493
1488 self._nextrequestid = 1
1494 self._nextrequestid = 1
1489 # We only support a single outgoing stream for now.
1495 # We only support a single outgoing stream for now.
1490 self._outgoingstream = outputstream(1)
1496 self._outgoingstream = outputstream(1)
1491 self._pendingrequests = collections.deque()
1497 self._pendingrequests = collections.deque()
1492 self._activerequests = {}
1498 self._activerequests = {}
1493 self._incomingstreams = {}
1499 self._incomingstreams = {}
1494 self._streamsettingsdecoders = {}
1500 self._streamsettingsdecoders = {}
1495
1501
1496 populatestreamencoders()
1502 populatestreamencoders()
1497
1503
1498 def callcommand(self, name, args, datafh=None, redirect=None):
1504 def callcommand(self, name, args, datafh=None, redirect=None):
1499 """Request that a command be executed.
1505 """Request that a command be executed.
1500
1506
1501 Receives the command name, a dict of arguments to pass to the command,
1507 Receives the command name, a dict of arguments to pass to the command,
1502 and an optional file object containing the raw data for the command.
1508 and an optional file object containing the raw data for the command.
1503
1509
1504 Returns a 3-tuple of (request, action, action data).
1510 Returns a 3-tuple of (request, action, action data).
1505 """
1511 """
1506 if not self._canissuecommands:
1512 if not self._canissuecommands:
1507 raise error.ProgrammingError('cannot issue new commands')
1513 raise error.ProgrammingError('cannot issue new commands')
1508
1514
1509 requestid = self._nextrequestid
1515 requestid = self._nextrequestid
1510 self._nextrequestid += 2
1516 self._nextrequestid += 2
1511
1517
1512 request = commandrequest(requestid, name, args, datafh=datafh,
1518 request = commandrequest(requestid, name, args, datafh=datafh,
1513 redirect=redirect)
1519 redirect=redirect)
1514
1520
1515 if self._buffersends:
1521 if self._buffersends:
1516 self._pendingrequests.append(request)
1522 self._pendingrequests.append(request)
1517 return request, 'noop', {}
1523 return request, 'noop', {}
1518 else:
1524 else:
1519 if not self._cansend:
1525 if not self._cansend:
1520 raise error.ProgrammingError('sends cannot be performed on '
1526 raise error.ProgrammingError('sends cannot be performed on '
1521 'this instance')
1527 'this instance')
1522
1528
1523 if not self._hasmultiplesend:
1529 if not self._hasmultiplesend:
1524 self._cansend = False
1530 self._cansend = False
1525 self._canissuecommands = False
1531 self._canissuecommands = False
1526
1532
1527 return request, 'sendframes', {
1533 return request, 'sendframes', {
1528 'framegen': self._makecommandframes(request),
1534 'framegen': self._makecommandframes(request),
1529 }
1535 }
1530
1536
1531 def flushcommands(self):
1537 def flushcommands(self):
1532 """Request that all queued commands be sent.
1538 """Request that all queued commands be sent.
1533
1539
1534 If any commands are buffered, this will instruct the caller to send
1540 If any commands are buffered, this will instruct the caller to send
1535 them over the wire. If no commands are buffered it instructs the client
1541 them over the wire. If no commands are buffered it instructs the client
1536 to no-op.
1542 to no-op.
1537
1543
1538 If instances aren't configured for multiple sends, no new command
1544 If instances aren't configured for multiple sends, no new command
1539 requests are allowed after this is called.
1545 requests are allowed after this is called.
1540 """
1546 """
1541 if not self._pendingrequests:
1547 if not self._pendingrequests:
1542 return 'noop', {}
1548 return 'noop', {}
1543
1549
1544 if not self._cansend:
1550 if not self._cansend:
1545 raise error.ProgrammingError('sends cannot be performed on this '
1551 raise error.ProgrammingError('sends cannot be performed on this '
1546 'instance')
1552 'instance')
1547
1553
1548 # If the instance only allows sending once, mark that we have fired
1554 # If the instance only allows sending once, mark that we have fired
1549 # our one shot.
1555 # our one shot.
1550 if not self._hasmultiplesend:
1556 if not self._hasmultiplesend:
1551 self._canissuecommands = False
1557 self._canissuecommands = False
1552 self._cansend = False
1558 self._cansend = False
1553
1559
1554 def makeframes():
1560 def makeframes():
1555 while self._pendingrequests:
1561 while self._pendingrequests:
1556 request = self._pendingrequests.popleft()
1562 request = self._pendingrequests.popleft()
1557 for frame in self._makecommandframes(request):
1563 for frame in self._makecommandframes(request):
1558 yield frame
1564 yield frame
1559
1565
1560 return 'sendframes', {
1566 return 'sendframes', {
1561 'framegen': makeframes(),
1567 'framegen': makeframes(),
1562 }
1568 }
1563
1569
1564 def _makecommandframes(self, request):
1570 def _makecommandframes(self, request):
1565 """Emit frames to issue a command request.
1571 """Emit frames to issue a command request.
1566
1572
1567 As a side-effect, update request accounting to reflect its changed
1573 As a side-effect, update request accounting to reflect its changed
1568 state.
1574 state.
1569 """
1575 """
1570 self._activerequests[request.requestid] = request
1576 self._activerequests[request.requestid] = request
1571 request.state = 'sending'
1577 request.state = 'sending'
1572
1578
1573 if not self._protocolsettingssent and self._clientcontentencoders:
1579 if not self._protocolsettingssent and self._clientcontentencoders:
1574 self._protocolsettingssent = True
1580 self._protocolsettingssent = True
1575
1581
1576 payload = b''.join(cborutil.streamencode({
1582 payload = b''.join(cborutil.streamencode({
1577 b'contentencodings': self._clientcontentencoders,
1583 b'contentencodings': self._clientcontentencoders,
1578 }))
1584 }))
1579
1585
1580 yield self._outgoingstream.makeframe(
1586 yield self._outgoingstream.makeframe(
1581 requestid=request.requestid,
1587 requestid=request.requestid,
1582 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1588 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1583 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1589 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1584 payload=payload)
1590 payload=payload)
1585
1591
1586 res = createcommandframes(self._outgoingstream,
1592 res = createcommandframes(self._outgoingstream,
1587 request.requestid,
1593 request.requestid,
1588 request.name,
1594 request.name,
1589 request.args,
1595 request.args,
1590 datafh=request.datafh,
1596 datafh=request.datafh,
1591 redirect=request.redirect)
1597 redirect=request.redirect)
1592
1598
1593 for frame in res:
1599 for frame in res:
1594 yield frame
1600 yield frame
1595
1601
1596 request.state = 'sent'
1602 request.state = 'sent'
1597
1603
1598 def onframerecv(self, frame):
1604 def onframerecv(self, frame):
1599 """Process a frame that has been received off the wire.
1605 """Process a frame that has been received off the wire.
1600
1606
1601 Returns a 2-tuple of (action, meta) describing further action the
1607 Returns a 2-tuple of (action, meta) describing further action the
1602 caller needs to take as a result of receiving this frame.
1608 caller needs to take as a result of receiving this frame.
1603 """
1609 """
1604 if frame.streamid % 2:
1610 if frame.streamid % 2:
1605 return 'error', {
1611 return 'error', {
1606 'message': (
1612 'message': (
1607 _('received frame with odd numbered stream ID: %d') %
1613 _('received frame with odd numbered stream ID: %d') %
1608 frame.streamid),
1614 frame.streamid),
1609 }
1615 }
1610
1616
1611 if frame.streamid not in self._incomingstreams:
1617 if frame.streamid not in self._incomingstreams:
1612 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1618 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1613 return 'error', {
1619 return 'error', {
1614 'message': _('received frame on unknown stream '
1620 'message': _('received frame on unknown stream '
1615 'without beginning of stream flag set'),
1621 'without beginning of stream flag set'),
1616 }
1622 }
1617
1623
1618 self._incomingstreams[frame.streamid] = inputstream(
1624 self._incomingstreams[frame.streamid] = inputstream(
1619 frame.streamid)
1625 frame.streamid)
1620
1626
1621 stream = self._incomingstreams[frame.streamid]
1627 stream = self._incomingstreams[frame.streamid]
1622
1628
1623 # If the payload is encoded, ask the stream to decode it. We
1629 # If the payload is encoded, ask the stream to decode it. We
1624 # merely substitute the decoded result into the frame payload as
1630 # merely substitute the decoded result into the frame payload as
1625 # if it had been transferred all along.
1631 # if it had been transferred all along.
1626 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1632 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1627 frame.payload = stream.decode(frame.payload)
1633 frame.payload = stream.decode(frame.payload)
1628
1634
1629 if frame.streamflags & STREAM_FLAG_END_STREAM:
1635 if frame.streamflags & STREAM_FLAG_END_STREAM:
1630 del self._incomingstreams[frame.streamid]
1636 del self._incomingstreams[frame.streamid]
1631
1637
1632 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1638 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1633 return self._onstreamsettingsframe(frame)
1639 return self._onstreamsettingsframe(frame)
1634
1640
1635 if frame.requestid not in self._activerequests:
1641 if frame.requestid not in self._activerequests:
1636 return 'error', {
1642 return 'error', {
1637 'message': (_('received frame for inactive request ID: %d') %
1643 'message': (_('received frame for inactive request ID: %d') %
1638 frame.requestid),
1644 frame.requestid),
1639 }
1645 }
1640
1646
1641 request = self._activerequests[frame.requestid]
1647 request = self._activerequests[frame.requestid]
1642 request.state = 'receiving'
1648 request.state = 'receiving'
1643
1649
1644 handlers = {
1650 handlers = {
1645 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1651 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1646 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1652 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1647 }
1653 }
1648
1654
1649 meth = handlers.get(frame.typeid)
1655 meth = handlers.get(frame.typeid)
1650 if not meth:
1656 if not meth:
1651 raise error.ProgrammingError('unhandled frame type: %d' %
1657 raise error.ProgrammingError('unhandled frame type: %d' %
1652 frame.typeid)
1658 frame.typeid)
1653
1659
1654 return meth(request, frame)
1660 return meth(request, frame)
1655
1661
1656 def _onstreamsettingsframe(self, frame):
1662 def _onstreamsettingsframe(self, frame):
1657 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1663 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1658
1664
1659 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1665 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1660 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1666 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1661
1667
1662 if more and eos:
1668 if more and eos:
1663 return 'error', {
1669 return 'error', {
1664 'message': (_('stream encoding settings frame cannot have both '
1670 'message': (_('stream encoding settings frame cannot have both '
1665 'continuation and end of stream flags set')),
1671 'continuation and end of stream flags set')),
1666 }
1672 }
1667
1673
1668 if not more and not eos:
1674 if not more and not eos:
1669 return 'error', {
1675 return 'error', {
1670 'message': _('stream encoding settings frame must have '
1676 'message': _('stream encoding settings frame must have '
1671 'continuation or end of stream flag set'),
1677 'continuation or end of stream flag set'),
1672 }
1678 }
1673
1679
1674 if frame.streamid not in self._streamsettingsdecoders:
1680 if frame.streamid not in self._streamsettingsdecoders:
1675 decoder = cborutil.bufferingdecoder()
1681 decoder = cborutil.bufferingdecoder()
1676 self._streamsettingsdecoders[frame.streamid] = decoder
1682 self._streamsettingsdecoders[frame.streamid] = decoder
1677
1683
1678 decoder = self._streamsettingsdecoders[frame.streamid]
1684 decoder = self._streamsettingsdecoders[frame.streamid]
1679
1685
1680 try:
1686 try:
1681 decoder.decode(frame.payload)
1687 decoder.decode(frame.payload)
1682 except Exception as e:
1688 except Exception as e:
1683 return 'error', {
1689 return 'error', {
1684 'message': (_('error decoding CBOR from stream encoding '
1690 'message': (_('error decoding CBOR from stream encoding '
1685 'settings frame: %s') %
1691 'settings frame: %s') %
1686 stringutil.forcebytestr(e)),
1692 stringutil.forcebytestr(e)),
1687 }
1693 }
1688
1694
1689 if more:
1695 if more:
1690 return 'noop', {}
1696 return 'noop', {}
1691
1697
1692 assert eos
1698 assert eos
1693
1699
1694 decoded = decoder.getavailable()
1700 decoded = decoder.getavailable()
1695 del self._streamsettingsdecoders[frame.streamid]
1701 del self._streamsettingsdecoders[frame.streamid]
1696
1702
1697 if not decoded:
1703 if not decoded:
1698 return 'error', {
1704 return 'error', {
1699 'message': _('stream encoding settings frame did not contain '
1705 'message': _('stream encoding settings frame did not contain '
1700 'CBOR data'),
1706 'CBOR data'),
1701 }
1707 }
1702
1708
1703 try:
1709 try:
1704 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1710 self._incomingstreams[frame.streamid].setdecoder(self._ui,
1705 decoded[0],
1711 decoded[0],
1706 decoded[1:])
1712 decoded[1:])
1707 except Exception as e:
1713 except Exception as e:
1708 return 'error', {
1714 return 'error', {
1709 'message': (_('error setting stream decoder: %s') %
1715 'message': (_('error setting stream decoder: %s') %
1710 stringutil.forcebytestr(e)),
1716 stringutil.forcebytestr(e)),
1711 }
1717 }
1712
1718
1713 return 'noop', {}
1719 return 'noop', {}
1714
1720
1715 def _oncommandresponseframe(self, request, frame):
1721 def _oncommandresponseframe(self, request, frame):
1716 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1722 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1717 request.state = 'received'
1723 request.state = 'received'
1718 del self._activerequests[request.requestid]
1724 del self._activerequests[request.requestid]
1719
1725
1720 return 'responsedata', {
1726 return 'responsedata', {
1721 'request': request,
1727 'request': request,
1722 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1728 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1723 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1729 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1724 'data': frame.payload,
1730 'data': frame.payload,
1725 }
1731 }
1726
1732
1727 def _onerrorresponseframe(self, request, frame):
1733 def _onerrorresponseframe(self, request, frame):
1728 request.state = 'errored'
1734 request.state = 'errored'
1729 del self._activerequests[request.requestid]
1735 del self._activerequests[request.requestid]
1730
1736
1731 # The payload should be a CBOR map.
1737 # The payload should be a CBOR map.
1732 m = cborutil.decodeall(frame.payload)[0]
1738 m = cborutil.decodeall(frame.payload)[0]
1733
1739
1734 return 'error', {
1740 return 'error', {
1735 'request': request,
1741 'request': request,
1736 'type': m['type'],
1742 'type': m['type'],
1737 'message': m['message'],
1743 'message': m['message'],
1738 }
1744 }
@@ -1,629 +1,628 b''
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 import unittest
3 import unittest
4
4
5 from mercurial.thirdparty import (
5 from mercurial.thirdparty import (
6 cbor,
6 cbor,
7 )
7 )
8 from mercurial import (
8 from mercurial import (
9 ui as uimod,
9 ui as uimod,
10 util,
10 util,
11 wireprotoframing as framing,
11 wireprotoframing as framing,
12 )
12 )
13 from mercurial.utils import (
13 from mercurial.utils import (
14 cborutil,
14 cborutil,
15 )
15 )
16
16
17 ffs = framing.makeframefromhumanstring
17 ffs = framing.makeframefromhumanstring
18
18
19 OK = cbor.dumps({b'status': b'ok'})
19 OK = cbor.dumps({b'status': b'ok'})
20
20
21 def makereactor(deferoutput=False):
21 def makereactor(deferoutput=False):
22 ui = uimod.ui()
22 ui = uimod.ui()
23 return framing.serverreactor(ui, deferoutput=deferoutput)
23 return framing.serverreactor(ui, deferoutput=deferoutput)
24
24
25 def sendframes(reactor, gen):
25 def sendframes(reactor, gen):
26 """Send a generator of frame bytearray to a reactor.
26 """Send a generator of frame bytearray to a reactor.
27
27
28 Emits a generator of results from ``onframerecv()`` calls.
28 Emits a generator of results from ``onframerecv()`` calls.
29 """
29 """
30 for frame in gen:
30 for frame in gen:
31 header = framing.parseheader(frame)
31 header = framing.parseheader(frame)
32 payload = frame[framing.FRAME_HEADER_SIZE:]
32 payload = frame[framing.FRAME_HEADER_SIZE:]
33 assert len(payload) == header.length
33 assert len(payload) == header.length
34
34
35 yield reactor.onframerecv(framing.frame(header.requestid,
35 yield reactor.onframerecv(framing.frame(header.requestid,
36 header.streamid,
36 header.streamid,
37 header.streamflags,
37 header.streamflags,
38 header.typeid,
38 header.typeid,
39 header.flags,
39 header.flags,
40 payload))
40 payload))
41
41
42 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
42 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
43 """Generate frames to run a command and send them to a reactor."""
43 """Generate frames to run a command and send them to a reactor."""
44 return sendframes(reactor,
44 return sendframes(reactor,
45 framing.createcommandframes(stream, rid, cmd, args,
45 framing.createcommandframes(stream, rid, cmd, args,
46 datafh))
46 datafh))
47
47
48
48
49 class ServerReactorTests(unittest.TestCase):
49 class ServerReactorTests(unittest.TestCase):
50 def _sendsingleframe(self, reactor, f):
50 def _sendsingleframe(self, reactor, f):
51 results = list(sendframes(reactor, [f]))
51 results = list(sendframes(reactor, [f]))
52 self.assertEqual(len(results), 1)
52 self.assertEqual(len(results), 1)
53
53
54 return results[0]
54 return results[0]
55
55
56 def assertaction(self, res, expected):
56 def assertaction(self, res, expected):
57 self.assertIsInstance(res, tuple)
57 self.assertIsInstance(res, tuple)
58 self.assertEqual(len(res), 2)
58 self.assertEqual(len(res), 2)
59 self.assertIsInstance(res[1], dict)
59 self.assertIsInstance(res[1], dict)
60 self.assertEqual(res[0], expected)
60 self.assertEqual(res[0], expected)
61
61
62 def assertframesequal(self, frames, framestrings):
62 def assertframesequal(self, frames, framestrings):
63 expected = [ffs(s) for s in framestrings]
63 expected = [ffs(s) for s in framestrings]
64 self.assertEqual(list(frames), expected)
64 self.assertEqual(list(frames), expected)
65
65
66 def test1framecommand(self):
66 def test1framecommand(self):
67 """Receiving a command in a single frame yields request to run it."""
67 """Receiving a command in a single frame yields request to run it."""
68 reactor = makereactor()
68 reactor = makereactor()
69 stream = framing.stream(1)
69 stream = framing.stream(1)
70 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
70 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
71 self.assertEqual(len(results), 1)
71 self.assertEqual(len(results), 1)
72 self.assertaction(results[0], b'runcommand')
72 self.assertaction(results[0], b'runcommand')
73 self.assertEqual(results[0][1], {
73 self.assertEqual(results[0][1], {
74 b'requestid': 1,
74 b'requestid': 1,
75 b'command': b'mycommand',
75 b'command': b'mycommand',
76 b'args': {},
76 b'args': {},
77 b'redirect': None,
77 b'redirect': None,
78 b'data': None,
78 b'data': None,
79 })
79 })
80
80
81 result = reactor.oninputeof()
81 result = reactor.oninputeof()
82 self.assertaction(result, b'noop')
82 self.assertaction(result, b'noop')
83
83
84 def test1argument(self):
84 def test1argument(self):
85 reactor = makereactor()
85 reactor = makereactor()
86 stream = framing.stream(1)
86 stream = framing.stream(1)
87 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
87 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
88 {b'foo': b'bar'}))
88 {b'foo': b'bar'}))
89 self.assertEqual(len(results), 1)
89 self.assertEqual(len(results), 1)
90 self.assertaction(results[0], b'runcommand')
90 self.assertaction(results[0], b'runcommand')
91 self.assertEqual(results[0][1], {
91 self.assertEqual(results[0][1], {
92 b'requestid': 41,
92 b'requestid': 41,
93 b'command': b'mycommand',
93 b'command': b'mycommand',
94 b'args': {b'foo': b'bar'},
94 b'args': {b'foo': b'bar'},
95 b'redirect': None,
95 b'redirect': None,
96 b'data': None,
96 b'data': None,
97 })
97 })
98
98
99 def testmultiarguments(self):
99 def testmultiarguments(self):
100 reactor = makereactor()
100 reactor = makereactor()
101 stream = framing.stream(1)
101 stream = framing.stream(1)
102 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
102 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
103 {b'foo': b'bar', b'biz': b'baz'}))
103 {b'foo': b'bar', b'biz': b'baz'}))
104 self.assertEqual(len(results), 1)
104 self.assertEqual(len(results), 1)
105 self.assertaction(results[0], b'runcommand')
105 self.assertaction(results[0], b'runcommand')
106 self.assertEqual(results[0][1], {
106 self.assertEqual(results[0][1], {
107 b'requestid': 1,
107 b'requestid': 1,
108 b'command': b'mycommand',
108 b'command': b'mycommand',
109 b'args': {b'foo': b'bar', b'biz': b'baz'},
109 b'args': {b'foo': b'bar', b'biz': b'baz'},
110 b'redirect': None,
110 b'redirect': None,
111 b'data': None,
111 b'data': None,
112 })
112 })
113
113
114 def testsimplecommanddata(self):
114 def testsimplecommanddata(self):
115 reactor = makereactor()
115 reactor = makereactor()
116 stream = framing.stream(1)
116 stream = framing.stream(1)
117 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
117 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
118 util.bytesio(b'data!')))
118 util.bytesio(b'data!')))
119 self.assertEqual(len(results), 2)
119 self.assertEqual(len(results), 2)
120 self.assertaction(results[0], b'wantframe')
120 self.assertaction(results[0], b'wantframe')
121 self.assertaction(results[1], b'runcommand')
121 self.assertaction(results[1], b'runcommand')
122 self.assertEqual(results[1][1], {
122 self.assertEqual(results[1][1], {
123 b'requestid': 1,
123 b'requestid': 1,
124 b'command': b'mycommand',
124 b'command': b'mycommand',
125 b'args': {},
125 b'args': {},
126 b'redirect': None,
126 b'redirect': None,
127 b'data': b'data!',
127 b'data': b'data!',
128 })
128 })
129
129
130 def testmultipledataframes(self):
130 def testmultipledataframes(self):
131 frames = [
131 frames = [
132 ffs(b'1 1 stream-begin command-request new|have-data '
132 ffs(b'1 1 stream-begin command-request new|have-data '
133 b"cbor:{b'name': b'mycommand'}"),
133 b"cbor:{b'name': b'mycommand'}"),
134 ffs(b'1 1 0 command-data continuation data1'),
134 ffs(b'1 1 0 command-data continuation data1'),
135 ffs(b'1 1 0 command-data continuation data2'),
135 ffs(b'1 1 0 command-data continuation data2'),
136 ffs(b'1 1 0 command-data eos data3'),
136 ffs(b'1 1 0 command-data eos data3'),
137 ]
137 ]
138
138
139 reactor = makereactor()
139 reactor = makereactor()
140 results = list(sendframes(reactor, frames))
140 results = list(sendframes(reactor, frames))
141 self.assertEqual(len(results), 4)
141 self.assertEqual(len(results), 4)
142 for i in range(3):
142 for i in range(3):
143 self.assertaction(results[i], b'wantframe')
143 self.assertaction(results[i], b'wantframe')
144 self.assertaction(results[3], b'runcommand')
144 self.assertaction(results[3], b'runcommand')
145 self.assertEqual(results[3][1], {
145 self.assertEqual(results[3][1], {
146 b'requestid': 1,
146 b'requestid': 1,
147 b'command': b'mycommand',
147 b'command': b'mycommand',
148 b'args': {},
148 b'args': {},
149 b'redirect': None,
149 b'redirect': None,
150 b'data': b'data1data2data3',
150 b'data': b'data1data2data3',
151 })
151 })
152
152
153 def testargumentanddata(self):
153 def testargumentanddata(self):
154 frames = [
154 frames = [
155 ffs(b'1 1 stream-begin command-request new|have-data '
155 ffs(b'1 1 stream-begin command-request new|have-data '
156 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
156 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
157 b"b'foo': b'bar'}}"),
157 b"b'foo': b'bar'}}"),
158 ffs(b'1 1 0 command-data continuation value1'),
158 ffs(b'1 1 0 command-data continuation value1'),
159 ffs(b'1 1 0 command-data eos value2'),
159 ffs(b'1 1 0 command-data eos value2'),
160 ]
160 ]
161
161
162 reactor = makereactor()
162 reactor = makereactor()
163 results = list(sendframes(reactor, frames))
163 results = list(sendframes(reactor, frames))
164
164
165 self.assertaction(results[-1], b'runcommand')
165 self.assertaction(results[-1], b'runcommand')
166 self.assertEqual(results[-1][1], {
166 self.assertEqual(results[-1][1], {
167 b'requestid': 1,
167 b'requestid': 1,
168 b'command': b'command',
168 b'command': b'command',
169 b'args': {
169 b'args': {
170 b'key': b'val',
170 b'key': b'val',
171 b'foo': b'bar',
171 b'foo': b'bar',
172 },
172 },
173 b'redirect': None,
173 b'redirect': None,
174 b'data': b'value1value2',
174 b'data': b'value1value2',
175 })
175 })
176
176
177 def testnewandcontinuation(self):
177 def testnewandcontinuation(self):
178 result = self._sendsingleframe(makereactor(),
178 result = self._sendsingleframe(makereactor(),
179 ffs(b'1 1 stream-begin command-request new|continuation '))
179 ffs(b'1 1 stream-begin command-request new|continuation '))
180 self.assertaction(result, b'error')
180 self.assertaction(result, b'error')
181 self.assertEqual(result[1], {
181 self.assertEqual(result[1], {
182 b'message': b'received command request frame with both new and '
182 b'message': b'received command request frame with both new and '
183 b'continuation flags set',
183 b'continuation flags set',
184 })
184 })
185
185
186 def testneithernewnorcontinuation(self):
186 def testneithernewnorcontinuation(self):
187 result = self._sendsingleframe(makereactor(),
187 result = self._sendsingleframe(makereactor(),
188 ffs(b'1 1 stream-begin command-request 0 '))
188 ffs(b'1 1 stream-begin command-request 0 '))
189 self.assertaction(result, b'error')
189 self.assertaction(result, b'error')
190 self.assertEqual(result[1], {
190 self.assertEqual(result[1], {
191 b'message': b'received command request frame with neither new nor '
191 b'message': b'received command request frame with neither new nor '
192 b'continuation flags set',
192 b'continuation flags set',
193 })
193 })
194
194
195 def testunexpectedcommanddata(self):
195 def testunexpectedcommanddata(self):
196 """Command data frame when not running a command is an error."""
196 """Command data frame when not running a command is an error."""
197 result = self._sendsingleframe(makereactor(),
197 result = self._sendsingleframe(makereactor(),
198 ffs(b'1 1 stream-begin command-data 0 ignored'))
198 ffs(b'1 1 stream-begin command-data 0 ignored'))
199 self.assertaction(result, b'error')
199 self.assertaction(result, b'error')
200 self.assertEqual(result[1], {
200 self.assertEqual(result[1], {
201 b'message': b'expected sender protocol settings or command request '
201 b'message': b'expected sender protocol settings or command request '
202 b'frame; got 2',
202 b'frame; got 2',
203 })
203 })
204
204
205 def testunexpectedcommanddatareceiving(self):
205 def testunexpectedcommanddatareceiving(self):
206 """Same as above except the command is receiving."""
206 """Same as above except the command is receiving."""
207 results = list(sendframes(makereactor(), [
207 results = list(sendframes(makereactor(), [
208 ffs(b'1 1 stream-begin command-request new|more '
208 ffs(b'1 1 stream-begin command-request new|more '
209 b"cbor:{b'name': b'ignored'}"),
209 b"cbor:{b'name': b'ignored'}"),
210 ffs(b'1 1 0 command-data eos ignored'),
210 ffs(b'1 1 0 command-data eos ignored'),
211 ]))
211 ]))
212
212
213 self.assertaction(results[0], b'wantframe')
213 self.assertaction(results[0], b'wantframe')
214 self.assertaction(results[1], b'error')
214 self.assertaction(results[1], b'error')
215 self.assertEqual(results[1][1], {
215 self.assertEqual(results[1][1], {
216 b'message': b'received command data frame for request that is not '
216 b'message': b'received command data frame for request that is not '
217 b'expecting data: 1',
217 b'expecting data: 1',
218 })
218 })
219
219
220 def testconflictingrequestidallowed(self):
220 def testconflictingrequestidallowed(self):
221 """Multiple fully serviced commands with same request ID is allowed."""
221 """Multiple fully serviced commands with same request ID is allowed."""
222 reactor = makereactor()
222 reactor = makereactor()
223 results = []
223 results = []
224 outstream = reactor.makeoutputstream()
224 outstream = reactor.makeoutputstream()
225 results.append(self._sendsingleframe(
225 results.append(self._sendsingleframe(
226 reactor, ffs(b'1 1 stream-begin command-request new '
226 reactor, ffs(b'1 1 stream-begin command-request new '
227 b"cbor:{b'name': b'command'}")))
227 b"cbor:{b'name': b'command'}")))
228 result = reactor.oncommandresponsereadyobjects(
228 result = reactor.oncommandresponsereadyobjects(
229 outstream, 1, [b'response1'])
229 outstream, 1, [b'response1'])
230 self.assertaction(result, b'sendframes')
230 self.assertaction(result, b'sendframes')
231 list(result[1][b'framegen'])
231 list(result[1][b'framegen'])
232 results.append(self._sendsingleframe(
232 results.append(self._sendsingleframe(
233 reactor, ffs(b'1 1 stream-begin command-request new '
233 reactor, ffs(b'1 1 stream-begin command-request new '
234 b"cbor:{b'name': b'command'}")))
234 b"cbor:{b'name': b'command'}")))
235 result = reactor.oncommandresponsereadyobjects(
235 result = reactor.oncommandresponsereadyobjects(
236 outstream, 1, [b'response2'])
236 outstream, 1, [b'response2'])
237 self.assertaction(result, b'sendframes')
237 self.assertaction(result, b'sendframes')
238 list(result[1][b'framegen'])
238 list(result[1][b'framegen'])
239 results.append(self._sendsingleframe(
239 results.append(self._sendsingleframe(
240 reactor, ffs(b'1 1 stream-begin command-request new '
240 reactor, ffs(b'1 1 stream-begin command-request new '
241 b"cbor:{b'name': b'command'}")))
241 b"cbor:{b'name': b'command'}")))
242 result = reactor.oncommandresponsereadyobjects(
242 result = reactor.oncommandresponsereadyobjects(
243 outstream, 1, [b'response3'])
243 outstream, 1, [b'response3'])
244 self.assertaction(result, b'sendframes')
244 self.assertaction(result, b'sendframes')
245 list(result[1][b'framegen'])
245 list(result[1][b'framegen'])
246
246
247 for i in range(3):
247 for i in range(3):
248 self.assertaction(results[i], b'runcommand')
248 self.assertaction(results[i], b'runcommand')
249 self.assertEqual(results[i][1], {
249 self.assertEqual(results[i][1], {
250 b'requestid': 1,
250 b'requestid': 1,
251 b'command': b'command',
251 b'command': b'command',
252 b'args': {},
252 b'args': {},
253 b'redirect': None,
253 b'redirect': None,
254 b'data': None,
254 b'data': None,
255 })
255 })
256
256
257 def testconflictingrequestid(self):
257 def testconflictingrequestid(self):
258 """Request ID for new command matching in-flight command is illegal."""
258 """Request ID for new command matching in-flight command is illegal."""
259 results = list(sendframes(makereactor(), [
259 results = list(sendframes(makereactor(), [
260 ffs(b'1 1 stream-begin command-request new|more '
260 ffs(b'1 1 stream-begin command-request new|more '
261 b"cbor:{b'name': b'command'}"),
261 b"cbor:{b'name': b'command'}"),
262 ffs(b'1 1 0 command-request new '
262 ffs(b'1 1 0 command-request new '
263 b"cbor:{b'name': b'command1'}"),
263 b"cbor:{b'name': b'command1'}"),
264 ]))
264 ]))
265
265
266 self.assertaction(results[0], b'wantframe')
266 self.assertaction(results[0], b'wantframe')
267 self.assertaction(results[1], b'error')
267 self.assertaction(results[1], b'error')
268 self.assertEqual(results[1][1], {
268 self.assertEqual(results[1][1], {
269 b'message': b'request with ID 1 already received',
269 b'message': b'request with ID 1 already received',
270 })
270 })
271
271
272 def testinterleavedcommands(self):
272 def testinterleavedcommands(self):
273 cbor1 = cbor.dumps({
273 cbor1 = cbor.dumps({
274 b'name': b'command1',
274 b'name': b'command1',
275 b'args': {
275 b'args': {
276 b'foo': b'bar',
276 b'foo': b'bar',
277 b'key1': b'val',
277 b'key1': b'val',
278 }
278 }
279 }, canonical=True)
279 }, canonical=True)
280 cbor3 = cbor.dumps({
280 cbor3 = cbor.dumps({
281 b'name': b'command3',
281 b'name': b'command3',
282 b'args': {
282 b'args': {
283 b'biz': b'baz',
283 b'biz': b'baz',
284 b'key': b'val',
284 b'key': b'val',
285 },
285 },
286 }, canonical=True)
286 }, canonical=True)
287
287
288 results = list(sendframes(makereactor(), [
288 results = list(sendframes(makereactor(), [
289 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
289 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
290 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
290 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
291 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
291 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
292 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
292 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
293 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
293 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
294 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
294 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
295 ]))
295 ]))
296
296
297 self.assertEqual([t[0] for t in results], [
297 self.assertEqual([t[0] for t in results], [
298 b'wantframe',
298 b'wantframe',
299 b'wantframe',
299 b'wantframe',
300 b'wantframe',
300 b'wantframe',
301 b'wantframe',
301 b'wantframe',
302 b'runcommand',
302 b'runcommand',
303 b'runcommand',
303 b'runcommand',
304 ])
304 ])
305
305
306 self.assertEqual(results[4][1], {
306 self.assertEqual(results[4][1], {
307 b'requestid': 3,
307 b'requestid': 3,
308 b'command': b'command3',
308 b'command': b'command3',
309 b'args': {b'biz': b'baz', b'key': b'val'},
309 b'args': {b'biz': b'baz', b'key': b'val'},
310 b'redirect': None,
310 b'redirect': None,
311 b'data': None,
311 b'data': None,
312 })
312 })
313 self.assertEqual(results[5][1], {
313 self.assertEqual(results[5][1], {
314 b'requestid': 1,
314 b'requestid': 1,
315 b'command': b'command1',
315 b'command': b'command1',
316 b'args': {b'foo': b'bar', b'key1': b'val'},
316 b'args': {b'foo': b'bar', b'key1': b'val'},
317 b'redirect': None,
317 b'redirect': None,
318 b'data': None,
318 b'data': None,
319 })
319 })
320
320
321 def testmissingcommanddataframe(self):
321 def testmissingcommanddataframe(self):
322 # The reactor doesn't currently handle partially received commands.
322 # The reactor doesn't currently handle partially received commands.
323 # So this test is failing to do anything with request 1.
323 # So this test is failing to do anything with request 1.
324 frames = [
324 frames = [
325 ffs(b'1 1 stream-begin command-request new|have-data '
325 ffs(b'1 1 stream-begin command-request new|have-data '
326 b"cbor:{b'name': b'command1'}"),
326 b"cbor:{b'name': b'command1'}"),
327 ffs(b'3 1 0 command-request new '
327 ffs(b'3 1 0 command-request new '
328 b"cbor:{b'name': b'command2'}"),
328 b"cbor:{b'name': b'command2'}"),
329 ]
329 ]
330 results = list(sendframes(makereactor(), frames))
330 results = list(sendframes(makereactor(), frames))
331 self.assertEqual(len(results), 2)
331 self.assertEqual(len(results), 2)
332 self.assertaction(results[0], b'wantframe')
332 self.assertaction(results[0], b'wantframe')
333 self.assertaction(results[1], b'runcommand')
333 self.assertaction(results[1], b'runcommand')
334
334
335 def testmissingcommanddataframeflags(self):
335 def testmissingcommanddataframeflags(self):
336 frames = [
336 frames = [
337 ffs(b'1 1 stream-begin command-request new|have-data '
337 ffs(b'1 1 stream-begin command-request new|have-data '
338 b"cbor:{b'name': b'command1'}"),
338 b"cbor:{b'name': b'command1'}"),
339 ffs(b'1 1 0 command-data 0 data'),
339 ffs(b'1 1 0 command-data 0 data'),
340 ]
340 ]
341 results = list(sendframes(makereactor(), frames))
341 results = list(sendframes(makereactor(), frames))
342 self.assertEqual(len(results), 2)
342 self.assertEqual(len(results), 2)
343 self.assertaction(results[0], b'wantframe')
343 self.assertaction(results[0], b'wantframe')
344 self.assertaction(results[1], b'error')
344 self.assertaction(results[1], b'error')
345 self.assertEqual(results[1][1], {
345 self.assertEqual(results[1][1], {
346 b'message': b'command data frame without flags',
346 b'message': b'command data frame without flags',
347 })
347 })
348
348
349 def testframefornonreceivingrequest(self):
349 def testframefornonreceivingrequest(self):
350 """Receiving a frame for a command that is not receiving is illegal."""
350 """Receiving a frame for a command that is not receiving is illegal."""
351 results = list(sendframes(makereactor(), [
351 results = list(sendframes(makereactor(), [
352 ffs(b'1 1 stream-begin command-request new '
352 ffs(b'1 1 stream-begin command-request new '
353 b"cbor:{b'name': b'command1'}"),
353 b"cbor:{b'name': b'command1'}"),
354 ffs(b'3 1 0 command-request new|have-data '
354 ffs(b'3 1 0 command-request new|have-data '
355 b"cbor:{b'name': b'command3'}"),
355 b"cbor:{b'name': b'command3'}"),
356 ffs(b'5 1 0 command-data eos ignored'),
356 ffs(b'5 1 0 command-data eos ignored'),
357 ]))
357 ]))
358 self.assertaction(results[2], b'error')
358 self.assertaction(results[2], b'error')
359 self.assertEqual(results[2][1], {
359 self.assertEqual(results[2][1], {
360 b'message': b'received frame for request that is not receiving: 5',
360 b'message': b'received frame for request that is not receiving: 5',
361 })
361 })
362
362
363 def testsimpleresponse(self):
363 def testsimpleresponse(self):
364 """Bytes response to command sends result frames."""
364 """Bytes response to command sends result frames."""
365 reactor = makereactor()
365 reactor = makereactor()
366 instream = framing.stream(1)
366 instream = framing.stream(1)
367 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
367 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
368
368
369 outstream = reactor.makeoutputstream()
369 outstream = reactor.makeoutputstream()
370 result = reactor.oncommandresponsereadyobjects(
370 result = reactor.oncommandresponsereadyobjects(
371 outstream, 1, [b'response'])
371 outstream, 1, [b'response'])
372 self.assertaction(result, b'sendframes')
372 self.assertaction(result, b'sendframes')
373 self.assertframesequal(result[1][b'framegen'], [
373 self.assertframesequal(result[1][b'framegen'], [
374 b'1 2 stream-begin command-response continuation %s' % OK,
374 b'1 2 stream-begin command-response continuation %s' % OK,
375 b'1 2 0 command-response continuation cbor:b"response"',
375 b'1 2 0 command-response continuation cbor:b"response"',
376 b'1 2 0 command-response eos ',
376 b'1 2 0 command-response eos ',
377 ])
377 ])
378
378
379 def testmultiframeresponse(self):
379 def testmultiframeresponse(self):
380 """Bytes response spanning multiple frames is handled."""
380 """Bytes response spanning multiple frames is handled."""
381 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
381 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
382 second = b'y' * 100
382 second = b'y' * 100
383
383
384 reactor = makereactor()
384 reactor = makereactor()
385 instream = framing.stream(1)
385 instream = framing.stream(1)
386 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
386 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
387
387
388 outstream = reactor.makeoutputstream()
388 outstream = reactor.makeoutputstream()
389 result = reactor.oncommandresponsereadyobjects(
389 result = reactor.oncommandresponsereadyobjects(
390 outstream, 1, [first + second])
390 outstream, 1, [first + second])
391 self.assertaction(result, b'sendframes')
391 self.assertaction(result, b'sendframes')
392 self.assertframesequal(result[1][b'framegen'], [
392 self.assertframesequal(result[1][b'framegen'], [
393 b'1 2 stream-begin command-response continuation %s' % OK,
393 b'1 2 stream-begin command-response continuation %s' % OK,
394 b'1 2 0 command-response continuation Y\x80d',
394 b'1 2 0 command-response continuation Y\x80d',
395 b'1 2 0 command-response continuation %s' % first,
395 b'1 2 0 command-response continuation %s' % first,
396 b'1 2 0 command-response continuation %s' % second,
396 b'1 2 0 command-response continuation %s' % second,
397 b'1 2 0 command-response continuation ',
398 b'1 2 0 command-response eos '
397 b'1 2 0 command-response eos '
399 ])
398 ])
400
399
401 def testservererror(self):
400 def testservererror(self):
402 reactor = makereactor()
401 reactor = makereactor()
403 instream = framing.stream(1)
402 instream = framing.stream(1)
404 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
403 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
405
404
406 outstream = reactor.makeoutputstream()
405 outstream = reactor.makeoutputstream()
407 result = reactor.onservererror(outstream, 1, b'some message')
406 result = reactor.onservererror(outstream, 1, b'some message')
408 self.assertaction(result, b'sendframes')
407 self.assertaction(result, b'sendframes')
409 self.assertframesequal(result[1][b'framegen'], [
408 self.assertframesequal(result[1][b'framegen'], [
410 b"1 2 stream-begin error-response 0 "
409 b"1 2 stream-begin error-response 0 "
411 b"cbor:{b'type': b'server', "
410 b"cbor:{b'type': b'server', "
412 b"b'message': [{b'msg': b'some message'}]}",
411 b"b'message': [{b'msg': b'some message'}]}",
413 ])
412 ])
414
413
415 def test1commanddeferresponse(self):
414 def test1commanddeferresponse(self):
416 """Responses when in deferred output mode are delayed until EOF."""
415 """Responses when in deferred output mode are delayed until EOF."""
417 reactor = makereactor(deferoutput=True)
416 reactor = makereactor(deferoutput=True)
418 instream = framing.stream(1)
417 instream = framing.stream(1)
419 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
418 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
420 {}))
419 {}))
421 self.assertEqual(len(results), 1)
420 self.assertEqual(len(results), 1)
422 self.assertaction(results[0], b'runcommand')
421 self.assertaction(results[0], b'runcommand')
423
422
424 outstream = reactor.makeoutputstream()
423 outstream = reactor.makeoutputstream()
425 result = reactor.oncommandresponsereadyobjects(
424 result = reactor.oncommandresponsereadyobjects(
426 outstream, 1, [b'response'])
425 outstream, 1, [b'response'])
427 self.assertaction(result, b'noop')
426 self.assertaction(result, b'noop')
428 result = reactor.oninputeof()
427 result = reactor.oninputeof()
429 self.assertaction(result, b'sendframes')
428 self.assertaction(result, b'sendframes')
430 self.assertframesequal(result[1][b'framegen'], [
429 self.assertframesequal(result[1][b'framegen'], [
431 b'1 2 stream-begin command-response continuation %s' % OK,
430 b'1 2 stream-begin command-response continuation %s' % OK,
432 b'1 2 0 command-response continuation cbor:b"response"',
431 b'1 2 0 command-response continuation cbor:b"response"',
433 b'1 2 0 command-response eos ',
432 b'1 2 0 command-response eos ',
434 ])
433 ])
435
434
436 def testmultiplecommanddeferresponse(self):
435 def testmultiplecommanddeferresponse(self):
437 reactor = makereactor(deferoutput=True)
436 reactor = makereactor(deferoutput=True)
438 instream = framing.stream(1)
437 instream = framing.stream(1)
439 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
438 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
440 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
439 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
441
440
442 outstream = reactor.makeoutputstream()
441 outstream = reactor.makeoutputstream()
443 result = reactor.oncommandresponsereadyobjects(
442 result = reactor.oncommandresponsereadyobjects(
444 outstream, 1, [b'response1'])
443 outstream, 1, [b'response1'])
445 self.assertaction(result, b'noop')
444 self.assertaction(result, b'noop')
446 result = reactor.oncommandresponsereadyobjects(
445 result = reactor.oncommandresponsereadyobjects(
447 outstream, 3, [b'response2'])
446 outstream, 3, [b'response2'])
448 self.assertaction(result, b'noop')
447 self.assertaction(result, b'noop')
449 result = reactor.oninputeof()
448 result = reactor.oninputeof()
450 self.assertaction(result, b'sendframes')
449 self.assertaction(result, b'sendframes')
451 self.assertframesequal(result[1][b'framegen'], [
450 self.assertframesequal(result[1][b'framegen'], [
452 b'1 2 stream-begin command-response continuation %s' % OK,
451 b'1 2 stream-begin command-response continuation %s' % OK,
453 b'1 2 0 command-response continuation cbor:b"response1"',
452 b'1 2 0 command-response continuation cbor:b"response1"',
454 b'1 2 0 command-response eos ',
453 b'1 2 0 command-response eos ',
455 b'3 2 0 command-response continuation %s' % OK,
454 b'3 2 0 command-response continuation %s' % OK,
456 b'3 2 0 command-response continuation cbor:b"response2"',
455 b'3 2 0 command-response continuation cbor:b"response2"',
457 b'3 2 0 command-response eos ',
456 b'3 2 0 command-response eos ',
458 ])
457 ])
459
458
460 def testrequestidtracking(self):
459 def testrequestidtracking(self):
461 reactor = makereactor(deferoutput=True)
460 reactor = makereactor(deferoutput=True)
462 instream = framing.stream(1)
461 instream = framing.stream(1)
463 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
462 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
464 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
463 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
465 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
464 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
466
465
467 # Register results for commands out of order.
466 # Register results for commands out of order.
468 outstream = reactor.makeoutputstream()
467 outstream = reactor.makeoutputstream()
469 reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
468 reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
470 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
469 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
471 reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
470 reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
472
471
473 result = reactor.oninputeof()
472 result = reactor.oninputeof()
474 self.assertaction(result, b'sendframes')
473 self.assertaction(result, b'sendframes')
475 self.assertframesequal(result[1][b'framegen'], [
474 self.assertframesequal(result[1][b'framegen'], [
476 b'3 2 stream-begin command-response continuation %s' % OK,
475 b'3 2 stream-begin command-response continuation %s' % OK,
477 b'3 2 0 command-response continuation cbor:b"response3"',
476 b'3 2 0 command-response continuation cbor:b"response3"',
478 b'3 2 0 command-response eos ',
477 b'3 2 0 command-response eos ',
479 b'1 2 0 command-response continuation %s' % OK,
478 b'1 2 0 command-response continuation %s' % OK,
480 b'1 2 0 command-response continuation cbor:b"response1"',
479 b'1 2 0 command-response continuation cbor:b"response1"',
481 b'1 2 0 command-response eos ',
480 b'1 2 0 command-response eos ',
482 b'5 2 0 command-response continuation %s' % OK,
481 b'5 2 0 command-response continuation %s' % OK,
483 b'5 2 0 command-response continuation cbor:b"response5"',
482 b'5 2 0 command-response continuation cbor:b"response5"',
484 b'5 2 0 command-response eos ',
483 b'5 2 0 command-response eos ',
485 ])
484 ])
486
485
487 def testduplicaterequestonactivecommand(self):
486 def testduplicaterequestonactivecommand(self):
488 """Receiving a request ID that matches a request that isn't finished."""
487 """Receiving a request ID that matches a request that isn't finished."""
489 reactor = makereactor()
488 reactor = makereactor()
490 stream = framing.stream(1)
489 stream = framing.stream(1)
491 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
490 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
492 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
491 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
493
492
494 self.assertaction(results[0], b'error')
493 self.assertaction(results[0], b'error')
495 self.assertEqual(results[0][1], {
494 self.assertEqual(results[0][1], {
496 b'message': b'request with ID 1 is already active',
495 b'message': b'request with ID 1 is already active',
497 })
496 })
498
497
499 def testduplicaterequestonactivecommandnosend(self):
498 def testduplicaterequestonactivecommandnosend(self):
500 """Same as above but we've registered a response but haven't sent it."""
499 """Same as above but we've registered a response but haven't sent it."""
501 reactor = makereactor()
500 reactor = makereactor()
502 instream = framing.stream(1)
501 instream = framing.stream(1)
503 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
502 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
504 outstream = reactor.makeoutputstream()
503 outstream = reactor.makeoutputstream()
505 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
504 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
506
505
507 # We've registered the response but haven't sent it. From the
506 # We've registered the response but haven't sent it. From the
508 # perspective of the reactor, the command is still active.
507 # perspective of the reactor, the command is still active.
509
508
510 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
509 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
511 self.assertaction(results[0], b'error')
510 self.assertaction(results[0], b'error')
512 self.assertEqual(results[0][1], {
511 self.assertEqual(results[0][1], {
513 b'message': b'request with ID 1 is already active',
512 b'message': b'request with ID 1 is already active',
514 })
513 })
515
514
516 def testduplicaterequestaftersend(self):
515 def testduplicaterequestaftersend(self):
517 """We can use a duplicate request ID after we've sent the response."""
516 """We can use a duplicate request ID after we've sent the response."""
518 reactor = makereactor()
517 reactor = makereactor()
519 instream = framing.stream(1)
518 instream = framing.stream(1)
520 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
519 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
521 outstream = reactor.makeoutputstream()
520 outstream = reactor.makeoutputstream()
522 res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
521 res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
523 list(res[1][b'framegen'])
522 list(res[1][b'framegen'])
524
523
525 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
524 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
526 self.assertaction(results[0], b'runcommand')
525 self.assertaction(results[0], b'runcommand')
527
526
528 def testprotocolsettingsnoflags(self):
527 def testprotocolsettingsnoflags(self):
529 result = self._sendsingleframe(
528 result = self._sendsingleframe(
530 makereactor(),
529 makereactor(),
531 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
530 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
532 self.assertaction(result, b'error')
531 self.assertaction(result, b'error')
533 self.assertEqual(result[1], {
532 self.assertEqual(result[1], {
534 b'message': b'sender protocol settings frame must have '
533 b'message': b'sender protocol settings frame must have '
535 b'continuation or end of stream flag set',
534 b'continuation or end of stream flag set',
536 })
535 })
537
536
538 def testprotocolsettingsconflictflags(self):
537 def testprotocolsettingsconflictflags(self):
539 result = self._sendsingleframe(
538 result = self._sendsingleframe(
540 makereactor(),
539 makereactor(),
541 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
540 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
542 self.assertaction(result, b'error')
541 self.assertaction(result, b'error')
543 self.assertEqual(result[1], {
542 self.assertEqual(result[1], {
544 b'message': b'sender protocol settings frame cannot have both '
543 b'message': b'sender protocol settings frame cannot have both '
545 b'continuation and end of stream flags set',
544 b'continuation and end of stream flags set',
546 })
545 })
547
546
548 def testprotocolsettingsemptypayload(self):
547 def testprotocolsettingsemptypayload(self):
549 result = self._sendsingleframe(
548 result = self._sendsingleframe(
550 makereactor(),
549 makereactor(),
551 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
550 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
552 self.assertaction(result, b'error')
551 self.assertaction(result, b'error')
553 self.assertEqual(result[1], {
552 self.assertEqual(result[1], {
554 b'message': b'sender protocol settings frame did not contain CBOR '
553 b'message': b'sender protocol settings frame did not contain CBOR '
555 b'data',
554 b'data',
556 })
555 })
557
556
558 def testprotocolsettingsmultipleobjects(self):
557 def testprotocolsettingsmultipleobjects(self):
559 result = self._sendsingleframe(
558 result = self._sendsingleframe(
560 makereactor(),
559 makereactor(),
561 ffs(b'0 1 stream-begin sender-protocol-settings eos '
560 ffs(b'0 1 stream-begin sender-protocol-settings eos '
562 b'\x46foobar\x43foo'))
561 b'\x46foobar\x43foo'))
563 self.assertaction(result, b'error')
562 self.assertaction(result, b'error')
564 self.assertEqual(result[1], {
563 self.assertEqual(result[1], {
565 b'message': b'sender protocol settings frame contained multiple '
564 b'message': b'sender protocol settings frame contained multiple '
566 b'CBOR values',
565 b'CBOR values',
567 })
566 })
568
567
569 def testprotocolsettingscontentencodings(self):
568 def testprotocolsettingscontentencodings(self):
570 reactor = makereactor()
569 reactor = makereactor()
571
570
572 result = self._sendsingleframe(
571 result = self._sendsingleframe(
573 reactor,
572 reactor,
574 ffs(b'0 1 stream-begin sender-protocol-settings eos '
573 ffs(b'0 1 stream-begin sender-protocol-settings eos '
575 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
574 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
576 self.assertaction(result, b'wantframe')
575 self.assertaction(result, b'wantframe')
577
576
578 self.assertEqual(reactor._state, b'idle')
577 self.assertEqual(reactor._state, b'idle')
579 self.assertEqual(reactor._sendersettings[b'contentencodings'],
578 self.assertEqual(reactor._sendersettings[b'contentencodings'],
580 [b'a', b'b'])
579 [b'a', b'b'])
581
580
582 def testprotocolsettingsmultipleframes(self):
581 def testprotocolsettingsmultipleframes(self):
583 reactor = makereactor()
582 reactor = makereactor()
584
583
585 data = b''.join(cborutil.streamencode({
584 data = b''.join(cborutil.streamencode({
586 b'contentencodings': [b'value1', b'value2'],
585 b'contentencodings': [b'value1', b'value2'],
587 }))
586 }))
588
587
589 results = list(sendframes(reactor, [
588 results = list(sendframes(reactor, [
590 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
589 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
591 data[0:5]),
590 data[0:5]),
592 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
591 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
593 ]))
592 ]))
594
593
595 self.assertEqual(len(results), 2)
594 self.assertEqual(len(results), 2)
596
595
597 self.assertaction(results[0], b'wantframe')
596 self.assertaction(results[0], b'wantframe')
598 self.assertaction(results[1], b'wantframe')
597 self.assertaction(results[1], b'wantframe')
599
598
600 self.assertEqual(reactor._state, b'idle')
599 self.assertEqual(reactor._state, b'idle')
601 self.assertEqual(reactor._sendersettings[b'contentencodings'],
600 self.assertEqual(reactor._sendersettings[b'contentencodings'],
602 [b'value1', b'value2'])
601 [b'value1', b'value2'])
603
602
604 def testprotocolsettingsbadcbor(self):
603 def testprotocolsettingsbadcbor(self):
605 result = self._sendsingleframe(
604 result = self._sendsingleframe(
606 makereactor(),
605 makereactor(),
607 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
606 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
608 self.assertaction(result, b'error')
607 self.assertaction(result, b'error')
609
608
610 def testprotocolsettingsnoninitial(self):
609 def testprotocolsettingsnoninitial(self):
611 # Cannot have protocol settings frames as non-initial frames.
610 # Cannot have protocol settings frames as non-initial frames.
612 reactor = makereactor()
611 reactor = makereactor()
613
612
614 stream = framing.stream(1)
613 stream = framing.stream(1)
615 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
614 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
616 self.assertEqual(len(results), 1)
615 self.assertEqual(len(results), 1)
617 self.assertaction(results[0], b'runcommand')
616 self.assertaction(results[0], b'runcommand')
618
617
619 result = self._sendsingleframe(
618 result = self._sendsingleframe(
620 reactor,
619 reactor,
621 ffs(b'0 1 0 sender-protocol-settings eos '))
620 ffs(b'0 1 0 sender-protocol-settings eos '))
622 self.assertaction(result, b'error')
621 self.assertaction(result, b'error')
623 self.assertEqual(result[1], {
622 self.assertEqual(result[1], {
624 b'message': b'expected command request frame; got 8',
623 b'message': b'expected command request frame; got 8',
625 })
624 })
626
625
627 if __name__ == '__main__':
626 if __name__ == '__main__':
628 import silenttestrunner
627 import silenttestrunner
629 silenttestrunner.main(__name__)
628 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now