##// END OF EJS Templates
wireprotoframing: record when new stream is encountered...
Gregory Szorc -
r37674:e6870bca default
parent child Browse files
Show More
@@ -1,1070 +1,1072 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 cbor,
20 cbor,
21 )
21 )
22 from . import (
22 from . import (
23 encoding,
23 encoding,
24 error,
24 error,
25 util,
25 util,
26 )
26 )
27 from .utils import (
27 from .utils import (
28 stringutil,
28 stringutil,
29 )
29 )
30
30
31 FRAME_HEADER_SIZE = 8
31 FRAME_HEADER_SIZE = 8
32 DEFAULT_MAX_FRAME_SIZE = 32768
32 DEFAULT_MAX_FRAME_SIZE = 32768
33
33
34 STREAM_FLAG_BEGIN_STREAM = 0x01
34 STREAM_FLAG_BEGIN_STREAM = 0x01
35 STREAM_FLAG_END_STREAM = 0x02
35 STREAM_FLAG_END_STREAM = 0x02
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
37
37
38 STREAM_FLAGS = {
38 STREAM_FLAGS = {
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
42 }
42 }
43
43
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
45 FRAME_TYPE_COMMAND_DATA = 0x03
45 FRAME_TYPE_COMMAND_DATA = 0x03
46 FRAME_TYPE_BYTES_RESPONSE = 0x04
46 FRAME_TYPE_BYTES_RESPONSE = 0x04
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
49 FRAME_TYPE_PROGRESS = 0x07
49 FRAME_TYPE_PROGRESS = 0x07
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
51
51
52 FRAME_TYPES = {
52 FRAME_TYPES = {
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
55 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
55 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
58 b'progress': FRAME_TYPE_PROGRESS,
58 b'progress': FRAME_TYPE_PROGRESS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
60 }
60 }
61
61
62 FLAG_COMMAND_REQUEST_NEW = 0x01
62 FLAG_COMMAND_REQUEST_NEW = 0x01
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
66
66
67 FLAGS_COMMAND_REQUEST = {
67 FLAGS_COMMAND_REQUEST = {
68 b'new': FLAG_COMMAND_REQUEST_NEW,
68 b'new': FLAG_COMMAND_REQUEST_NEW,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
72 }
72 }
73
73
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
75 FLAG_COMMAND_DATA_EOS = 0x02
75 FLAG_COMMAND_DATA_EOS = 0x02
76
76
77 FLAGS_COMMAND_DATA = {
77 FLAGS_COMMAND_DATA = {
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
79 b'eos': FLAG_COMMAND_DATA_EOS,
79 b'eos': FLAG_COMMAND_DATA_EOS,
80 }
80 }
81
81
82 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
82 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
83 FLAG_BYTES_RESPONSE_EOS = 0x02
83 FLAG_BYTES_RESPONSE_EOS = 0x02
84 FLAG_BYTES_RESPONSE_CBOR = 0x04
84 FLAG_BYTES_RESPONSE_CBOR = 0x04
85
85
86 FLAGS_BYTES_RESPONSE = {
86 FLAGS_BYTES_RESPONSE = {
87 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
87 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
88 b'eos': FLAG_BYTES_RESPONSE_EOS,
88 b'eos': FLAG_BYTES_RESPONSE_EOS,
89 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
89 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
90 }
90 }
91
91
92 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
92 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
93 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
93 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
94
94
95 FLAGS_ERROR_RESPONSE = {
95 FLAGS_ERROR_RESPONSE = {
96 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
96 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
97 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
97 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
98 }
98 }
99
99
100 # Maps frame types to their available flags.
100 # Maps frame types to their available flags.
101 FRAME_TYPE_FLAGS = {
101 FRAME_TYPE_FLAGS = {
102 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
102 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
103 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
103 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
104 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
104 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
105 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
105 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
106 FRAME_TYPE_TEXT_OUTPUT: {},
106 FRAME_TYPE_TEXT_OUTPUT: {},
107 FRAME_TYPE_PROGRESS: {},
107 FRAME_TYPE_PROGRESS: {},
108 FRAME_TYPE_STREAM_SETTINGS: {},
108 FRAME_TYPE_STREAM_SETTINGS: {},
109 }
109 }
110
110
111 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
111 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
112
112
113 def humanflags(mapping, value):
113 def humanflags(mapping, value):
114 """Convert a numeric flags value to a human value, using a mapping table."""
114 """Convert a numeric flags value to a human value, using a mapping table."""
115 namemap = {v: k for k, v in mapping.iteritems()}
115 namemap = {v: k for k, v in mapping.iteritems()}
116 flags = []
116 flags = []
117 val = 1
117 val = 1
118 while value >= val:
118 while value >= val:
119 if value & val:
119 if value & val:
120 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
120 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
121 val <<= 1
121 val <<= 1
122
122
123 return b'|'.join(flags)
123 return b'|'.join(flags)
124
124
125 @attr.s(slots=True)
125 @attr.s(slots=True)
126 class frameheader(object):
126 class frameheader(object):
127 """Represents the data in a frame header."""
127 """Represents the data in a frame header."""
128
128
129 length = attr.ib()
129 length = attr.ib()
130 requestid = attr.ib()
130 requestid = attr.ib()
131 streamid = attr.ib()
131 streamid = attr.ib()
132 streamflags = attr.ib()
132 streamflags = attr.ib()
133 typeid = attr.ib()
133 typeid = attr.ib()
134 flags = attr.ib()
134 flags = attr.ib()
135
135
136 @attr.s(slots=True, repr=False)
136 @attr.s(slots=True, repr=False)
137 class frame(object):
137 class frame(object):
138 """Represents a parsed frame."""
138 """Represents a parsed frame."""
139
139
140 requestid = attr.ib()
140 requestid = attr.ib()
141 streamid = attr.ib()
141 streamid = attr.ib()
142 streamflags = attr.ib()
142 streamflags = attr.ib()
143 typeid = attr.ib()
143 typeid = attr.ib()
144 flags = attr.ib()
144 flags = attr.ib()
145 payload = attr.ib()
145 payload = attr.ib()
146
146
147 @encoding.strmethod
147 @encoding.strmethod
148 def __repr__(self):
148 def __repr__(self):
149 typename = '<unknown 0x%02x>' % self.typeid
149 typename = '<unknown 0x%02x>' % self.typeid
150 for name, value in FRAME_TYPES.iteritems():
150 for name, value in FRAME_TYPES.iteritems():
151 if value == self.typeid:
151 if value == self.typeid:
152 typename = name
152 typename = name
153 break
153 break
154
154
155 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
155 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
156 'type=%s; flags=%s)' % (
156 'type=%s; flags=%s)' % (
157 len(self.payload), self.requestid, self.streamid,
157 len(self.payload), self.requestid, self.streamid,
158 humanflags(STREAM_FLAGS, self.streamflags), typename,
158 humanflags(STREAM_FLAGS, self.streamflags), typename,
159 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
159 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
160
160
161 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
161 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
162 """Assemble a frame into a byte array."""
162 """Assemble a frame into a byte array."""
163 # TODO assert size of payload.
163 # TODO assert size of payload.
164 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
164 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
165
165
166 # 24 bits length
166 # 24 bits length
167 # 16 bits request id
167 # 16 bits request id
168 # 8 bits stream id
168 # 8 bits stream id
169 # 8 bits stream flags
169 # 8 bits stream flags
170 # 4 bits type
170 # 4 bits type
171 # 4 bits flags
171 # 4 bits flags
172
172
173 l = struct.pack(r'<I', len(payload))
173 l = struct.pack(r'<I', len(payload))
174 frame[0:3] = l[0:3]
174 frame[0:3] = l[0:3]
175 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
175 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
176 frame[7] = (typeid << 4) | flags
176 frame[7] = (typeid << 4) | flags
177 frame[8:] = payload
177 frame[8:] = payload
178
178
179 return frame
179 return frame
180
180
181 def makeframefromhumanstring(s):
181 def makeframefromhumanstring(s):
182 """Create a frame from a human readable string
182 """Create a frame from a human readable string
183
183
184 Strings have the form:
184 Strings have the form:
185
185
186 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
186 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
187
187
188 This can be used by user-facing applications and tests for creating
188 This can be used by user-facing applications and tests for creating
189 frames easily without having to type out a bunch of constants.
189 frames easily without having to type out a bunch of constants.
190
190
191 Request ID and stream IDs are integers.
191 Request ID and stream IDs are integers.
192
192
193 Stream flags, frame type, and flags can be specified by integer or
193 Stream flags, frame type, and flags can be specified by integer or
194 named constant.
194 named constant.
195
195
196 Flags can be delimited by `|` to bitwise OR them together.
196 Flags can be delimited by `|` to bitwise OR them together.
197
197
198 If the payload begins with ``cbor:``, the following string will be
198 If the payload begins with ``cbor:``, the following string will be
199 evaluated as Python literal and the resulting object will be fed into
199 evaluated as Python literal and the resulting object will be fed into
200 a CBOR encoder. Otherwise, the payload is interpreted as a Python
200 a CBOR encoder. Otherwise, the payload is interpreted as a Python
201 byte string literal.
201 byte string literal.
202 """
202 """
203 fields = s.split(b' ', 5)
203 fields = s.split(b' ', 5)
204 requestid, streamid, streamflags, frametype, frameflags, payload = fields
204 requestid, streamid, streamflags, frametype, frameflags, payload = fields
205
205
206 requestid = int(requestid)
206 requestid = int(requestid)
207 streamid = int(streamid)
207 streamid = int(streamid)
208
208
209 finalstreamflags = 0
209 finalstreamflags = 0
210 for flag in streamflags.split(b'|'):
210 for flag in streamflags.split(b'|'):
211 if flag in STREAM_FLAGS:
211 if flag in STREAM_FLAGS:
212 finalstreamflags |= STREAM_FLAGS[flag]
212 finalstreamflags |= STREAM_FLAGS[flag]
213 else:
213 else:
214 finalstreamflags |= int(flag)
214 finalstreamflags |= int(flag)
215
215
216 if frametype in FRAME_TYPES:
216 if frametype in FRAME_TYPES:
217 frametype = FRAME_TYPES[frametype]
217 frametype = FRAME_TYPES[frametype]
218 else:
218 else:
219 frametype = int(frametype)
219 frametype = int(frametype)
220
220
221 finalflags = 0
221 finalflags = 0
222 validflags = FRAME_TYPE_FLAGS[frametype]
222 validflags = FRAME_TYPE_FLAGS[frametype]
223 for flag in frameflags.split(b'|'):
223 for flag in frameflags.split(b'|'):
224 if flag in validflags:
224 if flag in validflags:
225 finalflags |= validflags[flag]
225 finalflags |= validflags[flag]
226 else:
226 else:
227 finalflags |= int(flag)
227 finalflags |= int(flag)
228
228
229 if payload.startswith(b'cbor:'):
229 if payload.startswith(b'cbor:'):
230 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
230 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
231 canonical=True)
231 canonical=True)
232
232
233 else:
233 else:
234 payload = stringutil.unescapestr(payload)
234 payload = stringutil.unescapestr(payload)
235
235
236 return makeframe(requestid=requestid, streamid=streamid,
236 return makeframe(requestid=requestid, streamid=streamid,
237 streamflags=finalstreamflags, typeid=frametype,
237 streamflags=finalstreamflags, typeid=frametype,
238 flags=finalflags, payload=payload)
238 flags=finalflags, payload=payload)
239
239
240 def parseheader(data):
240 def parseheader(data):
241 """Parse a unified framing protocol frame header from a buffer.
241 """Parse a unified framing protocol frame header from a buffer.
242
242
243 The header is expected to be in the buffer at offset 0 and the
243 The header is expected to be in the buffer at offset 0 and the
244 buffer is expected to be large enough to hold a full header.
244 buffer is expected to be large enough to hold a full header.
245 """
245 """
246 # 24 bits payload length (little endian)
246 # 24 bits payload length (little endian)
247 # 16 bits request ID
247 # 16 bits request ID
248 # 8 bits stream ID
248 # 8 bits stream ID
249 # 8 bits stream flags
249 # 8 bits stream flags
250 # 4 bits frame type
250 # 4 bits frame type
251 # 4 bits frame flags
251 # 4 bits frame flags
252 # ... payload
252 # ... payload
253 framelength = data[0] + 256 * data[1] + 16384 * data[2]
253 framelength = data[0] + 256 * data[1] + 16384 * data[2]
254 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
254 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
255 typeflags = data[7]
255 typeflags = data[7]
256
256
257 frametype = (typeflags & 0xf0) >> 4
257 frametype = (typeflags & 0xf0) >> 4
258 frameflags = typeflags & 0x0f
258 frameflags = typeflags & 0x0f
259
259
260 return frameheader(framelength, requestid, streamid, streamflags,
260 return frameheader(framelength, requestid, streamid, streamflags,
261 frametype, frameflags)
261 frametype, frameflags)
262
262
263 def readframe(fh):
263 def readframe(fh):
264 """Read a unified framing protocol frame from a file object.
264 """Read a unified framing protocol frame from a file object.
265
265
266 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
266 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
267 None if no frame is available. May raise if a malformed frame is
267 None if no frame is available. May raise if a malformed frame is
268 seen.
268 seen.
269 """
269 """
270 header = bytearray(FRAME_HEADER_SIZE)
270 header = bytearray(FRAME_HEADER_SIZE)
271
271
272 readcount = fh.readinto(header)
272 readcount = fh.readinto(header)
273
273
274 if readcount == 0:
274 if readcount == 0:
275 return None
275 return None
276
276
277 if readcount != FRAME_HEADER_SIZE:
277 if readcount != FRAME_HEADER_SIZE:
278 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
278 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
279 (readcount, header))
279 (readcount, header))
280
280
281 h = parseheader(header)
281 h = parseheader(header)
282
282
283 payload = fh.read(h.length)
283 payload = fh.read(h.length)
284 if len(payload) != h.length:
284 if len(payload) != h.length:
285 raise error.Abort(_('frame length error: expected %d; got %d') %
285 raise error.Abort(_('frame length error: expected %d; got %d') %
286 (h.length, len(payload)))
286 (h.length, len(payload)))
287
287
288 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
288 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
289 payload)
289 payload)
290
290
291 def createcommandframes(stream, requestid, cmd, args, datafh=None,
291 def createcommandframes(stream, requestid, cmd, args, datafh=None,
292 maxframesize=DEFAULT_MAX_FRAME_SIZE):
292 maxframesize=DEFAULT_MAX_FRAME_SIZE):
293 """Create frames necessary to transmit a request to run a command.
293 """Create frames necessary to transmit a request to run a command.
294
294
295 This is a generator of bytearrays. Each item represents a frame
295 This is a generator of bytearrays. Each item represents a frame
296 ready to be sent over the wire to a peer.
296 ready to be sent over the wire to a peer.
297 """
297 """
298 data = {b'name': cmd}
298 data = {b'name': cmd}
299 if args:
299 if args:
300 data[b'args'] = args
300 data[b'args'] = args
301
301
302 data = cbor.dumps(data, canonical=True)
302 data = cbor.dumps(data, canonical=True)
303
303
304 offset = 0
304 offset = 0
305
305
306 while True:
306 while True:
307 flags = 0
307 flags = 0
308
308
309 # Must set new or continuation flag.
309 # Must set new or continuation flag.
310 if not offset:
310 if not offset:
311 flags |= FLAG_COMMAND_REQUEST_NEW
311 flags |= FLAG_COMMAND_REQUEST_NEW
312 else:
312 else:
313 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
313 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
314
314
315 # Data frames is set on all frames.
315 # Data frames is set on all frames.
316 if datafh:
316 if datafh:
317 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
317 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
318
318
319 payload = data[offset:offset + maxframesize]
319 payload = data[offset:offset + maxframesize]
320 offset += len(payload)
320 offset += len(payload)
321
321
322 if len(payload) == maxframesize and offset < len(data):
322 if len(payload) == maxframesize and offset < len(data):
323 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
323 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
324
324
325 yield stream.makeframe(requestid=requestid,
325 yield stream.makeframe(requestid=requestid,
326 typeid=FRAME_TYPE_COMMAND_REQUEST,
326 typeid=FRAME_TYPE_COMMAND_REQUEST,
327 flags=flags,
327 flags=flags,
328 payload=payload)
328 payload=payload)
329
329
330 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
330 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
331 break
331 break
332
332
333 if datafh:
333 if datafh:
334 while True:
334 while True:
335 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
335 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
336
336
337 done = False
337 done = False
338 if len(data) == DEFAULT_MAX_FRAME_SIZE:
338 if len(data) == DEFAULT_MAX_FRAME_SIZE:
339 flags = FLAG_COMMAND_DATA_CONTINUATION
339 flags = FLAG_COMMAND_DATA_CONTINUATION
340 else:
340 else:
341 flags = FLAG_COMMAND_DATA_EOS
341 flags = FLAG_COMMAND_DATA_EOS
342 assert datafh.read(1) == b''
342 assert datafh.read(1) == b''
343 done = True
343 done = True
344
344
345 yield stream.makeframe(requestid=requestid,
345 yield stream.makeframe(requestid=requestid,
346 typeid=FRAME_TYPE_COMMAND_DATA,
346 typeid=FRAME_TYPE_COMMAND_DATA,
347 flags=flags,
347 flags=flags,
348 payload=data)
348 payload=data)
349
349
350 if done:
350 if done:
351 break
351 break
352
352
353 def createbytesresponseframesfrombytes(stream, requestid, data, iscbor=False,
353 def createbytesresponseframesfrombytes(stream, requestid, data, iscbor=False,
354 maxframesize=DEFAULT_MAX_FRAME_SIZE):
354 maxframesize=DEFAULT_MAX_FRAME_SIZE):
355 """Create a raw frame to send a bytes response from static bytes input.
355 """Create a raw frame to send a bytes response from static bytes input.
356
356
357 Returns a generator of bytearrays.
357 Returns a generator of bytearrays.
358 """
358 """
359
359
360 # Simple case of a single frame.
360 # Simple case of a single frame.
361 if len(data) <= maxframesize:
361 if len(data) <= maxframesize:
362 flags = FLAG_BYTES_RESPONSE_EOS
362 flags = FLAG_BYTES_RESPONSE_EOS
363 if iscbor:
363 if iscbor:
364 flags |= FLAG_BYTES_RESPONSE_CBOR
364 flags |= FLAG_BYTES_RESPONSE_CBOR
365
365
366 yield stream.makeframe(requestid=requestid,
366 yield stream.makeframe(requestid=requestid,
367 typeid=FRAME_TYPE_BYTES_RESPONSE,
367 typeid=FRAME_TYPE_BYTES_RESPONSE,
368 flags=flags,
368 flags=flags,
369 payload=data)
369 payload=data)
370 return
370 return
371
371
372 offset = 0
372 offset = 0
373 while True:
373 while True:
374 chunk = data[offset:offset + maxframesize]
374 chunk = data[offset:offset + maxframesize]
375 offset += len(chunk)
375 offset += len(chunk)
376 done = offset == len(data)
376 done = offset == len(data)
377
377
378 if done:
378 if done:
379 flags = FLAG_BYTES_RESPONSE_EOS
379 flags = FLAG_BYTES_RESPONSE_EOS
380 else:
380 else:
381 flags = FLAG_BYTES_RESPONSE_CONTINUATION
381 flags = FLAG_BYTES_RESPONSE_CONTINUATION
382
382
383 if iscbor:
383 if iscbor:
384 flags |= FLAG_BYTES_RESPONSE_CBOR
384 flags |= FLAG_BYTES_RESPONSE_CBOR
385
385
386 yield stream.makeframe(requestid=requestid,
386 yield stream.makeframe(requestid=requestid,
387 typeid=FRAME_TYPE_BYTES_RESPONSE,
387 typeid=FRAME_TYPE_BYTES_RESPONSE,
388 flags=flags,
388 flags=flags,
389 payload=chunk)
389 payload=chunk)
390
390
391 if done:
391 if done:
392 break
392 break
393
393
394 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
394 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
395 # TODO properly handle frame size limits.
395 # TODO properly handle frame size limits.
396 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
396 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
397
397
398 flags = 0
398 flags = 0
399 if protocol:
399 if protocol:
400 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
400 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
401 if application:
401 if application:
402 flags |= FLAG_ERROR_RESPONSE_APPLICATION
402 flags |= FLAG_ERROR_RESPONSE_APPLICATION
403
403
404 yield stream.makeframe(requestid=requestid,
404 yield stream.makeframe(requestid=requestid,
405 typeid=FRAME_TYPE_ERROR_RESPONSE,
405 typeid=FRAME_TYPE_ERROR_RESPONSE,
406 flags=flags,
406 flags=flags,
407 payload=msg)
407 payload=msg)
408
408
409 def createtextoutputframe(stream, requestid, atoms,
409 def createtextoutputframe(stream, requestid, atoms,
410 maxframesize=DEFAULT_MAX_FRAME_SIZE):
410 maxframesize=DEFAULT_MAX_FRAME_SIZE):
411 """Create a text output frame to render text to people.
411 """Create a text output frame to render text to people.
412
412
413 ``atoms`` is a 3-tuple of (formatting string, args, labels).
413 ``atoms`` is a 3-tuple of (formatting string, args, labels).
414
414
415 The formatting string contains ``%s`` tokens to be replaced by the
415 The formatting string contains ``%s`` tokens to be replaced by the
416 corresponding indexed entry in ``args``. ``labels`` is an iterable of
416 corresponding indexed entry in ``args``. ``labels`` is an iterable of
417 formatters to be applied at rendering time. In terms of the ``ui``
417 formatters to be applied at rendering time. In terms of the ``ui``
418 class, each atom corresponds to a ``ui.write()``.
418 class, each atom corresponds to a ``ui.write()``.
419 """
419 """
420 atomdicts = []
420 atomdicts = []
421
421
422 for (formatting, args, labels) in atoms:
422 for (formatting, args, labels) in atoms:
423 # TODO look for localstr, other types here?
423 # TODO look for localstr, other types here?
424
424
425 if not isinstance(formatting, bytes):
425 if not isinstance(formatting, bytes):
426 raise ValueError('must use bytes formatting strings')
426 raise ValueError('must use bytes formatting strings')
427 for arg in args:
427 for arg in args:
428 if not isinstance(arg, bytes):
428 if not isinstance(arg, bytes):
429 raise ValueError('must use bytes for arguments')
429 raise ValueError('must use bytes for arguments')
430 for label in labels:
430 for label in labels:
431 if not isinstance(label, bytes):
431 if not isinstance(label, bytes):
432 raise ValueError('must use bytes for labels')
432 raise ValueError('must use bytes for labels')
433
433
434 # Formatting string must be ASCII.
434 # Formatting string must be ASCII.
435 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
435 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
436
436
437 # Arguments must be UTF-8.
437 # Arguments must be UTF-8.
438 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
438 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
439
439
440 # Labels must be ASCII.
440 # Labels must be ASCII.
441 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
441 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
442 for l in labels]
442 for l in labels]
443
443
444 atom = {b'msg': formatting}
444 atom = {b'msg': formatting}
445 if args:
445 if args:
446 atom[b'args'] = args
446 atom[b'args'] = args
447 if labels:
447 if labels:
448 atom[b'labels'] = labels
448 atom[b'labels'] = labels
449
449
450 atomdicts.append(atom)
450 atomdicts.append(atom)
451
451
452 payload = cbor.dumps(atomdicts, canonical=True)
452 payload = cbor.dumps(atomdicts, canonical=True)
453
453
454 if len(payload) > maxframesize:
454 if len(payload) > maxframesize:
455 raise ValueError('cannot encode data in a single frame')
455 raise ValueError('cannot encode data in a single frame')
456
456
457 yield stream.makeframe(requestid=requestid,
457 yield stream.makeframe(requestid=requestid,
458 typeid=FRAME_TYPE_TEXT_OUTPUT,
458 typeid=FRAME_TYPE_TEXT_OUTPUT,
459 flags=0,
459 flags=0,
460 payload=payload)
460 payload=payload)
461
461
462 class stream(object):
462 class stream(object):
463 """Represents a logical unidirectional series of frames."""
463 """Represents a logical unidirectional series of frames."""
464
464
465 def __init__(self, streamid, active=False):
465 def __init__(self, streamid, active=False):
466 self.streamid = streamid
466 self.streamid = streamid
467 self._active = active
467 self._active = active
468
468
469 def makeframe(self, requestid, typeid, flags, payload):
469 def makeframe(self, requestid, typeid, flags, payload):
470 """Create a frame to be sent out over this stream.
470 """Create a frame to be sent out over this stream.
471
471
472 Only returns the frame instance. Does not actually send it.
472 Only returns the frame instance. Does not actually send it.
473 """
473 """
474 streamflags = 0
474 streamflags = 0
475 if not self._active:
475 if not self._active:
476 streamflags |= STREAM_FLAG_BEGIN_STREAM
476 streamflags |= STREAM_FLAG_BEGIN_STREAM
477 self._active = True
477 self._active = True
478
478
479 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
479 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
480 payload)
480 payload)
481
481
482 def ensureserverstream(stream):
482 def ensureserverstream(stream):
483 if stream.streamid % 2:
483 if stream.streamid % 2:
484 raise error.ProgrammingError('server should only write to even '
484 raise error.ProgrammingError('server should only write to even '
485 'numbered streams; %d is not even' %
485 'numbered streams; %d is not even' %
486 stream.streamid)
486 stream.streamid)
487
487
488 class serverreactor(object):
488 class serverreactor(object):
489 """Holds state of a server handling frame-based protocol requests.
489 """Holds state of a server handling frame-based protocol requests.
490
490
491 This class is the "brain" of the unified frame-based protocol server
491 This class is the "brain" of the unified frame-based protocol server
492 component. While the protocol is stateless from the perspective of
492 component. While the protocol is stateless from the perspective of
493 requests/commands, something needs to track which frames have been
493 requests/commands, something needs to track which frames have been
494 received, what frames to expect, etc. This class is that thing.
494 received, what frames to expect, etc. This class is that thing.
495
495
496 Instances are modeled as a state machine of sorts. Instances are also
496 Instances are modeled as a state machine of sorts. Instances are also
497 reactionary to external events. The point of this class is to encapsulate
497 reactionary to external events. The point of this class is to encapsulate
498 the state of the connection and the exchange of frames, not to perform
498 the state of the connection and the exchange of frames, not to perform
499 work. Instead, callers tell this class when something occurs, like a
499 work. Instead, callers tell this class when something occurs, like a
500 frame arriving. If that activity is worthy of a follow-up action (say
500 frame arriving. If that activity is worthy of a follow-up action (say
501 *run a command*), the return value of that handler will say so.
501 *run a command*), the return value of that handler will say so.
502
502
503 I/O and CPU intensive operations are purposefully delegated outside of
503 I/O and CPU intensive operations are purposefully delegated outside of
504 this class.
504 this class.
505
505
506 Consumers are expected to tell instances when events occur. They do so by
506 Consumers are expected to tell instances when events occur. They do so by
507 calling the various ``on*`` methods. These methods return a 2-tuple
507 calling the various ``on*`` methods. These methods return a 2-tuple
508 describing any follow-up action(s) to take. The first element is the
508 describing any follow-up action(s) to take. The first element is the
509 name of an action to perform. The second is a data structure (usually
509 name of an action to perform. The second is a data structure (usually
510 a dict) specific to that action that contains more information. e.g.
510 a dict) specific to that action that contains more information. e.g.
511 if the server wants to send frames back to the client, the data structure
511 if the server wants to send frames back to the client, the data structure
512 will contain a reference to those frames.
512 will contain a reference to those frames.
513
513
514 Valid actions that consumers can be instructed to take are:
514 Valid actions that consumers can be instructed to take are:
515
515
516 sendframes
516 sendframes
517 Indicates that frames should be sent to the client. The ``framegen``
517 Indicates that frames should be sent to the client. The ``framegen``
518 key contains a generator of frames that should be sent. The server
518 key contains a generator of frames that should be sent. The server
519 assumes that all frames are sent to the client.
519 assumes that all frames are sent to the client.
520
520
521 error
521 error
522 Indicates that an error occurred. Consumer should probably abort.
522 Indicates that an error occurred. Consumer should probably abort.
523
523
524 runcommand
524 runcommand
525 Indicates that the consumer should run a wire protocol command. Details
525 Indicates that the consumer should run a wire protocol command. Details
526 of the command to run are given in the data structure.
526 of the command to run are given in the data structure.
527
527
528 wantframe
528 wantframe
529 Indicates that nothing of interest happened and the server is waiting on
529 Indicates that nothing of interest happened and the server is waiting on
530 more frames from the client before anything interesting can be done.
530 more frames from the client before anything interesting can be done.
531
531
532 noop
532 noop
533 Indicates no additional action is required.
533 Indicates no additional action is required.
534
534
535 Known Issues
535 Known Issues
536 ------------
536 ------------
537
537
538 There are no limits to the number of partially received commands or their
538 There are no limits to the number of partially received commands or their
539 size. A malicious client could stream command request data and exhaust the
539 size. A malicious client could stream command request data and exhaust the
540 server's memory.
540 server's memory.
541
541
542 Partially received commands are not acted upon when end of input is
542 Partially received commands are not acted upon when end of input is
543 reached. Should the server error if it receives a partial request?
543 reached. Should the server error if it receives a partial request?
544 Should the client send a message to abort a partially transmitted request
544 Should the client send a message to abort a partially transmitted request
545 to facilitate graceful shutdown?
545 to facilitate graceful shutdown?
546
546
547 Active requests that haven't been responded to aren't tracked. This means
547 Active requests that haven't been responded to aren't tracked. This means
548 that if we receive a command and instruct its dispatch, another command
548 that if we receive a command and instruct its dispatch, another command
549 with its request ID can come in over the wire and there will be a race
549 with its request ID can come in over the wire and there will be a race
550 between who responds to what.
550 between who responds to what.
551 """
551 """
552
552
553 def __init__(self, deferoutput=False):
553 def __init__(self, deferoutput=False):
554 """Construct a new server reactor.
554 """Construct a new server reactor.
555
555
556 ``deferoutput`` can be used to indicate that no output frames should be
556 ``deferoutput`` can be used to indicate that no output frames should be
557 instructed to be sent until input has been exhausted. In this mode,
557 instructed to be sent until input has been exhausted. In this mode,
558 events that would normally generate output frames (such as a command
558 events that would normally generate output frames (such as a command
559 response being ready) will instead defer instructing the consumer to
559 response being ready) will instead defer instructing the consumer to
560 send those frames. This is useful for half-duplex transports where the
560 send those frames. This is useful for half-duplex transports where the
561 sender cannot receive until all data has been transmitted.
561 sender cannot receive until all data has been transmitted.
562 """
562 """
563 self._deferoutput = deferoutput
563 self._deferoutput = deferoutput
564 self._state = 'idle'
564 self._state = 'idle'
565 self._nextoutgoingstreamid = 2
565 self._nextoutgoingstreamid = 2
566 self._bufferedframegens = []
566 self._bufferedframegens = []
567 # stream id -> stream instance for all active streams from the client.
567 # stream id -> stream instance for all active streams from the client.
568 self._incomingstreams = {}
568 self._incomingstreams = {}
569 self._outgoingstreams = {}
569 self._outgoingstreams = {}
570 # request id -> dict of commands that are actively being received.
570 # request id -> dict of commands that are actively being received.
571 self._receivingcommands = {}
571 self._receivingcommands = {}
572 # Request IDs that have been received and are actively being processed.
572 # Request IDs that have been received and are actively being processed.
573 # Once all output for a request has been sent, it is removed from this
573 # Once all output for a request has been sent, it is removed from this
574 # set.
574 # set.
575 self._activecommands = set()
575 self._activecommands = set()
576
576
577 def onframerecv(self, frame):
577 def onframerecv(self, frame):
578 """Process a frame that has been received off the wire.
578 """Process a frame that has been received off the wire.
579
579
580 Returns a dict with an ``action`` key that details what action,
580 Returns a dict with an ``action`` key that details what action,
581 if any, the consumer should take next.
581 if any, the consumer should take next.
582 """
582 """
583 if not frame.streamid % 2:
583 if not frame.streamid % 2:
584 self._state = 'errored'
584 self._state = 'errored'
585 return self._makeerrorresult(
585 return self._makeerrorresult(
586 _('received frame with even numbered stream ID: %d') %
586 _('received frame with even numbered stream ID: %d') %
587 frame.streamid)
587 frame.streamid)
588
588
589 if frame.streamid not in self._incomingstreams:
589 if frame.streamid not in self._incomingstreams:
590 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
590 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
591 self._state = 'errored'
591 self._state = 'errored'
592 return self._makeerrorresult(
592 return self._makeerrorresult(
593 _('received frame on unknown inactive stream without '
593 _('received frame on unknown inactive stream without '
594 'beginning of stream flag set'))
594 'beginning of stream flag set'))
595
595
596 self._incomingstreams[frame.streamid] = stream(frame.streamid)
596 self._incomingstreams[frame.streamid] = stream(frame.streamid)
597
597
598 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
598 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
599 # TODO handle decoding frames
599 # TODO handle decoding frames
600 self._state = 'errored'
600 self._state = 'errored'
601 raise error.ProgrammingError('support for decoding stream payloads '
601 raise error.ProgrammingError('support for decoding stream payloads '
602 'not yet implemented')
602 'not yet implemented')
603
603
604 if frame.streamflags & STREAM_FLAG_END_STREAM:
604 if frame.streamflags & STREAM_FLAG_END_STREAM:
605 del self._incomingstreams[frame.streamid]
605 del self._incomingstreams[frame.streamid]
606
606
607 handlers = {
607 handlers = {
608 'idle': self._onframeidle,
608 'idle': self._onframeidle,
609 'command-receiving': self._onframecommandreceiving,
609 'command-receiving': self._onframecommandreceiving,
610 'errored': self._onframeerrored,
610 'errored': self._onframeerrored,
611 }
611 }
612
612
613 meth = handlers.get(self._state)
613 meth = handlers.get(self._state)
614 if not meth:
614 if not meth:
615 raise error.ProgrammingError('unhandled state: %s' % self._state)
615 raise error.ProgrammingError('unhandled state: %s' % self._state)
616
616
617 return meth(frame)
617 return meth(frame)
618
618
619 def onbytesresponseready(self, stream, requestid, data, iscbor=False):
619 def onbytesresponseready(self, stream, requestid, data, iscbor=False):
620 """Signal that a bytes response is ready to be sent to the client.
620 """Signal that a bytes response is ready to be sent to the client.
621
621
622 The raw bytes response is passed as an argument.
622 The raw bytes response is passed as an argument.
623 """
623 """
624 ensureserverstream(stream)
624 ensureserverstream(stream)
625
625
626 def sendframes():
626 def sendframes():
627 for frame in createbytesresponseframesfrombytes(stream, requestid,
627 for frame in createbytesresponseframesfrombytes(stream, requestid,
628 data,
628 data,
629 iscbor=iscbor):
629 iscbor=iscbor):
630 yield frame
630 yield frame
631
631
632 self._activecommands.remove(requestid)
632 self._activecommands.remove(requestid)
633
633
634 result = sendframes()
634 result = sendframes()
635
635
636 if self._deferoutput:
636 if self._deferoutput:
637 self._bufferedframegens.append(result)
637 self._bufferedframegens.append(result)
638 return 'noop', {}
638 return 'noop', {}
639 else:
639 else:
640 return 'sendframes', {
640 return 'sendframes', {
641 'framegen': result,
641 'framegen': result,
642 }
642 }
643
643
644 def oninputeof(self):
644 def oninputeof(self):
645 """Signals that end of input has been received.
645 """Signals that end of input has been received.
646
646
647 No more frames will be received. All pending activity should be
647 No more frames will be received. All pending activity should be
648 completed.
648 completed.
649 """
649 """
650 # TODO should we do anything about in-flight commands?
650 # TODO should we do anything about in-flight commands?
651
651
652 if not self._deferoutput or not self._bufferedframegens:
652 if not self._deferoutput or not self._bufferedframegens:
653 return 'noop', {}
653 return 'noop', {}
654
654
655 # If we buffered all our responses, emit those.
655 # If we buffered all our responses, emit those.
656 def makegen():
656 def makegen():
657 for gen in self._bufferedframegens:
657 for gen in self._bufferedframegens:
658 for frame in gen:
658 for frame in gen:
659 yield frame
659 yield frame
660
660
661 return 'sendframes', {
661 return 'sendframes', {
662 'framegen': makegen(),
662 'framegen': makegen(),
663 }
663 }
664
664
665 def onapplicationerror(self, stream, requestid, msg):
665 def onapplicationerror(self, stream, requestid, msg):
666 ensureserverstream(stream)
666 ensureserverstream(stream)
667
667
668 return 'sendframes', {
668 return 'sendframes', {
669 'framegen': createerrorframe(stream, requestid, msg,
669 'framegen': createerrorframe(stream, requestid, msg,
670 application=True),
670 application=True),
671 }
671 }
672
672
673 def makeoutputstream(self):
673 def makeoutputstream(self):
674 """Create a stream to be used for sending data to the client."""
674 """Create a stream to be used for sending data to the client."""
675 streamid = self._nextoutgoingstreamid
675 streamid = self._nextoutgoingstreamid
676 self._nextoutgoingstreamid += 2
676 self._nextoutgoingstreamid += 2
677
677
678 s = stream(streamid)
678 s = stream(streamid)
679 self._outgoingstreams[streamid] = s
679 self._outgoingstreams[streamid] = s
680
680
681 return s
681 return s
682
682
683 def _makeerrorresult(self, msg):
683 def _makeerrorresult(self, msg):
684 return 'error', {
684 return 'error', {
685 'message': msg,
685 'message': msg,
686 }
686 }
687
687
688 def _makeruncommandresult(self, requestid):
688 def _makeruncommandresult(self, requestid):
689 entry = self._receivingcommands[requestid]
689 entry = self._receivingcommands[requestid]
690
690
691 if not entry['requestdone']:
691 if not entry['requestdone']:
692 self._state = 'errored'
692 self._state = 'errored'
693 raise error.ProgrammingError('should not be called without '
693 raise error.ProgrammingError('should not be called without '
694 'requestdone set')
694 'requestdone set')
695
695
696 del self._receivingcommands[requestid]
696 del self._receivingcommands[requestid]
697
697
698 if self._receivingcommands:
698 if self._receivingcommands:
699 self._state = 'command-receiving'
699 self._state = 'command-receiving'
700 else:
700 else:
701 self._state = 'idle'
701 self._state = 'idle'
702
702
703 # Decode the payloads as CBOR.
703 # Decode the payloads as CBOR.
704 entry['payload'].seek(0)
704 entry['payload'].seek(0)
705 request = cbor.load(entry['payload'])
705 request = cbor.load(entry['payload'])
706
706
707 if b'name' not in request:
707 if b'name' not in request:
708 self._state = 'errored'
708 self._state = 'errored'
709 return self._makeerrorresult(
709 return self._makeerrorresult(
710 _('command request missing "name" field'))
710 _('command request missing "name" field'))
711
711
712 if b'args' not in request:
712 if b'args' not in request:
713 request[b'args'] = {}
713 request[b'args'] = {}
714
714
715 assert requestid not in self._activecommands
715 assert requestid not in self._activecommands
716 self._activecommands.add(requestid)
716 self._activecommands.add(requestid)
717
717
718 return 'runcommand', {
718 return 'runcommand', {
719 'requestid': requestid,
719 'requestid': requestid,
720 'command': request[b'name'],
720 'command': request[b'name'],
721 'args': request[b'args'],
721 'args': request[b'args'],
722 'data': entry['data'].getvalue() if entry['data'] else None,
722 'data': entry['data'].getvalue() if entry['data'] else None,
723 }
723 }
724
724
725 def _makewantframeresult(self):
725 def _makewantframeresult(self):
726 return 'wantframe', {
726 return 'wantframe', {
727 'state': self._state,
727 'state': self._state,
728 }
728 }
729
729
730 def _validatecommandrequestframe(self, frame):
730 def _validatecommandrequestframe(self, frame):
731 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
731 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
732 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
732 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
733
733
734 if new and continuation:
734 if new and continuation:
735 self._state = 'errored'
735 self._state = 'errored'
736 return self._makeerrorresult(
736 return self._makeerrorresult(
737 _('received command request frame with both new and '
737 _('received command request frame with both new and '
738 'continuation flags set'))
738 'continuation flags set'))
739
739
740 if not new and not continuation:
740 if not new and not continuation:
741 self._state = 'errored'
741 self._state = 'errored'
742 return self._makeerrorresult(
742 return self._makeerrorresult(
743 _('received command request frame with neither new nor '
743 _('received command request frame with neither new nor '
744 'continuation flags set'))
744 'continuation flags set'))
745
745
746 def _onframeidle(self, frame):
746 def _onframeidle(self, frame):
747 # The only frame type that should be received in this state is a
747 # The only frame type that should be received in this state is a
748 # command request.
748 # command request.
749 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
749 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
750 self._state = 'errored'
750 self._state = 'errored'
751 return self._makeerrorresult(
751 return self._makeerrorresult(
752 _('expected command request frame; got %d') % frame.typeid)
752 _('expected command request frame; got %d') % frame.typeid)
753
753
754 res = self._validatecommandrequestframe(frame)
754 res = self._validatecommandrequestframe(frame)
755 if res:
755 if res:
756 return res
756 return res
757
757
758 if frame.requestid in self._receivingcommands:
758 if frame.requestid in self._receivingcommands:
759 self._state = 'errored'
759 self._state = 'errored'
760 return self._makeerrorresult(
760 return self._makeerrorresult(
761 _('request with ID %d already received') % frame.requestid)
761 _('request with ID %d already received') % frame.requestid)
762
762
763 if frame.requestid in self._activecommands:
763 if frame.requestid in self._activecommands:
764 self._state = 'errored'
764 self._state = 'errored'
765 return self._makeerrorresult(
765 return self._makeerrorresult(
766 _('request with ID %d is already active') % frame.requestid)
766 _('request with ID %d is already active') % frame.requestid)
767
767
768 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
768 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
769 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
769 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
770 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
770 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
771
771
772 if not new:
772 if not new:
773 self._state = 'errored'
773 self._state = 'errored'
774 return self._makeerrorresult(
774 return self._makeerrorresult(
775 _('received command request frame without new flag set'))
775 _('received command request frame without new flag set'))
776
776
777 payload = util.bytesio()
777 payload = util.bytesio()
778 payload.write(frame.payload)
778 payload.write(frame.payload)
779
779
780 self._receivingcommands[frame.requestid] = {
780 self._receivingcommands[frame.requestid] = {
781 'payload': payload,
781 'payload': payload,
782 'data': None,
782 'data': None,
783 'requestdone': not moreframes,
783 'requestdone': not moreframes,
784 'expectingdata': bool(expectingdata),
784 'expectingdata': bool(expectingdata),
785 }
785 }
786
786
787 # This is the final frame for this request. Dispatch it.
787 # This is the final frame for this request. Dispatch it.
788 if not moreframes and not expectingdata:
788 if not moreframes and not expectingdata:
789 return self._makeruncommandresult(frame.requestid)
789 return self._makeruncommandresult(frame.requestid)
790
790
791 assert moreframes or expectingdata
791 assert moreframes or expectingdata
792 self._state = 'command-receiving'
792 self._state = 'command-receiving'
793 return self._makewantframeresult()
793 return self._makewantframeresult()
794
794
795 def _onframecommandreceiving(self, frame):
795 def _onframecommandreceiving(self, frame):
796 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
796 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
797 # Process new command requests as such.
797 # Process new command requests as such.
798 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
798 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
799 return self._onframeidle(frame)
799 return self._onframeidle(frame)
800
800
801 res = self._validatecommandrequestframe(frame)
801 res = self._validatecommandrequestframe(frame)
802 if res:
802 if res:
803 return res
803 return res
804
804
805 # All other frames should be related to a command that is currently
805 # All other frames should be related to a command that is currently
806 # receiving but is not active.
806 # receiving but is not active.
807 if frame.requestid in self._activecommands:
807 if frame.requestid in self._activecommands:
808 self._state = 'errored'
808 self._state = 'errored'
809 return self._makeerrorresult(
809 return self._makeerrorresult(
810 _('received frame for request that is still active: %d') %
810 _('received frame for request that is still active: %d') %
811 frame.requestid)
811 frame.requestid)
812
812
813 if frame.requestid not in self._receivingcommands:
813 if frame.requestid not in self._receivingcommands:
814 self._state = 'errored'
814 self._state = 'errored'
815 return self._makeerrorresult(
815 return self._makeerrorresult(
816 _('received frame for request that is not receiving: %d') %
816 _('received frame for request that is not receiving: %d') %
817 frame.requestid)
817 frame.requestid)
818
818
819 entry = self._receivingcommands[frame.requestid]
819 entry = self._receivingcommands[frame.requestid]
820
820
821 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
821 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
822 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
822 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
823 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
823 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
824
824
825 if entry['requestdone']:
825 if entry['requestdone']:
826 self._state = 'errored'
826 self._state = 'errored'
827 return self._makeerrorresult(
827 return self._makeerrorresult(
828 _('received command request frame when request frames '
828 _('received command request frame when request frames '
829 'were supposedly done'))
829 'were supposedly done'))
830
830
831 if expectingdata != entry['expectingdata']:
831 if expectingdata != entry['expectingdata']:
832 self._state = 'errored'
832 self._state = 'errored'
833 return self._makeerrorresult(
833 return self._makeerrorresult(
834 _('mismatch between expect data flag and previous frame'))
834 _('mismatch between expect data flag and previous frame'))
835
835
836 entry['payload'].write(frame.payload)
836 entry['payload'].write(frame.payload)
837
837
838 if not moreframes:
838 if not moreframes:
839 entry['requestdone'] = True
839 entry['requestdone'] = True
840
840
841 if not moreframes and not expectingdata:
841 if not moreframes and not expectingdata:
842 return self._makeruncommandresult(frame.requestid)
842 return self._makeruncommandresult(frame.requestid)
843
843
844 return self._makewantframeresult()
844 return self._makewantframeresult()
845
845
846 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
846 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
847 if not entry['expectingdata']:
847 if not entry['expectingdata']:
848 self._state = 'errored'
848 self._state = 'errored'
849 return self._makeerrorresult(_(
849 return self._makeerrorresult(_(
850 'received command data frame for request that is not '
850 'received command data frame for request that is not '
851 'expecting data: %d') % frame.requestid)
851 'expecting data: %d') % frame.requestid)
852
852
853 if entry['data'] is None:
853 if entry['data'] is None:
854 entry['data'] = util.bytesio()
854 entry['data'] = util.bytesio()
855
855
856 return self._handlecommanddataframe(frame, entry)
856 return self._handlecommanddataframe(frame, entry)
857 else:
857 else:
858 self._state = 'errored'
858 self._state = 'errored'
859 return self._makeerrorresult(_(
859 return self._makeerrorresult(_(
860 'received unexpected frame type: %d') % frame.typeid)
860 'received unexpected frame type: %d') % frame.typeid)
861
861
862 def _handlecommanddataframe(self, frame, entry):
862 def _handlecommanddataframe(self, frame, entry):
863 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
863 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
864
864
865 # TODO support streaming data instead of buffering it.
865 # TODO support streaming data instead of buffering it.
866 entry['data'].write(frame.payload)
866 entry['data'].write(frame.payload)
867
867
868 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
868 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
869 return self._makewantframeresult()
869 return self._makewantframeresult()
870 elif frame.flags & FLAG_COMMAND_DATA_EOS:
870 elif frame.flags & FLAG_COMMAND_DATA_EOS:
871 entry['data'].seek(0)
871 entry['data'].seek(0)
872 return self._makeruncommandresult(frame.requestid)
872 return self._makeruncommandresult(frame.requestid)
873 else:
873 else:
874 self._state = 'errored'
874 self._state = 'errored'
875 return self._makeerrorresult(_('command data frame without '
875 return self._makeerrorresult(_('command data frame without '
876 'flags'))
876 'flags'))
877
877
878 def _onframeerrored(self, frame):
878 def _onframeerrored(self, frame):
879 return self._makeerrorresult(_('server already errored'))
879 return self._makeerrorresult(_('server already errored'))
880
880
881 class commandrequest(object):
881 class commandrequest(object):
882 """Represents a request to run a command."""
882 """Represents a request to run a command."""
883
883
884 def __init__(self, requestid, name, args, datafh=None):
884 def __init__(self, requestid, name, args, datafh=None):
885 self.requestid = requestid
885 self.requestid = requestid
886 self.name = name
886 self.name = name
887 self.args = args
887 self.args = args
888 self.datafh = datafh
888 self.datafh = datafh
889 self.state = 'pending'
889 self.state = 'pending'
890
890
891 class clientreactor(object):
891 class clientreactor(object):
892 """Holds state of a client issuing frame-based protocol requests.
892 """Holds state of a client issuing frame-based protocol requests.
893
893
894 This is like ``serverreactor`` but for client-side state.
894 This is like ``serverreactor`` but for client-side state.
895
895
896 Each instance is bound to the lifetime of a connection. For persistent
896 Each instance is bound to the lifetime of a connection. For persistent
897 connection transports using e.g. TCP sockets and speaking the raw
897 connection transports using e.g. TCP sockets and speaking the raw
898 framing protocol, there will be a single instance for the lifetime of
898 framing protocol, there will be a single instance for the lifetime of
899 the TCP socket. For transports where there are multiple discrete
899 the TCP socket. For transports where there are multiple discrete
900 interactions (say tunneled within in HTTP request), there will be a
900 interactions (say tunneled within in HTTP request), there will be a
901 separate instance for each distinct interaction.
901 separate instance for each distinct interaction.
902 """
902 """
903 def __init__(self, hasmultiplesend=False, buffersends=True):
903 def __init__(self, hasmultiplesend=False, buffersends=True):
904 """Create a new instance.
904 """Create a new instance.
905
905
906 ``hasmultiplesend`` indicates whether multiple sends are supported
906 ``hasmultiplesend`` indicates whether multiple sends are supported
907 by the transport. When True, it is possible to send commands immediately
907 by the transport. When True, it is possible to send commands immediately
908 instead of buffering until the caller signals an intent to finish a
908 instead of buffering until the caller signals an intent to finish a
909 send operation.
909 send operation.
910
910
911 ``buffercommands`` indicates whether sends should be buffered until the
911 ``buffercommands`` indicates whether sends should be buffered until the
912 last request has been issued.
912 last request has been issued.
913 """
913 """
914 self._hasmultiplesend = hasmultiplesend
914 self._hasmultiplesend = hasmultiplesend
915 self._buffersends = buffersends
915 self._buffersends = buffersends
916
916
917 self._canissuecommands = True
917 self._canissuecommands = True
918 self._cansend = True
918 self._cansend = True
919
919
920 self._nextrequestid = 1
920 self._nextrequestid = 1
921 # We only support a single outgoing stream for now.
921 # We only support a single outgoing stream for now.
922 self._outgoingstream = stream(1)
922 self._outgoingstream = stream(1)
923 self._pendingrequests = collections.deque()
923 self._pendingrequests = collections.deque()
924 self._activerequests = {}
924 self._activerequests = {}
925 self._incomingstreams = {}
925 self._incomingstreams = {}
926
926
927 def callcommand(self, name, args, datafh=None):
927 def callcommand(self, name, args, datafh=None):
928 """Request that a command be executed.
928 """Request that a command be executed.
929
929
930 Receives the command name, a dict of arguments to pass to the command,
930 Receives the command name, a dict of arguments to pass to the command,
931 and an optional file object containing the raw data for the command.
931 and an optional file object containing the raw data for the command.
932
932
933 Returns a 3-tuple of (request, action, action data).
933 Returns a 3-tuple of (request, action, action data).
934 """
934 """
935 if not self._canissuecommands:
935 if not self._canissuecommands:
936 raise error.ProgrammingError('cannot issue new commands')
936 raise error.ProgrammingError('cannot issue new commands')
937
937
938 requestid = self._nextrequestid
938 requestid = self._nextrequestid
939 self._nextrequestid += 2
939 self._nextrequestid += 2
940
940
941 request = commandrequest(requestid, name, args, datafh=datafh)
941 request = commandrequest(requestid, name, args, datafh=datafh)
942
942
943 if self._buffersends:
943 if self._buffersends:
944 self._pendingrequests.append(request)
944 self._pendingrequests.append(request)
945 return request, 'noop', {}
945 return request, 'noop', {}
946 else:
946 else:
947 if not self._cansend:
947 if not self._cansend:
948 raise error.ProgrammingError('sends cannot be performed on '
948 raise error.ProgrammingError('sends cannot be performed on '
949 'this instance')
949 'this instance')
950
950
951 if not self._hasmultiplesend:
951 if not self._hasmultiplesend:
952 self._cansend = False
952 self._cansend = False
953 self._canissuecommands = False
953 self._canissuecommands = False
954
954
955 return request, 'sendframes', {
955 return request, 'sendframes', {
956 'framegen': self._makecommandframes(request),
956 'framegen': self._makecommandframes(request),
957 }
957 }
958
958
959 def flushcommands(self):
959 def flushcommands(self):
960 """Request that all queued commands be sent.
960 """Request that all queued commands be sent.
961
961
962 If any commands are buffered, this will instruct the caller to send
962 If any commands are buffered, this will instruct the caller to send
963 them over the wire. If no commands are buffered it instructs the client
963 them over the wire. If no commands are buffered it instructs the client
964 to no-op.
964 to no-op.
965
965
966 If instances aren't configured for multiple sends, no new command
966 If instances aren't configured for multiple sends, no new command
967 requests are allowed after this is called.
967 requests are allowed after this is called.
968 """
968 """
969 if not self._pendingrequests:
969 if not self._pendingrequests:
970 return 'noop', {}
970 return 'noop', {}
971
971
972 if not self._cansend:
972 if not self._cansend:
973 raise error.ProgrammingError('sends cannot be performed on this '
973 raise error.ProgrammingError('sends cannot be performed on this '
974 'instance')
974 'instance')
975
975
976 # If the instance only allows sending once, mark that we have fired
976 # If the instance only allows sending once, mark that we have fired
977 # our one shot.
977 # our one shot.
978 if not self._hasmultiplesend:
978 if not self._hasmultiplesend:
979 self._canissuecommands = False
979 self._canissuecommands = False
980 self._cansend = False
980 self._cansend = False
981
981
982 def makeframes():
982 def makeframes():
983 while self._pendingrequests:
983 while self._pendingrequests:
984 request = self._pendingrequests.popleft()
984 request = self._pendingrequests.popleft()
985 for frame in self._makecommandframes(request):
985 for frame in self._makecommandframes(request):
986 yield frame
986 yield frame
987
987
988 return 'sendframes', {
988 return 'sendframes', {
989 'framegen': makeframes(),
989 'framegen': makeframes(),
990 }
990 }
991
991
992 def _makecommandframes(self, request):
992 def _makecommandframes(self, request):
993 """Emit frames to issue a command request.
993 """Emit frames to issue a command request.
994
994
995 As a side-effect, update request accounting to reflect its changed
995 As a side-effect, update request accounting to reflect its changed
996 state.
996 state.
997 """
997 """
998 self._activerequests[request.requestid] = request
998 self._activerequests[request.requestid] = request
999 request.state = 'sending'
999 request.state = 'sending'
1000
1000
1001 res = createcommandframes(self._outgoingstream,
1001 res = createcommandframes(self._outgoingstream,
1002 request.requestid,
1002 request.requestid,
1003 request.name,
1003 request.name,
1004 request.args,
1004 request.args,
1005 request.datafh)
1005 request.datafh)
1006
1006
1007 for frame in res:
1007 for frame in res:
1008 yield frame
1008 yield frame
1009
1009
1010 request.state = 'sent'
1010 request.state = 'sent'
1011
1011
1012 def onframerecv(self, frame):
1012 def onframerecv(self, frame):
1013 """Process a frame that has been received off the wire.
1013 """Process a frame that has been received off the wire.
1014
1014
1015 Returns a 2-tuple of (action, meta) describing further action the
1015 Returns a 2-tuple of (action, meta) describing further action the
1016 caller needs to take as a result of receiving this frame.
1016 caller needs to take as a result of receiving this frame.
1017 """
1017 """
1018 if frame.streamid % 2:
1018 if frame.streamid % 2:
1019 return 'error', {
1019 return 'error', {
1020 'message': (
1020 'message': (
1021 _('received frame with odd numbered stream ID: %d') %
1021 _('received frame with odd numbered stream ID: %d') %
1022 frame.streamid),
1022 frame.streamid),
1023 }
1023 }
1024
1024
1025 if frame.streamid not in self._incomingstreams:
1025 if frame.streamid not in self._incomingstreams:
1026 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1026 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1027 return 'error', {
1027 return 'error', {
1028 'message': _('received frame on unknown stream '
1028 'message': _('received frame on unknown stream '
1029 'without beginning of stream flag set'),
1029 'without beginning of stream flag set'),
1030 }
1030 }
1031
1031
1032 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1033
1032 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1034 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1033 raise error.ProgrammingError('support for decoding stream '
1035 raise error.ProgrammingError('support for decoding stream '
1034 'payloads not yet implemneted')
1036 'payloads not yet implemneted')
1035
1037
1036 if frame.streamflags & STREAM_FLAG_END_STREAM:
1038 if frame.streamflags & STREAM_FLAG_END_STREAM:
1037 del self._incomingstreams[frame.streamid]
1039 del self._incomingstreams[frame.streamid]
1038
1040
1039 if frame.requestid not in self._activerequests:
1041 if frame.requestid not in self._activerequests:
1040 return 'error', {
1042 return 'error', {
1041 'message': (_('received frame for inactive request ID: %d') %
1043 'message': (_('received frame for inactive request ID: %d') %
1042 frame.requestid),
1044 frame.requestid),
1043 }
1045 }
1044
1046
1045 request = self._activerequests[frame.requestid]
1047 request = self._activerequests[frame.requestid]
1046 request.state = 'receiving'
1048 request.state = 'receiving'
1047
1049
1048 handlers = {
1050 handlers = {
1049 FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
1051 FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
1050 }
1052 }
1051
1053
1052 meth = handlers.get(frame.typeid)
1054 meth = handlers.get(frame.typeid)
1053 if not meth:
1055 if not meth:
1054 raise error.ProgrammingError('unhandled frame type: %d' %
1056 raise error.ProgrammingError('unhandled frame type: %d' %
1055 frame.typeid)
1057 frame.typeid)
1056
1058
1057 return meth(request, frame)
1059 return meth(request, frame)
1058
1060
1059 def _onbytesresponseframe(self, request, frame):
1061 def _onbytesresponseframe(self, request, frame):
1060 if frame.flags & FLAG_BYTES_RESPONSE_EOS:
1062 if frame.flags & FLAG_BYTES_RESPONSE_EOS:
1061 request.state = 'received'
1063 request.state = 'received'
1062 del self._activerequests[request.requestid]
1064 del self._activerequests[request.requestid]
1063
1065
1064 return 'responsedata', {
1066 return 'responsedata', {
1065 'request': request,
1067 'request': request,
1066 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
1068 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
1067 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
1069 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
1068 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
1070 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
1069 'data': frame.payload,
1071 'data': frame.payload,
1070 }
1072 }
@@ -1,110 +1,130 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import unittest
3 import unittest
4
4
5 from mercurial import (
5 from mercurial import (
6 error,
6 error,
7 wireprotoframing as framing,
7 wireprotoframing as framing,
8 )
8 )
9
9
10 ffs = framing.makeframefromhumanstring
10 ffs = framing.makeframefromhumanstring
11
11
12 def sendframe(reactor, frame):
12 def sendframe(reactor, frame):
13 """Send a frame bytearray to a reactor."""
13 """Send a frame bytearray to a reactor."""
14 header = framing.parseheader(frame)
14 header = framing.parseheader(frame)
15 payload = frame[framing.FRAME_HEADER_SIZE:]
15 payload = frame[framing.FRAME_HEADER_SIZE:]
16 assert len(payload) == header.length
16 assert len(payload) == header.length
17
17
18 return reactor.onframerecv(framing.frame(header.requestid,
18 return reactor.onframerecv(framing.frame(header.requestid,
19 header.streamid,
19 header.streamid,
20 header.streamflags,
20 header.streamflags,
21 header.typeid,
21 header.typeid,
22 header.flags,
22 header.flags,
23 payload))
23 payload))
24
24
25 class SingleSendTests(unittest.TestCase):
25 class SingleSendTests(unittest.TestCase):
26 """A reactor that can only send once rejects subsequent sends."""
26 """A reactor that can only send once rejects subsequent sends."""
27 def testbasic(self):
27 def testbasic(self):
28 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
28 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
29
29
30 request, action, meta = reactor.callcommand(b'foo', {})
30 request, action, meta = reactor.callcommand(b'foo', {})
31 self.assertEqual(request.state, 'pending')
31 self.assertEqual(request.state, 'pending')
32 self.assertEqual(action, 'noop')
32 self.assertEqual(action, 'noop')
33
33
34 action, meta = reactor.flushcommands()
34 action, meta = reactor.flushcommands()
35 self.assertEqual(action, 'sendframes')
35 self.assertEqual(action, 'sendframes')
36
36
37 for frame in meta['framegen']:
37 for frame in meta['framegen']:
38 self.assertEqual(request.state, 'sending')
38 self.assertEqual(request.state, 'sending')
39
39
40 self.assertEqual(request.state, 'sent')
40 self.assertEqual(request.state, 'sent')
41
41
42 with self.assertRaisesRegexp(error.ProgrammingError,
42 with self.assertRaisesRegexp(error.ProgrammingError,
43 'cannot issue new commands'):
43 'cannot issue new commands'):
44 reactor.callcommand(b'foo', {})
44 reactor.callcommand(b'foo', {})
45
45
46 with self.assertRaisesRegexp(error.ProgrammingError,
46 with self.assertRaisesRegexp(error.ProgrammingError,
47 'cannot issue new commands'):
47 'cannot issue new commands'):
48 reactor.callcommand(b'foo', {})
48 reactor.callcommand(b'foo', {})
49
49
50 class NoBufferTests(unittest.TestCase):
50 class NoBufferTests(unittest.TestCase):
51 """A reactor without send buffering sends requests immediately."""
51 """A reactor without send buffering sends requests immediately."""
52 def testbasic(self):
52 def testbasic(self):
53 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
53 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
54
54
55 request, action, meta = reactor.callcommand(b'command1', {})
55 request, action, meta = reactor.callcommand(b'command1', {})
56 self.assertEqual(request.requestid, 1)
56 self.assertEqual(request.requestid, 1)
57 self.assertEqual(action, 'sendframes')
57 self.assertEqual(action, 'sendframes')
58
58
59 self.assertEqual(request.state, 'pending')
59 self.assertEqual(request.state, 'pending')
60
60
61 for frame in meta['framegen']:
61 for frame in meta['framegen']:
62 self.assertEqual(request.state, 'sending')
62 self.assertEqual(request.state, 'sending')
63
63
64 self.assertEqual(request.state, 'sent')
64 self.assertEqual(request.state, 'sent')
65
65
66 action, meta = reactor.flushcommands()
66 action, meta = reactor.flushcommands()
67 self.assertEqual(action, 'noop')
67 self.assertEqual(action, 'noop')
68
68
69 # And we can send another command.
69 # And we can send another command.
70 request, action, meta = reactor.callcommand(b'command2', {})
70 request, action, meta = reactor.callcommand(b'command2', {})
71 self.assertEqual(request.requestid, 3)
71 self.assertEqual(request.requestid, 3)
72 self.assertEqual(action, 'sendframes')
72 self.assertEqual(action, 'sendframes')
73
73
74 for frame in meta['framegen']:
74 for frame in meta['framegen']:
75 self.assertEqual(request.state, 'sending')
75 self.assertEqual(request.state, 'sending')
76
76
77 self.assertEqual(request.state, 'sent')
77 self.assertEqual(request.state, 'sent')
78
78
79 class BadFrameRecvTests(unittest.TestCase):
79 class BadFrameRecvTests(unittest.TestCase):
80 def testoddstream(self):
80 def testoddstream(self):
81 reactor = framing.clientreactor()
81 reactor = framing.clientreactor()
82
82
83 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
83 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
84 self.assertEqual(action, 'error')
84 self.assertEqual(action, 'error')
85 self.assertEqual(meta['message'],
85 self.assertEqual(meta['message'],
86 'received frame with odd numbered stream ID: 1')
86 'received frame with odd numbered stream ID: 1')
87
87
88 def testunknownstream(self):
88 def testunknownstream(self):
89 reactor = framing.clientreactor()
89 reactor = framing.clientreactor()
90
90
91 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
91 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
92 self.assertEqual(action, 'error')
92 self.assertEqual(action, 'error')
93 self.assertEqual(meta['message'],
93 self.assertEqual(meta['message'],
94 'received frame on unknown stream without beginning '
94 'received frame on unknown stream without beginning '
95 'of stream flag set')
95 'of stream flag set')
96
96
97 def testunhandledframetype(self):
97 def testunhandledframetype(self):
98 reactor = framing.clientreactor(buffersends=False)
98 reactor = framing.clientreactor(buffersends=False)
99
99
100 request, action, meta = reactor.callcommand(b'foo', {})
100 request, action, meta = reactor.callcommand(b'foo', {})
101 for frame in meta['framegen']:
101 for frame in meta['framegen']:
102 pass
102 pass
103
103
104 with self.assertRaisesRegexp(error.ProgrammingError,
104 with self.assertRaisesRegexp(error.ProgrammingError,
105 'unhandled frame type'):
105 'unhandled frame type'):
106 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
106 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
107
107
108 class StreamTests(unittest.TestCase):
109 def testmultipleresponseframes(self):
110 reactor = framing.clientreactor(buffersends=False)
111
112 request, action, meta = reactor.callcommand(b'foo', {})
113
114 self.assertEqual(action, 'sendframes')
115 for f in meta['framegen']:
116 pass
117
118 action, meta = sendframe(
119 reactor,
120 ffs(b'%d 0 stream-begin 4 0 foo' % request.requestid))
121 self.assertEqual(action, 'responsedata')
122
123 action, meta = sendframe(
124 reactor,
125 ffs(b'%d 0 0 4 eos bar' % request.requestid))
126 self.assertEqual(action, 'responsedata')
127
108 if __name__ == '__main__':
128 if __name__ == '__main__':
109 import silenttestrunner
129 import silenttestrunner
110 silenttestrunner.main(__name__)
130 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now