##// END OF EJS Templates
wireprotov2peer: properly format errors...
Gregory Szorc -
r39522:43d92d68 default
parent child Browse files
Show More
@@ -1,1167 +1,1169 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 util,
24 util,
25 )
25 )
26 from .utils import (
26 from .utils import (
27 cborutil,
27 cborutil,
28 stringutil,
28 stringutil,
29 )
29 )
30
30
31 FRAME_HEADER_SIZE = 8
31 FRAME_HEADER_SIZE = 8
32 DEFAULT_MAX_FRAME_SIZE = 32768
32 DEFAULT_MAX_FRAME_SIZE = 32768
33
33
34 STREAM_FLAG_BEGIN_STREAM = 0x01
34 STREAM_FLAG_BEGIN_STREAM = 0x01
35 STREAM_FLAG_END_STREAM = 0x02
35 STREAM_FLAG_END_STREAM = 0x02
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
37
37
38 STREAM_FLAGS = {
38 STREAM_FLAGS = {
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
42 }
42 }
43
43
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
45 FRAME_TYPE_COMMAND_DATA = 0x02
45 FRAME_TYPE_COMMAND_DATA = 0x02
46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
49 FRAME_TYPE_PROGRESS = 0x07
49 FRAME_TYPE_PROGRESS = 0x07
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
51
51
52 FRAME_TYPES = {
52 FRAME_TYPES = {
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
58 b'progress': FRAME_TYPE_PROGRESS,
58 b'progress': FRAME_TYPE_PROGRESS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
60 }
60 }
61
61
62 FLAG_COMMAND_REQUEST_NEW = 0x01
62 FLAG_COMMAND_REQUEST_NEW = 0x01
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
66
66
67 FLAGS_COMMAND_REQUEST = {
67 FLAGS_COMMAND_REQUEST = {
68 b'new': FLAG_COMMAND_REQUEST_NEW,
68 b'new': FLAG_COMMAND_REQUEST_NEW,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
72 }
72 }
73
73
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
75 FLAG_COMMAND_DATA_EOS = 0x02
75 FLAG_COMMAND_DATA_EOS = 0x02
76
76
77 FLAGS_COMMAND_DATA = {
77 FLAGS_COMMAND_DATA = {
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
79 b'eos': FLAG_COMMAND_DATA_EOS,
79 b'eos': FLAG_COMMAND_DATA_EOS,
80 }
80 }
81
81
82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
83 FLAG_COMMAND_RESPONSE_EOS = 0x02
83 FLAG_COMMAND_RESPONSE_EOS = 0x02
84
84
85 FLAGS_COMMAND_RESPONSE = {
85 FLAGS_COMMAND_RESPONSE = {
86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
88 }
88 }
89
89
90 # Maps frame types to their available flags.
90 # Maps frame types to their available flags.
91 FRAME_TYPE_FLAGS = {
91 FRAME_TYPE_FLAGS = {
92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
95 FRAME_TYPE_ERROR_RESPONSE: {},
95 FRAME_TYPE_ERROR_RESPONSE: {},
96 FRAME_TYPE_TEXT_OUTPUT: {},
96 FRAME_TYPE_TEXT_OUTPUT: {},
97 FRAME_TYPE_PROGRESS: {},
97 FRAME_TYPE_PROGRESS: {},
98 FRAME_TYPE_STREAM_SETTINGS: {},
98 FRAME_TYPE_STREAM_SETTINGS: {},
99 }
99 }
100
100
101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
102
102
103 def humanflags(mapping, value):
103 def humanflags(mapping, value):
104 """Convert a numeric flags value to a human value, using a mapping table."""
104 """Convert a numeric flags value to a human value, using a mapping table."""
105 namemap = {v: k for k, v in mapping.iteritems()}
105 namemap = {v: k for k, v in mapping.iteritems()}
106 flags = []
106 flags = []
107 val = 1
107 val = 1
108 while value >= val:
108 while value >= val:
109 if value & val:
109 if value & val:
110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
111 val <<= 1
111 val <<= 1
112
112
113 return b'|'.join(flags)
113 return b'|'.join(flags)
114
114
115 @attr.s(slots=True)
115 @attr.s(slots=True)
116 class frameheader(object):
116 class frameheader(object):
117 """Represents the data in a frame header."""
117 """Represents the data in a frame header."""
118
118
119 length = attr.ib()
119 length = attr.ib()
120 requestid = attr.ib()
120 requestid = attr.ib()
121 streamid = attr.ib()
121 streamid = attr.ib()
122 streamflags = attr.ib()
122 streamflags = attr.ib()
123 typeid = attr.ib()
123 typeid = attr.ib()
124 flags = attr.ib()
124 flags = attr.ib()
125
125
126 @attr.s(slots=True, repr=False)
126 @attr.s(slots=True, repr=False)
127 class frame(object):
127 class frame(object):
128 """Represents a parsed frame."""
128 """Represents a parsed frame."""
129
129
130 requestid = attr.ib()
130 requestid = attr.ib()
131 streamid = attr.ib()
131 streamid = attr.ib()
132 streamflags = attr.ib()
132 streamflags = attr.ib()
133 typeid = attr.ib()
133 typeid = attr.ib()
134 flags = attr.ib()
134 flags = attr.ib()
135 payload = attr.ib()
135 payload = attr.ib()
136
136
137 @encoding.strmethod
137 @encoding.strmethod
138 def __repr__(self):
138 def __repr__(self):
139 typename = '<unknown 0x%02x>' % self.typeid
139 typename = '<unknown 0x%02x>' % self.typeid
140 for name, value in FRAME_TYPES.iteritems():
140 for name, value in FRAME_TYPES.iteritems():
141 if value == self.typeid:
141 if value == self.typeid:
142 typename = name
142 typename = name
143 break
143 break
144
144
145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
146 'type=%s; flags=%s)' % (
146 'type=%s; flags=%s)' % (
147 len(self.payload), self.requestid, self.streamid,
147 len(self.payload), self.requestid, self.streamid,
148 humanflags(STREAM_FLAGS, self.streamflags), typename,
148 humanflags(STREAM_FLAGS, self.streamflags), typename,
149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
150
150
151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
152 """Assemble a frame into a byte array."""
152 """Assemble a frame into a byte array."""
153 # TODO assert size of payload.
153 # TODO assert size of payload.
154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
155
155
156 # 24 bits length
156 # 24 bits length
157 # 16 bits request id
157 # 16 bits request id
158 # 8 bits stream id
158 # 8 bits stream id
159 # 8 bits stream flags
159 # 8 bits stream flags
160 # 4 bits type
160 # 4 bits type
161 # 4 bits flags
161 # 4 bits flags
162
162
163 l = struct.pack(r'<I', len(payload))
163 l = struct.pack(r'<I', len(payload))
164 frame[0:3] = l[0:3]
164 frame[0:3] = l[0:3]
165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
166 frame[7] = (typeid << 4) | flags
166 frame[7] = (typeid << 4) | flags
167 frame[8:] = payload
167 frame[8:] = payload
168
168
169 return frame
169 return frame
170
170
171 def makeframefromhumanstring(s):
171 def makeframefromhumanstring(s):
172 """Create a frame from a human readable string
172 """Create a frame from a human readable string
173
173
174 Strings have the form:
174 Strings have the form:
175
175
176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
177
177
178 This can be used by user-facing applications and tests for creating
178 This can be used by user-facing applications and tests for creating
179 frames easily without having to type out a bunch of constants.
179 frames easily without having to type out a bunch of constants.
180
180
181 Request ID and stream IDs are integers.
181 Request ID and stream IDs are integers.
182
182
183 Stream flags, frame type, and flags can be specified by integer or
183 Stream flags, frame type, and flags can be specified by integer or
184 named constant.
184 named constant.
185
185
186 Flags can be delimited by `|` to bitwise OR them together.
186 Flags can be delimited by `|` to bitwise OR them together.
187
187
188 If the payload begins with ``cbor:``, the following string will be
188 If the payload begins with ``cbor:``, the following string will be
189 evaluated as Python literal and the resulting object will be fed into
189 evaluated as Python literal and the resulting object will be fed into
190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
191 byte string literal.
191 byte string literal.
192 """
192 """
193 fields = s.split(b' ', 5)
193 fields = s.split(b' ', 5)
194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
195
195
196 requestid = int(requestid)
196 requestid = int(requestid)
197 streamid = int(streamid)
197 streamid = int(streamid)
198
198
199 finalstreamflags = 0
199 finalstreamflags = 0
200 for flag in streamflags.split(b'|'):
200 for flag in streamflags.split(b'|'):
201 if flag in STREAM_FLAGS:
201 if flag in STREAM_FLAGS:
202 finalstreamflags |= STREAM_FLAGS[flag]
202 finalstreamflags |= STREAM_FLAGS[flag]
203 else:
203 else:
204 finalstreamflags |= int(flag)
204 finalstreamflags |= int(flag)
205
205
206 if frametype in FRAME_TYPES:
206 if frametype in FRAME_TYPES:
207 frametype = FRAME_TYPES[frametype]
207 frametype = FRAME_TYPES[frametype]
208 else:
208 else:
209 frametype = int(frametype)
209 frametype = int(frametype)
210
210
211 finalflags = 0
211 finalflags = 0
212 validflags = FRAME_TYPE_FLAGS[frametype]
212 validflags = FRAME_TYPE_FLAGS[frametype]
213 for flag in frameflags.split(b'|'):
213 for flag in frameflags.split(b'|'):
214 if flag in validflags:
214 if flag in validflags:
215 finalflags |= validflags[flag]
215 finalflags |= validflags[flag]
216 else:
216 else:
217 finalflags |= int(flag)
217 finalflags |= int(flag)
218
218
219 if payload.startswith(b'cbor:'):
219 if payload.startswith(b'cbor:'):
220 payload = b''.join(cborutil.streamencode(
220 payload = b''.join(cborutil.streamencode(
221 stringutil.evalpythonliteral(payload[5:])))
221 stringutil.evalpythonliteral(payload[5:])))
222
222
223 else:
223 else:
224 payload = stringutil.unescapestr(payload)
224 payload = stringutil.unescapestr(payload)
225
225
226 return makeframe(requestid=requestid, streamid=streamid,
226 return makeframe(requestid=requestid, streamid=streamid,
227 streamflags=finalstreamflags, typeid=frametype,
227 streamflags=finalstreamflags, typeid=frametype,
228 flags=finalflags, payload=payload)
228 flags=finalflags, payload=payload)
229
229
230 def parseheader(data):
230 def parseheader(data):
231 """Parse a unified framing protocol frame header from a buffer.
231 """Parse a unified framing protocol frame header from a buffer.
232
232
233 The header is expected to be in the buffer at offset 0 and the
233 The header is expected to be in the buffer at offset 0 and the
234 buffer is expected to be large enough to hold a full header.
234 buffer is expected to be large enough to hold a full header.
235 """
235 """
236 # 24 bits payload length (little endian)
236 # 24 bits payload length (little endian)
237 # 16 bits request ID
237 # 16 bits request ID
238 # 8 bits stream ID
238 # 8 bits stream ID
239 # 8 bits stream flags
239 # 8 bits stream flags
240 # 4 bits frame type
240 # 4 bits frame type
241 # 4 bits frame flags
241 # 4 bits frame flags
242 # ... payload
242 # ... payload
243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
245 typeflags = data[7]
245 typeflags = data[7]
246
246
247 frametype = (typeflags & 0xf0) >> 4
247 frametype = (typeflags & 0xf0) >> 4
248 frameflags = typeflags & 0x0f
248 frameflags = typeflags & 0x0f
249
249
250 return frameheader(framelength, requestid, streamid, streamflags,
250 return frameheader(framelength, requestid, streamid, streamflags,
251 frametype, frameflags)
251 frametype, frameflags)
252
252
253 def readframe(fh):
253 def readframe(fh):
254 """Read a unified framing protocol frame from a file object.
254 """Read a unified framing protocol frame from a file object.
255
255
256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
257 None if no frame is available. May raise if a malformed frame is
257 None if no frame is available. May raise if a malformed frame is
258 seen.
258 seen.
259 """
259 """
260 header = bytearray(FRAME_HEADER_SIZE)
260 header = bytearray(FRAME_HEADER_SIZE)
261
261
262 readcount = fh.readinto(header)
262 readcount = fh.readinto(header)
263
263
264 if readcount == 0:
264 if readcount == 0:
265 return None
265 return None
266
266
267 if readcount != FRAME_HEADER_SIZE:
267 if readcount != FRAME_HEADER_SIZE:
268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
269 (readcount, header))
269 (readcount, header))
270
270
271 h = parseheader(header)
271 h = parseheader(header)
272
272
273 payload = fh.read(h.length)
273 payload = fh.read(h.length)
274 if len(payload) != h.length:
274 if len(payload) != h.length:
275 raise error.Abort(_('frame length error: expected %d; got %d') %
275 raise error.Abort(_('frame length error: expected %d; got %d') %
276 (h.length, len(payload)))
276 (h.length, len(payload)))
277
277
278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
279 payload)
279 payload)
280
280
281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
283 """Create frames necessary to transmit a request to run a command.
283 """Create frames necessary to transmit a request to run a command.
284
284
285 This is a generator of bytearrays. Each item represents a frame
285 This is a generator of bytearrays. Each item represents a frame
286 ready to be sent over the wire to a peer.
286 ready to be sent over the wire to a peer.
287 """
287 """
288 data = {b'name': cmd}
288 data = {b'name': cmd}
289 if args:
289 if args:
290 data[b'args'] = args
290 data[b'args'] = args
291
291
292 data = b''.join(cborutil.streamencode(data))
292 data = b''.join(cborutil.streamencode(data))
293
293
294 offset = 0
294 offset = 0
295
295
296 while True:
296 while True:
297 flags = 0
297 flags = 0
298
298
299 # Must set new or continuation flag.
299 # Must set new or continuation flag.
300 if not offset:
300 if not offset:
301 flags |= FLAG_COMMAND_REQUEST_NEW
301 flags |= FLAG_COMMAND_REQUEST_NEW
302 else:
302 else:
303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
304
304
305 # Data frames is set on all frames.
305 # Data frames is set on all frames.
306 if datafh:
306 if datafh:
307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
308
308
309 payload = data[offset:offset + maxframesize]
309 payload = data[offset:offset + maxframesize]
310 offset += len(payload)
310 offset += len(payload)
311
311
312 if len(payload) == maxframesize and offset < len(data):
312 if len(payload) == maxframesize and offset < len(data):
313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
314
314
315 yield stream.makeframe(requestid=requestid,
315 yield stream.makeframe(requestid=requestid,
316 typeid=FRAME_TYPE_COMMAND_REQUEST,
316 typeid=FRAME_TYPE_COMMAND_REQUEST,
317 flags=flags,
317 flags=flags,
318 payload=payload)
318 payload=payload)
319
319
320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
321 break
321 break
322
322
323 if datafh:
323 if datafh:
324 while True:
324 while True:
325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
326
326
327 done = False
327 done = False
328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
329 flags = FLAG_COMMAND_DATA_CONTINUATION
329 flags = FLAG_COMMAND_DATA_CONTINUATION
330 else:
330 else:
331 flags = FLAG_COMMAND_DATA_EOS
331 flags = FLAG_COMMAND_DATA_EOS
332 assert datafh.read(1) == b''
332 assert datafh.read(1) == b''
333 done = True
333 done = True
334
334
335 yield stream.makeframe(requestid=requestid,
335 yield stream.makeframe(requestid=requestid,
336 typeid=FRAME_TYPE_COMMAND_DATA,
336 typeid=FRAME_TYPE_COMMAND_DATA,
337 flags=flags,
337 flags=flags,
338 payload=data)
338 payload=data)
339
339
340 if done:
340 if done:
341 break
341 break
342
342
343 def createcommandresponseframesfrombytes(stream, requestid, data,
343 def createcommandresponseframesfrombytes(stream, requestid, data,
344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
345 """Create a raw frame to send a bytes response from static bytes input.
345 """Create a raw frame to send a bytes response from static bytes input.
346
346
347 Returns a generator of bytearrays.
347 Returns a generator of bytearrays.
348 """
348 """
349 # Automatically send the overall CBOR response map.
349 # Automatically send the overall CBOR response map.
350 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
350 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
351 if len(overall) > maxframesize:
351 if len(overall) > maxframesize:
352 raise error.ProgrammingError('not yet implemented')
352 raise error.ProgrammingError('not yet implemented')
353
353
354 # Simple case where we can fit the full response in a single frame.
354 # Simple case where we can fit the full response in a single frame.
355 if len(overall) + len(data) <= maxframesize:
355 if len(overall) + len(data) <= maxframesize:
356 flags = FLAG_COMMAND_RESPONSE_EOS
356 flags = FLAG_COMMAND_RESPONSE_EOS
357 yield stream.makeframe(requestid=requestid,
357 yield stream.makeframe(requestid=requestid,
358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
359 flags=flags,
359 flags=flags,
360 payload=overall + data)
360 payload=overall + data)
361 return
361 return
362
362
363 # It's easier to send the overall CBOR map in its own frame than to track
363 # It's easier to send the overall CBOR map in its own frame than to track
364 # offsets.
364 # offsets.
365 yield stream.makeframe(requestid=requestid,
365 yield stream.makeframe(requestid=requestid,
366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
368 payload=overall)
368 payload=overall)
369
369
370 offset = 0
370 offset = 0
371 while True:
371 while True:
372 chunk = data[offset:offset + maxframesize]
372 chunk = data[offset:offset + maxframesize]
373 offset += len(chunk)
373 offset += len(chunk)
374 done = offset == len(data)
374 done = offset == len(data)
375
375
376 if done:
376 if done:
377 flags = FLAG_COMMAND_RESPONSE_EOS
377 flags = FLAG_COMMAND_RESPONSE_EOS
378 else:
378 else:
379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
380
380
381 yield stream.makeframe(requestid=requestid,
381 yield stream.makeframe(requestid=requestid,
382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 flags=flags,
383 flags=flags,
384 payload=chunk)
384 payload=chunk)
385
385
386 if done:
386 if done:
387 break
387 break
388
388
389 def createbytesresponseframesfromgen(stream, requestid, gen,
389 def createbytesresponseframesfromgen(stream, requestid, gen,
390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
391 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
391 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
392
392
393 yield stream.makeframe(requestid=requestid,
393 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_COMMAND_RESPONSE,
394 typeid=FRAME_TYPE_COMMAND_RESPONSE,
395 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
395 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
396 payload=overall)
396 payload=overall)
397
397
398 cb = util.chunkbuffer(gen)
398 cb = util.chunkbuffer(gen)
399
399
400 flags = 0
400 flags = 0
401
401
402 while True:
402 while True:
403 chunk = cb.read(maxframesize)
403 chunk = cb.read(maxframesize)
404 if not chunk:
404 if not chunk:
405 break
405 break
406
406
407 yield stream.makeframe(requestid=requestid,
407 yield stream.makeframe(requestid=requestid,
408 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 typeid=FRAME_TYPE_COMMAND_RESPONSE,
409 flags=flags,
409 flags=flags,
410 payload=chunk)
410 payload=chunk)
411
411
412 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
412 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
413
413
414 flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
414 flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
415 flags |= FLAG_COMMAND_RESPONSE_EOS
415 flags |= FLAG_COMMAND_RESPONSE_EOS
416 yield stream.makeframe(requestid=requestid,
416 yield stream.makeframe(requestid=requestid,
417 typeid=FRAME_TYPE_COMMAND_RESPONSE,
417 typeid=FRAME_TYPE_COMMAND_RESPONSE,
418 flags=flags,
418 flags=flags,
419 payload=b'')
419 payload=b'')
420
420
421 def createcommanderrorresponse(stream, requestid, message, args=None):
421 def createcommanderrorresponse(stream, requestid, message, args=None):
422 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
423 # formatting works consistently?
422 m = {
424 m = {
423 b'status': b'error',
425 b'status': b'error',
424 b'error': {
426 b'error': {
425 b'message': message,
427 b'message': message,
426 }
428 }
427 }
429 }
428
430
429 if args:
431 if args:
430 m[b'error'][b'args'] = args
432 m[b'error'][b'args'] = args
431
433
432 overall = b''.join(cborutil.streamencode(m))
434 overall = b''.join(cborutil.streamencode(m))
433
435
434 yield stream.makeframe(requestid=requestid,
436 yield stream.makeframe(requestid=requestid,
435 typeid=FRAME_TYPE_COMMAND_RESPONSE,
437 typeid=FRAME_TYPE_COMMAND_RESPONSE,
436 flags=FLAG_COMMAND_RESPONSE_EOS,
438 flags=FLAG_COMMAND_RESPONSE_EOS,
437 payload=overall)
439 payload=overall)
438
440
439 def createerrorframe(stream, requestid, msg, errtype):
441 def createerrorframe(stream, requestid, msg, errtype):
440 # TODO properly handle frame size limits.
442 # TODO properly handle frame size limits.
441 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
443 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
442
444
443 payload = b''.join(cborutil.streamencode({
445 payload = b''.join(cborutil.streamencode({
444 b'type': errtype,
446 b'type': errtype,
445 b'message': [{b'msg': msg}],
447 b'message': [{b'msg': msg}],
446 }))
448 }))
447
449
448 yield stream.makeframe(requestid=requestid,
450 yield stream.makeframe(requestid=requestid,
449 typeid=FRAME_TYPE_ERROR_RESPONSE,
451 typeid=FRAME_TYPE_ERROR_RESPONSE,
450 flags=0,
452 flags=0,
451 payload=payload)
453 payload=payload)
452
454
453 def createtextoutputframe(stream, requestid, atoms,
455 def createtextoutputframe(stream, requestid, atoms,
454 maxframesize=DEFAULT_MAX_FRAME_SIZE):
456 maxframesize=DEFAULT_MAX_FRAME_SIZE):
455 """Create a text output frame to render text to people.
457 """Create a text output frame to render text to people.
456
458
457 ``atoms`` is a 3-tuple of (formatting string, args, labels).
459 ``atoms`` is a 3-tuple of (formatting string, args, labels).
458
460
459 The formatting string contains ``%s`` tokens to be replaced by the
461 The formatting string contains ``%s`` tokens to be replaced by the
460 corresponding indexed entry in ``args``. ``labels`` is an iterable of
462 corresponding indexed entry in ``args``. ``labels`` is an iterable of
461 formatters to be applied at rendering time. In terms of the ``ui``
463 formatters to be applied at rendering time. In terms of the ``ui``
462 class, each atom corresponds to a ``ui.write()``.
464 class, each atom corresponds to a ``ui.write()``.
463 """
465 """
464 atomdicts = []
466 atomdicts = []
465
467
466 for (formatting, args, labels) in atoms:
468 for (formatting, args, labels) in atoms:
467 # TODO look for localstr, other types here?
469 # TODO look for localstr, other types here?
468
470
469 if not isinstance(formatting, bytes):
471 if not isinstance(formatting, bytes):
470 raise ValueError('must use bytes formatting strings')
472 raise ValueError('must use bytes formatting strings')
471 for arg in args:
473 for arg in args:
472 if not isinstance(arg, bytes):
474 if not isinstance(arg, bytes):
473 raise ValueError('must use bytes for arguments')
475 raise ValueError('must use bytes for arguments')
474 for label in labels:
476 for label in labels:
475 if not isinstance(label, bytes):
477 if not isinstance(label, bytes):
476 raise ValueError('must use bytes for labels')
478 raise ValueError('must use bytes for labels')
477
479
478 # Formatting string must be ASCII.
480 # Formatting string must be ASCII.
479 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
481 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
480
482
481 # Arguments must be UTF-8.
483 # Arguments must be UTF-8.
482 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
484 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
483
485
484 # Labels must be ASCII.
486 # Labels must be ASCII.
485 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
487 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
486 for l in labels]
488 for l in labels]
487
489
488 atom = {b'msg': formatting}
490 atom = {b'msg': formatting}
489 if args:
491 if args:
490 atom[b'args'] = args
492 atom[b'args'] = args
491 if labels:
493 if labels:
492 atom[b'labels'] = labels
494 atom[b'labels'] = labels
493
495
494 atomdicts.append(atom)
496 atomdicts.append(atom)
495
497
496 payload = b''.join(cborutil.streamencode(atomdicts))
498 payload = b''.join(cborutil.streamencode(atomdicts))
497
499
498 if len(payload) > maxframesize:
500 if len(payload) > maxframesize:
499 raise ValueError('cannot encode data in a single frame')
501 raise ValueError('cannot encode data in a single frame')
500
502
501 yield stream.makeframe(requestid=requestid,
503 yield stream.makeframe(requestid=requestid,
502 typeid=FRAME_TYPE_TEXT_OUTPUT,
504 typeid=FRAME_TYPE_TEXT_OUTPUT,
503 flags=0,
505 flags=0,
504 payload=payload)
506 payload=payload)
505
507
506 class stream(object):
508 class stream(object):
507 """Represents a logical unidirectional series of frames."""
509 """Represents a logical unidirectional series of frames."""
508
510
509 def __init__(self, streamid, active=False):
511 def __init__(self, streamid, active=False):
510 self.streamid = streamid
512 self.streamid = streamid
511 self._active = active
513 self._active = active
512
514
513 def makeframe(self, requestid, typeid, flags, payload):
515 def makeframe(self, requestid, typeid, flags, payload):
514 """Create a frame to be sent out over this stream.
516 """Create a frame to be sent out over this stream.
515
517
516 Only returns the frame instance. Does not actually send it.
518 Only returns the frame instance. Does not actually send it.
517 """
519 """
518 streamflags = 0
520 streamflags = 0
519 if not self._active:
521 if not self._active:
520 streamflags |= STREAM_FLAG_BEGIN_STREAM
522 streamflags |= STREAM_FLAG_BEGIN_STREAM
521 self._active = True
523 self._active = True
522
524
523 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
525 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
524 payload)
526 payload)
525
527
526 def ensureserverstream(stream):
528 def ensureserverstream(stream):
527 if stream.streamid % 2:
529 if stream.streamid % 2:
528 raise error.ProgrammingError('server should only write to even '
530 raise error.ProgrammingError('server should only write to even '
529 'numbered streams; %d is not even' %
531 'numbered streams; %d is not even' %
530 stream.streamid)
532 stream.streamid)
531
533
532 class serverreactor(object):
534 class serverreactor(object):
533 """Holds state of a server handling frame-based protocol requests.
535 """Holds state of a server handling frame-based protocol requests.
534
536
535 This class is the "brain" of the unified frame-based protocol server
537 This class is the "brain" of the unified frame-based protocol server
536 component. While the protocol is stateless from the perspective of
538 component. While the protocol is stateless from the perspective of
537 requests/commands, something needs to track which frames have been
539 requests/commands, something needs to track which frames have been
538 received, what frames to expect, etc. This class is that thing.
540 received, what frames to expect, etc. This class is that thing.
539
541
540 Instances are modeled as a state machine of sorts. Instances are also
542 Instances are modeled as a state machine of sorts. Instances are also
541 reactionary to external events. The point of this class is to encapsulate
543 reactionary to external events. The point of this class is to encapsulate
542 the state of the connection and the exchange of frames, not to perform
544 the state of the connection and the exchange of frames, not to perform
543 work. Instead, callers tell this class when something occurs, like a
545 work. Instead, callers tell this class when something occurs, like a
544 frame arriving. If that activity is worthy of a follow-up action (say
546 frame arriving. If that activity is worthy of a follow-up action (say
545 *run a command*), the return value of that handler will say so.
547 *run a command*), the return value of that handler will say so.
546
548
547 I/O and CPU intensive operations are purposefully delegated outside of
549 I/O and CPU intensive operations are purposefully delegated outside of
548 this class.
550 this class.
549
551
550 Consumers are expected to tell instances when events occur. They do so by
552 Consumers are expected to tell instances when events occur. They do so by
551 calling the various ``on*`` methods. These methods return a 2-tuple
553 calling the various ``on*`` methods. These methods return a 2-tuple
552 describing any follow-up action(s) to take. The first element is the
554 describing any follow-up action(s) to take. The first element is the
553 name of an action to perform. The second is a data structure (usually
555 name of an action to perform. The second is a data structure (usually
554 a dict) specific to that action that contains more information. e.g.
556 a dict) specific to that action that contains more information. e.g.
555 if the server wants to send frames back to the client, the data structure
557 if the server wants to send frames back to the client, the data structure
556 will contain a reference to those frames.
558 will contain a reference to those frames.
557
559
558 Valid actions that consumers can be instructed to take are:
560 Valid actions that consumers can be instructed to take are:
559
561
560 sendframes
562 sendframes
561 Indicates that frames should be sent to the client. The ``framegen``
563 Indicates that frames should be sent to the client. The ``framegen``
562 key contains a generator of frames that should be sent. The server
564 key contains a generator of frames that should be sent. The server
563 assumes that all frames are sent to the client.
565 assumes that all frames are sent to the client.
564
566
565 error
567 error
566 Indicates that an error occurred. Consumer should probably abort.
568 Indicates that an error occurred. Consumer should probably abort.
567
569
568 runcommand
570 runcommand
569 Indicates that the consumer should run a wire protocol command. Details
571 Indicates that the consumer should run a wire protocol command. Details
570 of the command to run are given in the data structure.
572 of the command to run are given in the data structure.
571
573
572 wantframe
574 wantframe
573 Indicates that nothing of interest happened and the server is waiting on
575 Indicates that nothing of interest happened and the server is waiting on
574 more frames from the client before anything interesting can be done.
576 more frames from the client before anything interesting can be done.
575
577
576 noop
578 noop
577 Indicates no additional action is required.
579 Indicates no additional action is required.
578
580
579 Known Issues
581 Known Issues
580 ------------
582 ------------
581
583
582 There are no limits to the number of partially received commands or their
584 There are no limits to the number of partially received commands or their
583 size. A malicious client could stream command request data and exhaust the
585 size. A malicious client could stream command request data and exhaust the
584 server's memory.
586 server's memory.
585
587
586 Partially received commands are not acted upon when end of input is
588 Partially received commands are not acted upon when end of input is
587 reached. Should the server error if it receives a partial request?
589 reached. Should the server error if it receives a partial request?
588 Should the client send a message to abort a partially transmitted request
590 Should the client send a message to abort a partially transmitted request
589 to facilitate graceful shutdown?
591 to facilitate graceful shutdown?
590
592
591 Active requests that haven't been responded to aren't tracked. This means
593 Active requests that haven't been responded to aren't tracked. This means
592 that if we receive a command and instruct its dispatch, another command
594 that if we receive a command and instruct its dispatch, another command
593 with its request ID can come in over the wire and there will be a race
595 with its request ID can come in over the wire and there will be a race
594 between who responds to what.
596 between who responds to what.
595 """
597 """
596
598
597 def __init__(self, deferoutput=False):
599 def __init__(self, deferoutput=False):
598 """Construct a new server reactor.
600 """Construct a new server reactor.
599
601
600 ``deferoutput`` can be used to indicate that no output frames should be
602 ``deferoutput`` can be used to indicate that no output frames should be
601 instructed to be sent until input has been exhausted. In this mode,
603 instructed to be sent until input has been exhausted. In this mode,
602 events that would normally generate output frames (such as a command
604 events that would normally generate output frames (such as a command
603 response being ready) will instead defer instructing the consumer to
605 response being ready) will instead defer instructing the consumer to
604 send those frames. This is useful for half-duplex transports where the
606 send those frames. This is useful for half-duplex transports where the
605 sender cannot receive until all data has been transmitted.
607 sender cannot receive until all data has been transmitted.
606 """
608 """
607 self._deferoutput = deferoutput
609 self._deferoutput = deferoutput
608 self._state = 'idle'
610 self._state = 'idle'
609 self._nextoutgoingstreamid = 2
611 self._nextoutgoingstreamid = 2
610 self._bufferedframegens = []
612 self._bufferedframegens = []
611 # stream id -> stream instance for all active streams from the client.
613 # stream id -> stream instance for all active streams from the client.
612 self._incomingstreams = {}
614 self._incomingstreams = {}
613 self._outgoingstreams = {}
615 self._outgoingstreams = {}
614 # request id -> dict of commands that are actively being received.
616 # request id -> dict of commands that are actively being received.
615 self._receivingcommands = {}
617 self._receivingcommands = {}
616 # Request IDs that have been received and are actively being processed.
618 # Request IDs that have been received and are actively being processed.
617 # Once all output for a request has been sent, it is removed from this
619 # Once all output for a request has been sent, it is removed from this
618 # set.
620 # set.
619 self._activecommands = set()
621 self._activecommands = set()
620
622
621 def onframerecv(self, frame):
623 def onframerecv(self, frame):
622 """Process a frame that has been received off the wire.
624 """Process a frame that has been received off the wire.
623
625
624 Returns a dict with an ``action`` key that details what action,
626 Returns a dict with an ``action`` key that details what action,
625 if any, the consumer should take next.
627 if any, the consumer should take next.
626 """
628 """
627 if not frame.streamid % 2:
629 if not frame.streamid % 2:
628 self._state = 'errored'
630 self._state = 'errored'
629 return self._makeerrorresult(
631 return self._makeerrorresult(
630 _('received frame with even numbered stream ID: %d') %
632 _('received frame with even numbered stream ID: %d') %
631 frame.streamid)
633 frame.streamid)
632
634
633 if frame.streamid not in self._incomingstreams:
635 if frame.streamid not in self._incomingstreams:
634 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
636 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
635 self._state = 'errored'
637 self._state = 'errored'
636 return self._makeerrorresult(
638 return self._makeerrorresult(
637 _('received frame on unknown inactive stream without '
639 _('received frame on unknown inactive stream without '
638 'beginning of stream flag set'))
640 'beginning of stream flag set'))
639
641
640 self._incomingstreams[frame.streamid] = stream(frame.streamid)
642 self._incomingstreams[frame.streamid] = stream(frame.streamid)
641
643
642 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
644 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
643 # TODO handle decoding frames
645 # TODO handle decoding frames
644 self._state = 'errored'
646 self._state = 'errored'
645 raise error.ProgrammingError('support for decoding stream payloads '
647 raise error.ProgrammingError('support for decoding stream payloads '
646 'not yet implemented')
648 'not yet implemented')
647
649
648 if frame.streamflags & STREAM_FLAG_END_STREAM:
650 if frame.streamflags & STREAM_FLAG_END_STREAM:
649 del self._incomingstreams[frame.streamid]
651 del self._incomingstreams[frame.streamid]
650
652
651 handlers = {
653 handlers = {
652 'idle': self._onframeidle,
654 'idle': self._onframeidle,
653 'command-receiving': self._onframecommandreceiving,
655 'command-receiving': self._onframecommandreceiving,
654 'errored': self._onframeerrored,
656 'errored': self._onframeerrored,
655 }
657 }
656
658
657 meth = handlers.get(self._state)
659 meth = handlers.get(self._state)
658 if not meth:
660 if not meth:
659 raise error.ProgrammingError('unhandled state: %s' % self._state)
661 raise error.ProgrammingError('unhandled state: %s' % self._state)
660
662
661 return meth(frame)
663 return meth(frame)
662
664
663 def oncommandresponseready(self, stream, requestid, data):
665 def oncommandresponseready(self, stream, requestid, data):
664 """Signal that a bytes response is ready to be sent to the client.
666 """Signal that a bytes response is ready to be sent to the client.
665
667
666 The raw bytes response is passed as an argument.
668 The raw bytes response is passed as an argument.
667 """
669 """
668 ensureserverstream(stream)
670 ensureserverstream(stream)
669
671
670 def sendframes():
672 def sendframes():
671 for frame in createcommandresponseframesfrombytes(stream, requestid,
673 for frame in createcommandresponseframesfrombytes(stream, requestid,
672 data):
674 data):
673 yield frame
675 yield frame
674
676
675 self._activecommands.remove(requestid)
677 self._activecommands.remove(requestid)
676
678
677 result = sendframes()
679 result = sendframes()
678
680
679 if self._deferoutput:
681 if self._deferoutput:
680 self._bufferedframegens.append(result)
682 self._bufferedframegens.append(result)
681 return 'noop', {}
683 return 'noop', {}
682 else:
684 else:
683 return 'sendframes', {
685 return 'sendframes', {
684 'framegen': result,
686 'framegen': result,
685 }
687 }
686
688
687 def oncommandresponsereadygen(self, stream, requestid, gen):
689 def oncommandresponsereadygen(self, stream, requestid, gen):
688 """Signal that a bytes response is ready, with data as a generator."""
690 """Signal that a bytes response is ready, with data as a generator."""
689 ensureserverstream(stream)
691 ensureserverstream(stream)
690
692
691 def sendframes():
693 def sendframes():
692 for frame in createbytesresponseframesfromgen(stream, requestid,
694 for frame in createbytesresponseframesfromgen(stream, requestid,
693 gen):
695 gen):
694 yield frame
696 yield frame
695
697
696 self._activecommands.remove(requestid)
698 self._activecommands.remove(requestid)
697
699
698 return self._handlesendframes(sendframes())
700 return self._handlesendframes(sendframes())
699
701
700 def oninputeof(self):
702 def oninputeof(self):
701 """Signals that end of input has been received.
703 """Signals that end of input has been received.
702
704
703 No more frames will be received. All pending activity should be
705 No more frames will be received. All pending activity should be
704 completed.
706 completed.
705 """
707 """
706 # TODO should we do anything about in-flight commands?
708 # TODO should we do anything about in-flight commands?
707
709
708 if not self._deferoutput or not self._bufferedframegens:
710 if not self._deferoutput or not self._bufferedframegens:
709 return 'noop', {}
711 return 'noop', {}
710
712
711 # If we buffered all our responses, emit those.
713 # If we buffered all our responses, emit those.
712 def makegen():
714 def makegen():
713 for gen in self._bufferedframegens:
715 for gen in self._bufferedframegens:
714 for frame in gen:
716 for frame in gen:
715 yield frame
717 yield frame
716
718
717 return 'sendframes', {
719 return 'sendframes', {
718 'framegen': makegen(),
720 'framegen': makegen(),
719 }
721 }
720
722
721 def _handlesendframes(self, framegen):
723 def _handlesendframes(self, framegen):
722 if self._deferoutput:
724 if self._deferoutput:
723 self._bufferedframegens.append(framegen)
725 self._bufferedframegens.append(framegen)
724 return 'noop', {}
726 return 'noop', {}
725 else:
727 else:
726 return 'sendframes', {
728 return 'sendframes', {
727 'framegen': framegen,
729 'framegen': framegen,
728 }
730 }
729
731
730 def onservererror(self, stream, requestid, msg):
732 def onservererror(self, stream, requestid, msg):
731 ensureserverstream(stream)
733 ensureserverstream(stream)
732
734
733 def sendframes():
735 def sendframes():
734 for frame in createerrorframe(stream, requestid, msg,
736 for frame in createerrorframe(stream, requestid, msg,
735 errtype='server'):
737 errtype='server'):
736 yield frame
738 yield frame
737
739
738 self._activecommands.remove(requestid)
740 self._activecommands.remove(requestid)
739
741
740 return self._handlesendframes(sendframes())
742 return self._handlesendframes(sendframes())
741
743
742 def oncommanderror(self, stream, requestid, message, args=None):
744 def oncommanderror(self, stream, requestid, message, args=None):
743 """Called when a command encountered an error before sending output."""
745 """Called when a command encountered an error before sending output."""
744 ensureserverstream(stream)
746 ensureserverstream(stream)
745
747
746 def sendframes():
748 def sendframes():
747 for frame in createcommanderrorresponse(stream, requestid, message,
749 for frame in createcommanderrorresponse(stream, requestid, message,
748 args):
750 args):
749 yield frame
751 yield frame
750
752
751 self._activecommands.remove(requestid)
753 self._activecommands.remove(requestid)
752
754
753 return self._handlesendframes(sendframes())
755 return self._handlesendframes(sendframes())
754
756
755 def makeoutputstream(self):
757 def makeoutputstream(self):
756 """Create a stream to be used for sending data to the client."""
758 """Create a stream to be used for sending data to the client."""
757 streamid = self._nextoutgoingstreamid
759 streamid = self._nextoutgoingstreamid
758 self._nextoutgoingstreamid += 2
760 self._nextoutgoingstreamid += 2
759
761
760 s = stream(streamid)
762 s = stream(streamid)
761 self._outgoingstreams[streamid] = s
763 self._outgoingstreams[streamid] = s
762
764
763 return s
765 return s
764
766
765 def _makeerrorresult(self, msg):
767 def _makeerrorresult(self, msg):
766 return 'error', {
768 return 'error', {
767 'message': msg,
769 'message': msg,
768 }
770 }
769
771
770 def _makeruncommandresult(self, requestid):
772 def _makeruncommandresult(self, requestid):
771 entry = self._receivingcommands[requestid]
773 entry = self._receivingcommands[requestid]
772
774
773 if not entry['requestdone']:
775 if not entry['requestdone']:
774 self._state = 'errored'
776 self._state = 'errored'
775 raise error.ProgrammingError('should not be called without '
777 raise error.ProgrammingError('should not be called without '
776 'requestdone set')
778 'requestdone set')
777
779
778 del self._receivingcommands[requestid]
780 del self._receivingcommands[requestid]
779
781
780 if self._receivingcommands:
782 if self._receivingcommands:
781 self._state = 'command-receiving'
783 self._state = 'command-receiving'
782 else:
784 else:
783 self._state = 'idle'
785 self._state = 'idle'
784
786
785 # Decode the payloads as CBOR.
787 # Decode the payloads as CBOR.
786 entry['payload'].seek(0)
788 entry['payload'].seek(0)
787 request = cborutil.decodeall(entry['payload'].getvalue())[0]
789 request = cborutil.decodeall(entry['payload'].getvalue())[0]
788
790
789 if b'name' not in request:
791 if b'name' not in request:
790 self._state = 'errored'
792 self._state = 'errored'
791 return self._makeerrorresult(
793 return self._makeerrorresult(
792 _('command request missing "name" field'))
794 _('command request missing "name" field'))
793
795
794 if b'args' not in request:
796 if b'args' not in request:
795 request[b'args'] = {}
797 request[b'args'] = {}
796
798
797 assert requestid not in self._activecommands
799 assert requestid not in self._activecommands
798 self._activecommands.add(requestid)
800 self._activecommands.add(requestid)
799
801
800 return 'runcommand', {
802 return 'runcommand', {
801 'requestid': requestid,
803 'requestid': requestid,
802 'command': request[b'name'],
804 'command': request[b'name'],
803 'args': request[b'args'],
805 'args': request[b'args'],
804 'data': entry['data'].getvalue() if entry['data'] else None,
806 'data': entry['data'].getvalue() if entry['data'] else None,
805 }
807 }
806
808
807 def _makewantframeresult(self):
809 def _makewantframeresult(self):
808 return 'wantframe', {
810 return 'wantframe', {
809 'state': self._state,
811 'state': self._state,
810 }
812 }
811
813
812 def _validatecommandrequestframe(self, frame):
814 def _validatecommandrequestframe(self, frame):
813 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
815 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
814 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
816 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
815
817
816 if new and continuation:
818 if new and continuation:
817 self._state = 'errored'
819 self._state = 'errored'
818 return self._makeerrorresult(
820 return self._makeerrorresult(
819 _('received command request frame with both new and '
821 _('received command request frame with both new and '
820 'continuation flags set'))
822 'continuation flags set'))
821
823
822 if not new and not continuation:
824 if not new and not continuation:
823 self._state = 'errored'
825 self._state = 'errored'
824 return self._makeerrorresult(
826 return self._makeerrorresult(
825 _('received command request frame with neither new nor '
827 _('received command request frame with neither new nor '
826 'continuation flags set'))
828 'continuation flags set'))
827
829
828 def _onframeidle(self, frame):
830 def _onframeidle(self, frame):
829 # The only frame type that should be received in this state is a
831 # The only frame type that should be received in this state is a
830 # command request.
832 # command request.
831 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
833 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
832 self._state = 'errored'
834 self._state = 'errored'
833 return self._makeerrorresult(
835 return self._makeerrorresult(
834 _('expected command request frame; got %d') % frame.typeid)
836 _('expected command request frame; got %d') % frame.typeid)
835
837
836 res = self._validatecommandrequestframe(frame)
838 res = self._validatecommandrequestframe(frame)
837 if res:
839 if res:
838 return res
840 return res
839
841
840 if frame.requestid in self._receivingcommands:
842 if frame.requestid in self._receivingcommands:
841 self._state = 'errored'
843 self._state = 'errored'
842 return self._makeerrorresult(
844 return self._makeerrorresult(
843 _('request with ID %d already received') % frame.requestid)
845 _('request with ID %d already received') % frame.requestid)
844
846
845 if frame.requestid in self._activecommands:
847 if frame.requestid in self._activecommands:
846 self._state = 'errored'
848 self._state = 'errored'
847 return self._makeerrorresult(
849 return self._makeerrorresult(
848 _('request with ID %d is already active') % frame.requestid)
850 _('request with ID %d is already active') % frame.requestid)
849
851
850 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
852 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
851 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
853 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
852 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
854 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
853
855
854 if not new:
856 if not new:
855 self._state = 'errored'
857 self._state = 'errored'
856 return self._makeerrorresult(
858 return self._makeerrorresult(
857 _('received command request frame without new flag set'))
859 _('received command request frame without new flag set'))
858
860
859 payload = util.bytesio()
861 payload = util.bytesio()
860 payload.write(frame.payload)
862 payload.write(frame.payload)
861
863
862 self._receivingcommands[frame.requestid] = {
864 self._receivingcommands[frame.requestid] = {
863 'payload': payload,
865 'payload': payload,
864 'data': None,
866 'data': None,
865 'requestdone': not moreframes,
867 'requestdone': not moreframes,
866 'expectingdata': bool(expectingdata),
868 'expectingdata': bool(expectingdata),
867 }
869 }
868
870
869 # This is the final frame for this request. Dispatch it.
871 # This is the final frame for this request. Dispatch it.
870 if not moreframes and not expectingdata:
872 if not moreframes and not expectingdata:
871 return self._makeruncommandresult(frame.requestid)
873 return self._makeruncommandresult(frame.requestid)
872
874
873 assert moreframes or expectingdata
875 assert moreframes or expectingdata
874 self._state = 'command-receiving'
876 self._state = 'command-receiving'
875 return self._makewantframeresult()
877 return self._makewantframeresult()
876
878
877 def _onframecommandreceiving(self, frame):
879 def _onframecommandreceiving(self, frame):
878 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
880 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
879 # Process new command requests as such.
881 # Process new command requests as such.
880 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
882 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
881 return self._onframeidle(frame)
883 return self._onframeidle(frame)
882
884
883 res = self._validatecommandrequestframe(frame)
885 res = self._validatecommandrequestframe(frame)
884 if res:
886 if res:
885 return res
887 return res
886
888
887 # All other frames should be related to a command that is currently
889 # All other frames should be related to a command that is currently
888 # receiving but is not active.
890 # receiving but is not active.
889 if frame.requestid in self._activecommands:
891 if frame.requestid in self._activecommands:
890 self._state = 'errored'
892 self._state = 'errored'
891 return self._makeerrorresult(
893 return self._makeerrorresult(
892 _('received frame for request that is still active: %d') %
894 _('received frame for request that is still active: %d') %
893 frame.requestid)
895 frame.requestid)
894
896
895 if frame.requestid not in self._receivingcommands:
897 if frame.requestid not in self._receivingcommands:
896 self._state = 'errored'
898 self._state = 'errored'
897 return self._makeerrorresult(
899 return self._makeerrorresult(
898 _('received frame for request that is not receiving: %d') %
900 _('received frame for request that is not receiving: %d') %
899 frame.requestid)
901 frame.requestid)
900
902
901 entry = self._receivingcommands[frame.requestid]
903 entry = self._receivingcommands[frame.requestid]
902
904
903 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
905 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
904 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
906 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
905 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
907 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
906
908
907 if entry['requestdone']:
909 if entry['requestdone']:
908 self._state = 'errored'
910 self._state = 'errored'
909 return self._makeerrorresult(
911 return self._makeerrorresult(
910 _('received command request frame when request frames '
912 _('received command request frame when request frames '
911 'were supposedly done'))
913 'were supposedly done'))
912
914
913 if expectingdata != entry['expectingdata']:
915 if expectingdata != entry['expectingdata']:
914 self._state = 'errored'
916 self._state = 'errored'
915 return self._makeerrorresult(
917 return self._makeerrorresult(
916 _('mismatch between expect data flag and previous frame'))
918 _('mismatch between expect data flag and previous frame'))
917
919
918 entry['payload'].write(frame.payload)
920 entry['payload'].write(frame.payload)
919
921
920 if not moreframes:
922 if not moreframes:
921 entry['requestdone'] = True
923 entry['requestdone'] = True
922
924
923 if not moreframes and not expectingdata:
925 if not moreframes and not expectingdata:
924 return self._makeruncommandresult(frame.requestid)
926 return self._makeruncommandresult(frame.requestid)
925
927
926 return self._makewantframeresult()
928 return self._makewantframeresult()
927
929
928 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
930 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
929 if not entry['expectingdata']:
931 if not entry['expectingdata']:
930 self._state = 'errored'
932 self._state = 'errored'
931 return self._makeerrorresult(_(
933 return self._makeerrorresult(_(
932 'received command data frame for request that is not '
934 'received command data frame for request that is not '
933 'expecting data: %d') % frame.requestid)
935 'expecting data: %d') % frame.requestid)
934
936
935 if entry['data'] is None:
937 if entry['data'] is None:
936 entry['data'] = util.bytesio()
938 entry['data'] = util.bytesio()
937
939
938 return self._handlecommanddataframe(frame, entry)
940 return self._handlecommanddataframe(frame, entry)
939 else:
941 else:
940 self._state = 'errored'
942 self._state = 'errored'
941 return self._makeerrorresult(_(
943 return self._makeerrorresult(_(
942 'received unexpected frame type: %d') % frame.typeid)
944 'received unexpected frame type: %d') % frame.typeid)
943
945
944 def _handlecommanddataframe(self, frame, entry):
946 def _handlecommanddataframe(self, frame, entry):
945 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
947 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
946
948
947 # TODO support streaming data instead of buffering it.
949 # TODO support streaming data instead of buffering it.
948 entry['data'].write(frame.payload)
950 entry['data'].write(frame.payload)
949
951
950 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
952 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
951 return self._makewantframeresult()
953 return self._makewantframeresult()
952 elif frame.flags & FLAG_COMMAND_DATA_EOS:
954 elif frame.flags & FLAG_COMMAND_DATA_EOS:
953 entry['data'].seek(0)
955 entry['data'].seek(0)
954 return self._makeruncommandresult(frame.requestid)
956 return self._makeruncommandresult(frame.requestid)
955 else:
957 else:
956 self._state = 'errored'
958 self._state = 'errored'
957 return self._makeerrorresult(_('command data frame without '
959 return self._makeerrorresult(_('command data frame without '
958 'flags'))
960 'flags'))
959
961
960 def _onframeerrored(self, frame):
962 def _onframeerrored(self, frame):
961 return self._makeerrorresult(_('server already errored'))
963 return self._makeerrorresult(_('server already errored'))
962
964
963 class commandrequest(object):
965 class commandrequest(object):
964 """Represents a request to run a command."""
966 """Represents a request to run a command."""
965
967
966 def __init__(self, requestid, name, args, datafh=None):
968 def __init__(self, requestid, name, args, datafh=None):
967 self.requestid = requestid
969 self.requestid = requestid
968 self.name = name
970 self.name = name
969 self.args = args
971 self.args = args
970 self.datafh = datafh
972 self.datafh = datafh
971 self.state = 'pending'
973 self.state = 'pending'
972
974
973 class clientreactor(object):
975 class clientreactor(object):
974 """Holds state of a client issuing frame-based protocol requests.
976 """Holds state of a client issuing frame-based protocol requests.
975
977
976 This is like ``serverreactor`` but for client-side state.
978 This is like ``serverreactor`` but for client-side state.
977
979
978 Each instance is bound to the lifetime of a connection. For persistent
980 Each instance is bound to the lifetime of a connection. For persistent
979 connection transports using e.g. TCP sockets and speaking the raw
981 connection transports using e.g. TCP sockets and speaking the raw
980 framing protocol, there will be a single instance for the lifetime of
982 framing protocol, there will be a single instance for the lifetime of
981 the TCP socket. For transports where there are multiple discrete
983 the TCP socket. For transports where there are multiple discrete
982 interactions (say tunneled within in HTTP request), there will be a
984 interactions (say tunneled within in HTTP request), there will be a
983 separate instance for each distinct interaction.
985 separate instance for each distinct interaction.
984 """
986 """
985 def __init__(self, hasmultiplesend=False, buffersends=True):
987 def __init__(self, hasmultiplesend=False, buffersends=True):
986 """Create a new instance.
988 """Create a new instance.
987
989
988 ``hasmultiplesend`` indicates whether multiple sends are supported
990 ``hasmultiplesend`` indicates whether multiple sends are supported
989 by the transport. When True, it is possible to send commands immediately
991 by the transport. When True, it is possible to send commands immediately
990 instead of buffering until the caller signals an intent to finish a
992 instead of buffering until the caller signals an intent to finish a
991 send operation.
993 send operation.
992
994
993 ``buffercommands`` indicates whether sends should be buffered until the
995 ``buffercommands`` indicates whether sends should be buffered until the
994 last request has been issued.
996 last request has been issued.
995 """
997 """
996 self._hasmultiplesend = hasmultiplesend
998 self._hasmultiplesend = hasmultiplesend
997 self._buffersends = buffersends
999 self._buffersends = buffersends
998
1000
999 self._canissuecommands = True
1001 self._canissuecommands = True
1000 self._cansend = True
1002 self._cansend = True
1001
1003
1002 self._nextrequestid = 1
1004 self._nextrequestid = 1
1003 # We only support a single outgoing stream for now.
1005 # We only support a single outgoing stream for now.
1004 self._outgoingstream = stream(1)
1006 self._outgoingstream = stream(1)
1005 self._pendingrequests = collections.deque()
1007 self._pendingrequests = collections.deque()
1006 self._activerequests = {}
1008 self._activerequests = {}
1007 self._incomingstreams = {}
1009 self._incomingstreams = {}
1008
1010
1009 def callcommand(self, name, args, datafh=None):
1011 def callcommand(self, name, args, datafh=None):
1010 """Request that a command be executed.
1012 """Request that a command be executed.
1011
1013
1012 Receives the command name, a dict of arguments to pass to the command,
1014 Receives the command name, a dict of arguments to pass to the command,
1013 and an optional file object containing the raw data for the command.
1015 and an optional file object containing the raw data for the command.
1014
1016
1015 Returns a 3-tuple of (request, action, action data).
1017 Returns a 3-tuple of (request, action, action data).
1016 """
1018 """
1017 if not self._canissuecommands:
1019 if not self._canissuecommands:
1018 raise error.ProgrammingError('cannot issue new commands')
1020 raise error.ProgrammingError('cannot issue new commands')
1019
1021
1020 requestid = self._nextrequestid
1022 requestid = self._nextrequestid
1021 self._nextrequestid += 2
1023 self._nextrequestid += 2
1022
1024
1023 request = commandrequest(requestid, name, args, datafh=datafh)
1025 request = commandrequest(requestid, name, args, datafh=datafh)
1024
1026
1025 if self._buffersends:
1027 if self._buffersends:
1026 self._pendingrequests.append(request)
1028 self._pendingrequests.append(request)
1027 return request, 'noop', {}
1029 return request, 'noop', {}
1028 else:
1030 else:
1029 if not self._cansend:
1031 if not self._cansend:
1030 raise error.ProgrammingError('sends cannot be performed on '
1032 raise error.ProgrammingError('sends cannot be performed on '
1031 'this instance')
1033 'this instance')
1032
1034
1033 if not self._hasmultiplesend:
1035 if not self._hasmultiplesend:
1034 self._cansend = False
1036 self._cansend = False
1035 self._canissuecommands = False
1037 self._canissuecommands = False
1036
1038
1037 return request, 'sendframes', {
1039 return request, 'sendframes', {
1038 'framegen': self._makecommandframes(request),
1040 'framegen': self._makecommandframes(request),
1039 }
1041 }
1040
1042
1041 def flushcommands(self):
1043 def flushcommands(self):
1042 """Request that all queued commands be sent.
1044 """Request that all queued commands be sent.
1043
1045
1044 If any commands are buffered, this will instruct the caller to send
1046 If any commands are buffered, this will instruct the caller to send
1045 them over the wire. If no commands are buffered it instructs the client
1047 them over the wire. If no commands are buffered it instructs the client
1046 to no-op.
1048 to no-op.
1047
1049
1048 If instances aren't configured for multiple sends, no new command
1050 If instances aren't configured for multiple sends, no new command
1049 requests are allowed after this is called.
1051 requests are allowed after this is called.
1050 """
1052 """
1051 if not self._pendingrequests:
1053 if not self._pendingrequests:
1052 return 'noop', {}
1054 return 'noop', {}
1053
1055
1054 if not self._cansend:
1056 if not self._cansend:
1055 raise error.ProgrammingError('sends cannot be performed on this '
1057 raise error.ProgrammingError('sends cannot be performed on this '
1056 'instance')
1058 'instance')
1057
1059
1058 # If the instance only allows sending once, mark that we have fired
1060 # If the instance only allows sending once, mark that we have fired
1059 # our one shot.
1061 # our one shot.
1060 if not self._hasmultiplesend:
1062 if not self._hasmultiplesend:
1061 self._canissuecommands = False
1063 self._canissuecommands = False
1062 self._cansend = False
1064 self._cansend = False
1063
1065
1064 def makeframes():
1066 def makeframes():
1065 while self._pendingrequests:
1067 while self._pendingrequests:
1066 request = self._pendingrequests.popleft()
1068 request = self._pendingrequests.popleft()
1067 for frame in self._makecommandframes(request):
1069 for frame in self._makecommandframes(request):
1068 yield frame
1070 yield frame
1069
1071
1070 return 'sendframes', {
1072 return 'sendframes', {
1071 'framegen': makeframes(),
1073 'framegen': makeframes(),
1072 }
1074 }
1073
1075
1074 def _makecommandframes(self, request):
1076 def _makecommandframes(self, request):
1075 """Emit frames to issue a command request.
1077 """Emit frames to issue a command request.
1076
1078
1077 As a side-effect, update request accounting to reflect its changed
1079 As a side-effect, update request accounting to reflect its changed
1078 state.
1080 state.
1079 """
1081 """
1080 self._activerequests[request.requestid] = request
1082 self._activerequests[request.requestid] = request
1081 request.state = 'sending'
1083 request.state = 'sending'
1082
1084
1083 res = createcommandframes(self._outgoingstream,
1085 res = createcommandframes(self._outgoingstream,
1084 request.requestid,
1086 request.requestid,
1085 request.name,
1087 request.name,
1086 request.args,
1088 request.args,
1087 request.datafh)
1089 request.datafh)
1088
1090
1089 for frame in res:
1091 for frame in res:
1090 yield frame
1092 yield frame
1091
1093
1092 request.state = 'sent'
1094 request.state = 'sent'
1093
1095
1094 def onframerecv(self, frame):
1096 def onframerecv(self, frame):
1095 """Process a frame that has been received off the wire.
1097 """Process a frame that has been received off the wire.
1096
1098
1097 Returns a 2-tuple of (action, meta) describing further action the
1099 Returns a 2-tuple of (action, meta) describing further action the
1098 caller needs to take as a result of receiving this frame.
1100 caller needs to take as a result of receiving this frame.
1099 """
1101 """
1100 if frame.streamid % 2:
1102 if frame.streamid % 2:
1101 return 'error', {
1103 return 'error', {
1102 'message': (
1104 'message': (
1103 _('received frame with odd numbered stream ID: %d') %
1105 _('received frame with odd numbered stream ID: %d') %
1104 frame.streamid),
1106 frame.streamid),
1105 }
1107 }
1106
1108
1107 if frame.streamid not in self._incomingstreams:
1109 if frame.streamid not in self._incomingstreams:
1108 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1110 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1109 return 'error', {
1111 return 'error', {
1110 'message': _('received frame on unknown stream '
1112 'message': _('received frame on unknown stream '
1111 'without beginning of stream flag set'),
1113 'without beginning of stream flag set'),
1112 }
1114 }
1113
1115
1114 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1116 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1115
1117
1116 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1118 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1117 raise error.ProgrammingError('support for decoding stream '
1119 raise error.ProgrammingError('support for decoding stream '
1118 'payloads not yet implemneted')
1120 'payloads not yet implemneted')
1119
1121
1120 if frame.streamflags & STREAM_FLAG_END_STREAM:
1122 if frame.streamflags & STREAM_FLAG_END_STREAM:
1121 del self._incomingstreams[frame.streamid]
1123 del self._incomingstreams[frame.streamid]
1122
1124
1123 if frame.requestid not in self._activerequests:
1125 if frame.requestid not in self._activerequests:
1124 return 'error', {
1126 return 'error', {
1125 'message': (_('received frame for inactive request ID: %d') %
1127 'message': (_('received frame for inactive request ID: %d') %
1126 frame.requestid),
1128 frame.requestid),
1127 }
1129 }
1128
1130
1129 request = self._activerequests[frame.requestid]
1131 request = self._activerequests[frame.requestid]
1130 request.state = 'receiving'
1132 request.state = 'receiving'
1131
1133
1132 handlers = {
1134 handlers = {
1133 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1135 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1134 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1136 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1135 }
1137 }
1136
1138
1137 meth = handlers.get(frame.typeid)
1139 meth = handlers.get(frame.typeid)
1138 if not meth:
1140 if not meth:
1139 raise error.ProgrammingError('unhandled frame type: %d' %
1141 raise error.ProgrammingError('unhandled frame type: %d' %
1140 frame.typeid)
1142 frame.typeid)
1141
1143
1142 return meth(request, frame)
1144 return meth(request, frame)
1143
1145
1144 def _oncommandresponseframe(self, request, frame):
1146 def _oncommandresponseframe(self, request, frame):
1145 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1147 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1146 request.state = 'received'
1148 request.state = 'received'
1147 del self._activerequests[request.requestid]
1149 del self._activerequests[request.requestid]
1148
1150
1149 return 'responsedata', {
1151 return 'responsedata', {
1150 'request': request,
1152 'request': request,
1151 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1153 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1152 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1154 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1153 'data': frame.payload,
1155 'data': frame.payload,
1154 }
1156 }
1155
1157
1156 def _onerrorresponseframe(self, request, frame):
1158 def _onerrorresponseframe(self, request, frame):
1157 request.state = 'errored'
1159 request.state = 'errored'
1158 del self._activerequests[request.requestid]
1160 del self._activerequests[request.requestid]
1159
1161
1160 # The payload should be a CBOR map.
1162 # The payload should be a CBOR map.
1161 m = cborutil.decodeall(frame.payload)[0]
1163 m = cborutil.decodeall(frame.payload)[0]
1162
1164
1163 return 'error', {
1165 return 'error', {
1164 'request': request,
1166 'request': request,
1165 'type': m['type'],
1167 'type': m['type'],
1166 'message': m['message'],
1168 'message': m['message'],
1167 }
1169 }
@@ -1,208 +1,210 b''
1 # wireprotov2peer.py - client side code for wire protocol version 2
1 # wireprotov2peer.py - client side code for wire protocol version 2
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 from .i18n import _
10 from .i18n import _
11 from . import (
11 from . import (
12 encoding,
12 encoding,
13 error,
13 error,
14 util,
14 util,
15 wireprotoframing,
15 wireprotoframing,
16 )
16 )
17 from .utils import (
17 from .utils import (
18 cborutil,
18 cborutil,
19 )
19 )
20
20
21 def formatrichmessage(atoms):
21 def formatrichmessage(atoms):
22 """Format an encoded message from the framing protocol."""
22 """Format an encoded message from the framing protocol."""
23
23
24 chunks = []
24 chunks = []
25
25
26 for atom in atoms:
26 for atom in atoms:
27 msg = _(atom[b'msg'])
27 msg = _(atom[b'msg'])
28
28
29 if b'args' in atom:
29 if b'args' in atom:
30 msg = msg % atom[b'args']
30 msg = msg % tuple(atom[b'args'])
31
31
32 chunks.append(msg)
32 chunks.append(msg)
33
33
34 return b''.join(chunks)
34 return b''.join(chunks)
35
35
36 class commandresponse(object):
36 class commandresponse(object):
37 """Represents the response to a command request."""
37 """Represents the response to a command request."""
38
38
39 def __init__(self, requestid, command):
39 def __init__(self, requestid, command):
40 self.requestid = requestid
40 self.requestid = requestid
41 self.command = command
41 self.command = command
42
42
43 self.b = util.bytesio()
43 self.b = util.bytesio()
44
44
45 def cborobjects(self):
45 def cborobjects(self):
46 """Obtain decoded CBOR objects from this response."""
46 """Obtain decoded CBOR objects from this response."""
47 self.b.seek(0)
47 self.b.seek(0)
48
48
49 for v in cborutil.decodeall(self.b.getvalue()):
49 for v in cborutil.decodeall(self.b.getvalue()):
50 yield v
50 yield v
51
51
52 class clienthandler(object):
52 class clienthandler(object):
53 """Object to handle higher-level client activities.
53 """Object to handle higher-level client activities.
54
54
55 The ``clientreactor`` is used to hold low-level state about the frame-based
55 The ``clientreactor`` is used to hold low-level state about the frame-based
56 protocol, such as which requests and streams are active. This type is used
56 protocol, such as which requests and streams are active. This type is used
57 for higher-level operations, such as reading frames from a socket, exposing
57 for higher-level operations, such as reading frames from a socket, exposing
58 and managing a higher-level primitive for representing command responses,
58 and managing a higher-level primitive for representing command responses,
59 etc. This class is what peers should probably use to bridge wire activity
59 etc. This class is what peers should probably use to bridge wire activity
60 with the higher-level peer API.
60 with the higher-level peer API.
61 """
61 """
62
62
63 def __init__(self, ui, clientreactor):
63 def __init__(self, ui, clientreactor):
64 self._ui = ui
64 self._ui = ui
65 self._reactor = clientreactor
65 self._reactor = clientreactor
66 self._requests = {}
66 self._requests = {}
67 self._futures = {}
67 self._futures = {}
68 self._responses = {}
68 self._responses = {}
69
69
70 def callcommand(self, command, args, f):
70 def callcommand(self, command, args, f):
71 """Register a request to call a command.
71 """Register a request to call a command.
72
72
73 Returns an iterable of frames that should be sent over the wire.
73 Returns an iterable of frames that should be sent over the wire.
74 """
74 """
75 request, action, meta = self._reactor.callcommand(command, args)
75 request, action, meta = self._reactor.callcommand(command, args)
76
76
77 if action != 'noop':
77 if action != 'noop':
78 raise error.ProgrammingError('%s not yet supported' % action)
78 raise error.ProgrammingError('%s not yet supported' % action)
79
79
80 rid = request.requestid
80 rid = request.requestid
81 self._requests[rid] = request
81 self._requests[rid] = request
82 self._futures[rid] = f
82 self._futures[rid] = f
83 self._responses[rid] = commandresponse(rid, command)
83 self._responses[rid] = commandresponse(rid, command)
84
84
85 return iter(())
85 return iter(())
86
86
87 def flushcommands(self):
87 def flushcommands(self):
88 """Flush all queued commands.
88 """Flush all queued commands.
89
89
90 Returns an iterable of frames that should be sent over the wire.
90 Returns an iterable of frames that should be sent over the wire.
91 """
91 """
92 action, meta = self._reactor.flushcommands()
92 action, meta = self._reactor.flushcommands()
93
93
94 if action != 'sendframes':
94 if action != 'sendframes':
95 raise error.ProgrammingError('%s not yet supported' % action)
95 raise error.ProgrammingError('%s not yet supported' % action)
96
96
97 return meta['framegen']
97 return meta['framegen']
98
98
99 def readframe(self, fh):
99 def readframe(self, fh):
100 """Attempt to read and process a frame.
100 """Attempt to read and process a frame.
101
101
102 Returns None if no frame was read. Presumably this means EOF.
102 Returns None if no frame was read. Presumably this means EOF.
103 """
103 """
104 frame = wireprotoframing.readframe(fh)
104 frame = wireprotoframing.readframe(fh)
105 if frame is None:
105 if frame is None:
106 # TODO tell reactor?
106 # TODO tell reactor?
107 return
107 return
108
108
109 self._ui.note(_('received %r\n') % frame)
109 self._ui.note(_('received %r\n') % frame)
110 self._processframe(frame)
110 self._processframe(frame)
111
111
112 return True
112 return True
113
113
114 def _processframe(self, frame):
114 def _processframe(self, frame):
115 """Process a single read frame."""
115 """Process a single read frame."""
116
116
117 action, meta = self._reactor.onframerecv(frame)
117 action, meta = self._reactor.onframerecv(frame)
118
118
119 if action == 'error':
119 if action == 'error':
120 e = error.RepoError(meta['message'])
120 e = error.RepoError(meta['message'])
121
121
122 if frame.requestid in self._futures:
122 if frame.requestid in self._futures:
123 self._futures[frame.requestid].set_exception(e)
123 self._futures[frame.requestid].set_exception(e)
124 else:
124 else:
125 raise e
125 raise e
126
126
127 if frame.requestid not in self._requests:
127 if frame.requestid not in self._requests:
128 raise error.ProgrammingError(
128 raise error.ProgrammingError(
129 'received frame for unknown request; this is either a bug in '
129 'received frame for unknown request; this is either a bug in '
130 'the clientreactor not screening for this or this instance was '
130 'the clientreactor not screening for this or this instance was '
131 'never told about this request: %r' % frame)
131 'never told about this request: %r' % frame)
132
132
133 response = self._responses[frame.requestid]
133 response = self._responses[frame.requestid]
134
134
135 if action == 'responsedata':
135 if action == 'responsedata':
136 # Any failures processing this frame should bubble up to the
136 # Any failures processing this frame should bubble up to the
137 # future tracking the request.
137 # future tracking the request.
138 try:
138 try:
139 self._processresponsedata(frame, meta, response)
139 self._processresponsedata(frame, meta, response)
140 except BaseException as e:
140 except BaseException as e:
141 self._futures[frame.requestid].set_exception(e)
141 self._futures[frame.requestid].set_exception(e)
142 else:
142 else:
143 raise error.ProgrammingError(
143 raise error.ProgrammingError(
144 'unhandled action from clientreactor: %s' % action)
144 'unhandled action from clientreactor: %s' % action)
145
145
146 def _processresponsedata(self, frame, meta, response):
146 def _processresponsedata(self, frame, meta, response):
147 # This buffers all data until end of stream is received. This
147 # This buffers all data until end of stream is received. This
148 # is bad for performance.
148 # is bad for performance.
149 # TODO make response data streamable
149 # TODO make response data streamable
150 response.b.write(meta['data'])
150 response.b.write(meta['data'])
151
151
152 if meta['eos']:
152 if meta['eos']:
153 # If the command has a decoder, resolve the future to the
153 # If the command has a decoder, resolve the future to the
154 # decoded value. Otherwise resolve to the rich response object.
154 # decoded value. Otherwise resolve to the rich response object.
155 decoder = COMMAND_DECODERS.get(response.command)
155 decoder = COMMAND_DECODERS.get(response.command)
156
156
157 # TODO consider always resolving the overall status map.
157 # TODO consider always resolving the overall status map.
158 if decoder:
158 if decoder:
159 objs = response.cborobjects()
159 objs = response.cborobjects()
160
160
161 overall = next(objs)
161 overall = next(objs)
162
162
163 if overall['status'] == 'ok':
163 if overall['status'] == 'ok':
164 self._futures[frame.requestid].set_result(decoder(objs))
164 self._futures[frame.requestid].set_result(decoder(objs))
165 else:
165 else:
166 e = error.RepoError(
166 atoms = [{'msg': overall['error']['message']}]
167 formatrichmessage(overall['error']['message']))
167 if 'args' in overall['error']:
168 atoms[0]['args'] = overall['error']['args']
169 e = error.RepoError(formatrichmessage(atoms))
168 self._futures[frame.requestid].set_exception(e)
170 self._futures[frame.requestid].set_exception(e)
169 else:
171 else:
170 self._futures[frame.requestid].set_result(response)
172 self._futures[frame.requestid].set_result(response)
171
173
172 del self._requests[frame.requestid]
174 del self._requests[frame.requestid]
173 del self._futures[frame.requestid]
175 del self._futures[frame.requestid]
174
176
175 def decodebranchmap(objs):
177 def decodebranchmap(objs):
176 # Response should be a single CBOR map of branch name to array of nodes.
178 # Response should be a single CBOR map of branch name to array of nodes.
177 bm = next(objs)
179 bm = next(objs)
178
180
179 return {encoding.tolocal(k): v for k, v in bm.items()}
181 return {encoding.tolocal(k): v for k, v in bm.items()}
180
182
181 def decodeheads(objs):
183 def decodeheads(objs):
182 # Array of node bytestrings.
184 # Array of node bytestrings.
183 return next(objs)
185 return next(objs)
184
186
185 def decodeknown(objs):
187 def decodeknown(objs):
186 # Bytestring where each byte is a 0 or 1.
188 # Bytestring where each byte is a 0 or 1.
187 raw = next(objs)
189 raw = next(objs)
188
190
189 return [True if c == '1' else False for c in raw]
191 return [True if c == '1' else False for c in raw]
190
192
191 def decodelistkeys(objs):
193 def decodelistkeys(objs):
192 # Map with bytestring keys and values.
194 # Map with bytestring keys and values.
193 return next(objs)
195 return next(objs)
194
196
195 def decodelookup(objs):
197 def decodelookup(objs):
196 return next(objs)
198 return next(objs)
197
199
198 def decodepushkey(objs):
200 def decodepushkey(objs):
199 return next(objs)
201 return next(objs)
200
202
201 COMMAND_DECODERS = {
203 COMMAND_DECODERS = {
202 'branchmap': decodebranchmap,
204 'branchmap': decodebranchmap,
203 'heads': decodeheads,
205 'heads': decodeheads,
204 'known': decodeknown,
206 'known': decodeknown,
205 'listkeys': decodelistkeys,
207 'listkeys': decodelistkeys,
206 'lookup': decodelookup,
208 'lookup': decodelookup,
207 'pushkey': decodepushkey,
209 'pushkey': decodepushkey,
208 }
210 }
General Comments 0
You need to be logged in to leave comments. Login now