##// END OF EJS Templates
wireprotov2: add support for more response types...
Gregory Szorc -
r37746:564a3eec default
parent child Browse files
Show More
@@ -1,1078 +1,1167
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 cbor,
20 cbor,
21 )
21 )
22 from . import (
22 from . import (
23 encoding,
23 encoding,
24 error,
24 error,
25 util,
25 util,
26 )
26 )
27 from .utils import (
27 from .utils import (
28 stringutil,
28 stringutil,
29 )
29 )
30
30
31 FRAME_HEADER_SIZE = 8
31 FRAME_HEADER_SIZE = 8
32 DEFAULT_MAX_FRAME_SIZE = 32768
32 DEFAULT_MAX_FRAME_SIZE = 32768
33
33
34 STREAM_FLAG_BEGIN_STREAM = 0x01
34 STREAM_FLAG_BEGIN_STREAM = 0x01
35 STREAM_FLAG_END_STREAM = 0x02
35 STREAM_FLAG_END_STREAM = 0x02
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
37
37
38 STREAM_FLAGS = {
38 STREAM_FLAGS = {
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
42 }
42 }
43
43
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
45 FRAME_TYPE_COMMAND_DATA = 0x02
45 FRAME_TYPE_COMMAND_DATA = 0x02
46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
46 FRAME_TYPE_COMMAND_RESPONSE = 0x03
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
49 FRAME_TYPE_PROGRESS = 0x07
49 FRAME_TYPE_PROGRESS = 0x07
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
51
51
52 FRAME_TYPES = {
52 FRAME_TYPES = {
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
55 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
58 b'progress': FRAME_TYPE_PROGRESS,
58 b'progress': FRAME_TYPE_PROGRESS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
60 }
60 }
61
61
62 FLAG_COMMAND_REQUEST_NEW = 0x01
62 FLAG_COMMAND_REQUEST_NEW = 0x01
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
66
66
67 FLAGS_COMMAND_REQUEST = {
67 FLAGS_COMMAND_REQUEST = {
68 b'new': FLAG_COMMAND_REQUEST_NEW,
68 b'new': FLAG_COMMAND_REQUEST_NEW,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
72 }
72 }
73
73
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
75 FLAG_COMMAND_DATA_EOS = 0x02
75 FLAG_COMMAND_DATA_EOS = 0x02
76
76
77 FLAGS_COMMAND_DATA = {
77 FLAGS_COMMAND_DATA = {
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
79 b'eos': FLAG_COMMAND_DATA_EOS,
79 b'eos': FLAG_COMMAND_DATA_EOS,
80 }
80 }
81
81
82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
82 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
83 FLAG_COMMAND_RESPONSE_EOS = 0x02
83 FLAG_COMMAND_RESPONSE_EOS = 0x02
84
84
85 FLAGS_COMMAND_RESPONSE = {
85 FLAGS_COMMAND_RESPONSE = {
86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
86 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
87 b'eos': FLAG_COMMAND_RESPONSE_EOS,
88 }
88 }
89
89
90 # Maps frame types to their available flags.
90 # Maps frame types to their available flags.
91 FRAME_TYPE_FLAGS = {
91 FRAME_TYPE_FLAGS = {
92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
92 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
94 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
95 FRAME_TYPE_ERROR_RESPONSE: {},
95 FRAME_TYPE_ERROR_RESPONSE: {},
96 FRAME_TYPE_TEXT_OUTPUT: {},
96 FRAME_TYPE_TEXT_OUTPUT: {},
97 FRAME_TYPE_PROGRESS: {},
97 FRAME_TYPE_PROGRESS: {},
98 FRAME_TYPE_STREAM_SETTINGS: {},
98 FRAME_TYPE_STREAM_SETTINGS: {},
99 }
99 }
100
100
101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
101 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
102
102
103 def humanflags(mapping, value):
103 def humanflags(mapping, value):
104 """Convert a numeric flags value to a human value, using a mapping table."""
104 """Convert a numeric flags value to a human value, using a mapping table."""
105 namemap = {v: k for k, v in mapping.iteritems()}
105 namemap = {v: k for k, v in mapping.iteritems()}
106 flags = []
106 flags = []
107 val = 1
107 val = 1
108 while value >= val:
108 while value >= val:
109 if value & val:
109 if value & val:
110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
110 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
111 val <<= 1
111 val <<= 1
112
112
113 return b'|'.join(flags)
113 return b'|'.join(flags)
114
114
115 @attr.s(slots=True)
115 @attr.s(slots=True)
116 class frameheader(object):
116 class frameheader(object):
117 """Represents the data in a frame header."""
117 """Represents the data in a frame header."""
118
118
119 length = attr.ib()
119 length = attr.ib()
120 requestid = attr.ib()
120 requestid = attr.ib()
121 streamid = attr.ib()
121 streamid = attr.ib()
122 streamflags = attr.ib()
122 streamflags = attr.ib()
123 typeid = attr.ib()
123 typeid = attr.ib()
124 flags = attr.ib()
124 flags = attr.ib()
125
125
126 @attr.s(slots=True, repr=False)
126 @attr.s(slots=True, repr=False)
127 class frame(object):
127 class frame(object):
128 """Represents a parsed frame."""
128 """Represents a parsed frame."""
129
129
130 requestid = attr.ib()
130 requestid = attr.ib()
131 streamid = attr.ib()
131 streamid = attr.ib()
132 streamflags = attr.ib()
132 streamflags = attr.ib()
133 typeid = attr.ib()
133 typeid = attr.ib()
134 flags = attr.ib()
134 flags = attr.ib()
135 payload = attr.ib()
135 payload = attr.ib()
136
136
137 @encoding.strmethod
137 @encoding.strmethod
138 def __repr__(self):
138 def __repr__(self):
139 typename = '<unknown 0x%02x>' % self.typeid
139 typename = '<unknown 0x%02x>' % self.typeid
140 for name, value in FRAME_TYPES.iteritems():
140 for name, value in FRAME_TYPES.iteritems():
141 if value == self.typeid:
141 if value == self.typeid:
142 typename = name
142 typename = name
143 break
143 break
144
144
145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
145 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
146 'type=%s; flags=%s)' % (
146 'type=%s; flags=%s)' % (
147 len(self.payload), self.requestid, self.streamid,
147 len(self.payload), self.requestid, self.streamid,
148 humanflags(STREAM_FLAGS, self.streamflags), typename,
148 humanflags(STREAM_FLAGS, self.streamflags), typename,
149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
149 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
150
150
151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
151 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
152 """Assemble a frame into a byte array."""
152 """Assemble a frame into a byte array."""
153 # TODO assert size of payload.
153 # TODO assert size of payload.
154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
154 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
155
155
156 # 24 bits length
156 # 24 bits length
157 # 16 bits request id
157 # 16 bits request id
158 # 8 bits stream id
158 # 8 bits stream id
159 # 8 bits stream flags
159 # 8 bits stream flags
160 # 4 bits type
160 # 4 bits type
161 # 4 bits flags
161 # 4 bits flags
162
162
163 l = struct.pack(r'<I', len(payload))
163 l = struct.pack(r'<I', len(payload))
164 frame[0:3] = l[0:3]
164 frame[0:3] = l[0:3]
165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
165 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
166 frame[7] = (typeid << 4) | flags
166 frame[7] = (typeid << 4) | flags
167 frame[8:] = payload
167 frame[8:] = payload
168
168
169 return frame
169 return frame
170
170
171 def makeframefromhumanstring(s):
171 def makeframefromhumanstring(s):
172 """Create a frame from a human readable string
172 """Create a frame from a human readable string
173
173
174 Strings have the form:
174 Strings have the form:
175
175
176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
176 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
177
177
178 This can be used by user-facing applications and tests for creating
178 This can be used by user-facing applications and tests for creating
179 frames easily without having to type out a bunch of constants.
179 frames easily without having to type out a bunch of constants.
180
180
181 Request ID and stream IDs are integers.
181 Request ID and stream IDs are integers.
182
182
183 Stream flags, frame type, and flags can be specified by integer or
183 Stream flags, frame type, and flags can be specified by integer or
184 named constant.
184 named constant.
185
185
186 Flags can be delimited by `|` to bitwise OR them together.
186 Flags can be delimited by `|` to bitwise OR them together.
187
187
188 If the payload begins with ``cbor:``, the following string will be
188 If the payload begins with ``cbor:``, the following string will be
189 evaluated as Python literal and the resulting object will be fed into
189 evaluated as Python literal and the resulting object will be fed into
190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
190 a CBOR encoder. Otherwise, the payload is interpreted as a Python
191 byte string literal.
191 byte string literal.
192 """
192 """
193 fields = s.split(b' ', 5)
193 fields = s.split(b' ', 5)
194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
194 requestid, streamid, streamflags, frametype, frameflags, payload = fields
195
195
196 requestid = int(requestid)
196 requestid = int(requestid)
197 streamid = int(streamid)
197 streamid = int(streamid)
198
198
199 finalstreamflags = 0
199 finalstreamflags = 0
200 for flag in streamflags.split(b'|'):
200 for flag in streamflags.split(b'|'):
201 if flag in STREAM_FLAGS:
201 if flag in STREAM_FLAGS:
202 finalstreamflags |= STREAM_FLAGS[flag]
202 finalstreamflags |= STREAM_FLAGS[flag]
203 else:
203 else:
204 finalstreamflags |= int(flag)
204 finalstreamflags |= int(flag)
205
205
206 if frametype in FRAME_TYPES:
206 if frametype in FRAME_TYPES:
207 frametype = FRAME_TYPES[frametype]
207 frametype = FRAME_TYPES[frametype]
208 else:
208 else:
209 frametype = int(frametype)
209 frametype = int(frametype)
210
210
211 finalflags = 0
211 finalflags = 0
212 validflags = FRAME_TYPE_FLAGS[frametype]
212 validflags = FRAME_TYPE_FLAGS[frametype]
213 for flag in frameflags.split(b'|'):
213 for flag in frameflags.split(b'|'):
214 if flag in validflags:
214 if flag in validflags:
215 finalflags |= validflags[flag]
215 finalflags |= validflags[flag]
216 else:
216 else:
217 finalflags |= int(flag)
217 finalflags |= int(flag)
218
218
219 if payload.startswith(b'cbor:'):
219 if payload.startswith(b'cbor:'):
220 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
220 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
221 canonical=True)
221 canonical=True)
222
222
223 else:
223 else:
224 payload = stringutil.unescapestr(payload)
224 payload = stringutil.unescapestr(payload)
225
225
226 return makeframe(requestid=requestid, streamid=streamid,
226 return makeframe(requestid=requestid, streamid=streamid,
227 streamflags=finalstreamflags, typeid=frametype,
227 streamflags=finalstreamflags, typeid=frametype,
228 flags=finalflags, payload=payload)
228 flags=finalflags, payload=payload)
229
229
230 def parseheader(data):
230 def parseheader(data):
231 """Parse a unified framing protocol frame header from a buffer.
231 """Parse a unified framing protocol frame header from a buffer.
232
232
233 The header is expected to be in the buffer at offset 0 and the
233 The header is expected to be in the buffer at offset 0 and the
234 buffer is expected to be large enough to hold a full header.
234 buffer is expected to be large enough to hold a full header.
235 """
235 """
236 # 24 bits payload length (little endian)
236 # 24 bits payload length (little endian)
237 # 16 bits request ID
237 # 16 bits request ID
238 # 8 bits stream ID
238 # 8 bits stream ID
239 # 8 bits stream flags
239 # 8 bits stream flags
240 # 4 bits frame type
240 # 4 bits frame type
241 # 4 bits frame flags
241 # 4 bits frame flags
242 # ... payload
242 # ... payload
243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
243 framelength = data[0] + 256 * data[1] + 16384 * data[2]
244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
244 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
245 typeflags = data[7]
245 typeflags = data[7]
246
246
247 frametype = (typeflags & 0xf0) >> 4
247 frametype = (typeflags & 0xf0) >> 4
248 frameflags = typeflags & 0x0f
248 frameflags = typeflags & 0x0f
249
249
250 return frameheader(framelength, requestid, streamid, streamflags,
250 return frameheader(framelength, requestid, streamid, streamflags,
251 frametype, frameflags)
251 frametype, frameflags)
252
252
253 def readframe(fh):
253 def readframe(fh):
254 """Read a unified framing protocol frame from a file object.
254 """Read a unified framing protocol frame from a file object.
255
255
256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
256 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
257 None if no frame is available. May raise if a malformed frame is
257 None if no frame is available. May raise if a malformed frame is
258 seen.
258 seen.
259 """
259 """
260 header = bytearray(FRAME_HEADER_SIZE)
260 header = bytearray(FRAME_HEADER_SIZE)
261
261
262 readcount = fh.readinto(header)
262 readcount = fh.readinto(header)
263
263
264 if readcount == 0:
264 if readcount == 0:
265 return None
265 return None
266
266
267 if readcount != FRAME_HEADER_SIZE:
267 if readcount != FRAME_HEADER_SIZE:
268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
268 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
269 (readcount, header))
269 (readcount, header))
270
270
271 h = parseheader(header)
271 h = parseheader(header)
272
272
273 payload = fh.read(h.length)
273 payload = fh.read(h.length)
274 if len(payload) != h.length:
274 if len(payload) != h.length:
275 raise error.Abort(_('frame length error: expected %d; got %d') %
275 raise error.Abort(_('frame length error: expected %d; got %d') %
276 (h.length, len(payload)))
276 (h.length, len(payload)))
277
277
278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
278 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
279 payload)
279 payload)
280
280
281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
281 def createcommandframes(stream, requestid, cmd, args, datafh=None,
282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
282 maxframesize=DEFAULT_MAX_FRAME_SIZE):
283 """Create frames necessary to transmit a request to run a command.
283 """Create frames necessary to transmit a request to run a command.
284
284
285 This is a generator of bytearrays. Each item represents a frame
285 This is a generator of bytearrays. Each item represents a frame
286 ready to be sent over the wire to a peer.
286 ready to be sent over the wire to a peer.
287 """
287 """
288 data = {b'name': cmd}
288 data = {b'name': cmd}
289 if args:
289 if args:
290 data[b'args'] = args
290 data[b'args'] = args
291
291
292 data = cbor.dumps(data, canonical=True)
292 data = cbor.dumps(data, canonical=True)
293
293
294 offset = 0
294 offset = 0
295
295
296 while True:
296 while True:
297 flags = 0
297 flags = 0
298
298
299 # Must set new or continuation flag.
299 # Must set new or continuation flag.
300 if not offset:
300 if not offset:
301 flags |= FLAG_COMMAND_REQUEST_NEW
301 flags |= FLAG_COMMAND_REQUEST_NEW
302 else:
302 else:
303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
303 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
304
304
305 # Data frames is set on all frames.
305 # Data frames is set on all frames.
306 if datafh:
306 if datafh:
307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
307 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
308
308
309 payload = data[offset:offset + maxframesize]
309 payload = data[offset:offset + maxframesize]
310 offset += len(payload)
310 offset += len(payload)
311
311
312 if len(payload) == maxframesize and offset < len(data):
312 if len(payload) == maxframesize and offset < len(data):
313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
313 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
314
314
315 yield stream.makeframe(requestid=requestid,
315 yield stream.makeframe(requestid=requestid,
316 typeid=FRAME_TYPE_COMMAND_REQUEST,
316 typeid=FRAME_TYPE_COMMAND_REQUEST,
317 flags=flags,
317 flags=flags,
318 payload=payload)
318 payload=payload)
319
319
320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
320 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
321 break
321 break
322
322
323 if datafh:
323 if datafh:
324 while True:
324 while True:
325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
325 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
326
326
327 done = False
327 done = False
328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
328 if len(data) == DEFAULT_MAX_FRAME_SIZE:
329 flags = FLAG_COMMAND_DATA_CONTINUATION
329 flags = FLAG_COMMAND_DATA_CONTINUATION
330 else:
330 else:
331 flags = FLAG_COMMAND_DATA_EOS
331 flags = FLAG_COMMAND_DATA_EOS
332 assert datafh.read(1) == b''
332 assert datafh.read(1) == b''
333 done = True
333 done = True
334
334
335 yield stream.makeframe(requestid=requestid,
335 yield stream.makeframe(requestid=requestid,
336 typeid=FRAME_TYPE_COMMAND_DATA,
336 typeid=FRAME_TYPE_COMMAND_DATA,
337 flags=flags,
337 flags=flags,
338 payload=data)
338 payload=data)
339
339
340 if done:
340 if done:
341 break
341 break
342
342
343 def createcommandresponseframesfrombytes(stream, requestid, data,
343 def createcommandresponseframesfrombytes(stream, requestid, data,
344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
344 maxframesize=DEFAULT_MAX_FRAME_SIZE):
345 """Create a raw frame to send a bytes response from static bytes input.
345 """Create a raw frame to send a bytes response from static bytes input.
346
346
347 Returns a generator of bytearrays.
347 Returns a generator of bytearrays.
348 """
348 """
349 # Automatically send the overall CBOR response map.
349 # Automatically send the overall CBOR response map.
350 overall = cbor.dumps({b'status': b'ok'}, canonical=True)
350 overall = cbor.dumps({b'status': b'ok'}, canonical=True)
351 if len(overall) > maxframesize:
351 if len(overall) > maxframesize:
352 raise error.ProgrammingError('not yet implemented')
352 raise error.ProgrammingError('not yet implemented')
353
353
354 # Simple case where we can fit the full response in a single frame.
354 # Simple case where we can fit the full response in a single frame.
355 if len(overall) + len(data) <= maxframesize:
355 if len(overall) + len(data) <= maxframesize:
356 flags = FLAG_COMMAND_RESPONSE_EOS
356 flags = FLAG_COMMAND_RESPONSE_EOS
357 yield stream.makeframe(requestid=requestid,
357 yield stream.makeframe(requestid=requestid,
358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
358 typeid=FRAME_TYPE_COMMAND_RESPONSE,
359 flags=flags,
359 flags=flags,
360 payload=overall + data)
360 payload=overall + data)
361 return
361 return
362
362
363 # It's easier to send the overall CBOR map in its own frame than to track
363 # It's easier to send the overall CBOR map in its own frame than to track
364 # offsets.
364 # offsets.
365 yield stream.makeframe(requestid=requestid,
365 yield stream.makeframe(requestid=requestid,
366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
366 typeid=FRAME_TYPE_COMMAND_RESPONSE,
367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
367 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
368 payload=overall)
368 payload=overall)
369
369
370 offset = 0
370 offset = 0
371 while True:
371 while True:
372 chunk = data[offset:offset + maxframesize]
372 chunk = data[offset:offset + maxframesize]
373 offset += len(chunk)
373 offset += len(chunk)
374 done = offset == len(data)
374 done = offset == len(data)
375
375
376 if done:
376 if done:
377 flags = FLAG_COMMAND_RESPONSE_EOS
377 flags = FLAG_COMMAND_RESPONSE_EOS
378 else:
378 else:
379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
379 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
380
380
381 yield stream.makeframe(requestid=requestid,
381 yield stream.makeframe(requestid=requestid,
382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
382 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 flags=flags,
383 flags=flags,
384 payload=chunk)
384 payload=chunk)
385
385
386 if done:
386 if done:
387 break
387 break
388
388
389 def createbytesresponseframesfromgen(stream, requestid, gen,
390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
391 overall = cbor.dumps({b'status': b'ok'}, canonical=True)
392
393 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_COMMAND_RESPONSE,
395 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
396 payload=overall)
397
398 cb = util.chunkbuffer(gen)
399
400 flags = 0
401
402 while True:
403 chunk = cb.read(maxframesize)
404 if not chunk:
405 break
406
407 yield stream.makeframe(requestid=requestid,
408 typeid=FRAME_TYPE_COMMAND_RESPONSE,
409 flags=flags,
410 payload=chunk)
411
412 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
413
414 flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
415 flags |= FLAG_COMMAND_RESPONSE_EOS
416 yield stream.makeframe(requestid=requestid,
417 typeid=FRAME_TYPE_COMMAND_RESPONSE,
418 flags=flags,
419 payload=b'')
420
421 def createcommanderrorresponse(stream, requestid, message, args=None):
422 m = {
423 b'status': b'error',
424 b'error': {
425 b'message': message,
426 }
427 }
428
429 if args:
430 m[b'error'][b'args'] = args
431
432 overall = cbor.dumps(m, canonical=True)
433
434 yield stream.makeframe(requestid=requestid,
435 typeid=FRAME_TYPE_COMMAND_RESPONSE,
436 flags=FLAG_COMMAND_RESPONSE_EOS,
437 payload=overall)
438
389 def createerrorframe(stream, requestid, msg, errtype):
439 def createerrorframe(stream, requestid, msg, errtype):
390 # TODO properly handle frame size limits.
440 # TODO properly handle frame size limits.
391 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
441 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
392
442
393 payload = cbor.dumps({
443 payload = cbor.dumps({
394 b'type': errtype,
444 b'type': errtype,
395 b'message': [{b'msg': msg}],
445 b'message': [{b'msg': msg}],
396 }, canonical=True)
446 }, canonical=True)
397
447
398 yield stream.makeframe(requestid=requestid,
448 yield stream.makeframe(requestid=requestid,
399 typeid=FRAME_TYPE_ERROR_RESPONSE,
449 typeid=FRAME_TYPE_ERROR_RESPONSE,
400 flags=0,
450 flags=0,
401 payload=payload)
451 payload=payload)
402
452
403 def createtextoutputframe(stream, requestid, atoms,
453 def createtextoutputframe(stream, requestid, atoms,
404 maxframesize=DEFAULT_MAX_FRAME_SIZE):
454 maxframesize=DEFAULT_MAX_FRAME_SIZE):
405 """Create a text output frame to render text to people.
455 """Create a text output frame to render text to people.
406
456
407 ``atoms`` is a 3-tuple of (formatting string, args, labels).
457 ``atoms`` is a 3-tuple of (formatting string, args, labels).
408
458
409 The formatting string contains ``%s`` tokens to be replaced by the
459 The formatting string contains ``%s`` tokens to be replaced by the
410 corresponding indexed entry in ``args``. ``labels`` is an iterable of
460 corresponding indexed entry in ``args``. ``labels`` is an iterable of
411 formatters to be applied at rendering time. In terms of the ``ui``
461 formatters to be applied at rendering time. In terms of the ``ui``
412 class, each atom corresponds to a ``ui.write()``.
462 class, each atom corresponds to a ``ui.write()``.
413 """
463 """
414 atomdicts = []
464 atomdicts = []
415
465
416 for (formatting, args, labels) in atoms:
466 for (formatting, args, labels) in atoms:
417 # TODO look for localstr, other types here?
467 # TODO look for localstr, other types here?
418
468
419 if not isinstance(formatting, bytes):
469 if not isinstance(formatting, bytes):
420 raise ValueError('must use bytes formatting strings')
470 raise ValueError('must use bytes formatting strings')
421 for arg in args:
471 for arg in args:
422 if not isinstance(arg, bytes):
472 if not isinstance(arg, bytes):
423 raise ValueError('must use bytes for arguments')
473 raise ValueError('must use bytes for arguments')
424 for label in labels:
474 for label in labels:
425 if not isinstance(label, bytes):
475 if not isinstance(label, bytes):
426 raise ValueError('must use bytes for labels')
476 raise ValueError('must use bytes for labels')
427
477
428 # Formatting string must be ASCII.
478 # Formatting string must be ASCII.
429 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
479 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
430
480
431 # Arguments must be UTF-8.
481 # Arguments must be UTF-8.
432 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
482 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
433
483
434 # Labels must be ASCII.
484 # Labels must be ASCII.
435 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
485 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
436 for l in labels]
486 for l in labels]
437
487
438 atom = {b'msg': formatting}
488 atom = {b'msg': formatting}
439 if args:
489 if args:
440 atom[b'args'] = args
490 atom[b'args'] = args
441 if labels:
491 if labels:
442 atom[b'labels'] = labels
492 atom[b'labels'] = labels
443
493
444 atomdicts.append(atom)
494 atomdicts.append(atom)
445
495
446 payload = cbor.dumps(atomdicts, canonical=True)
496 payload = cbor.dumps(atomdicts, canonical=True)
447
497
448 if len(payload) > maxframesize:
498 if len(payload) > maxframesize:
449 raise ValueError('cannot encode data in a single frame')
499 raise ValueError('cannot encode data in a single frame')
450
500
451 yield stream.makeframe(requestid=requestid,
501 yield stream.makeframe(requestid=requestid,
452 typeid=FRAME_TYPE_TEXT_OUTPUT,
502 typeid=FRAME_TYPE_TEXT_OUTPUT,
453 flags=0,
503 flags=0,
454 payload=payload)
504 payload=payload)
455
505
456 class stream(object):
506 class stream(object):
457 """Represents a logical unidirectional series of frames."""
507 """Represents a logical unidirectional series of frames."""
458
508
459 def __init__(self, streamid, active=False):
509 def __init__(self, streamid, active=False):
460 self.streamid = streamid
510 self.streamid = streamid
461 self._active = active
511 self._active = active
462
512
463 def makeframe(self, requestid, typeid, flags, payload):
513 def makeframe(self, requestid, typeid, flags, payload):
464 """Create a frame to be sent out over this stream.
514 """Create a frame to be sent out over this stream.
465
515
466 Only returns the frame instance. Does not actually send it.
516 Only returns the frame instance. Does not actually send it.
467 """
517 """
468 streamflags = 0
518 streamflags = 0
469 if not self._active:
519 if not self._active:
470 streamflags |= STREAM_FLAG_BEGIN_STREAM
520 streamflags |= STREAM_FLAG_BEGIN_STREAM
471 self._active = True
521 self._active = True
472
522
473 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
523 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
474 payload)
524 payload)
475
525
476 def ensureserverstream(stream):
526 def ensureserverstream(stream):
477 if stream.streamid % 2:
527 if stream.streamid % 2:
478 raise error.ProgrammingError('server should only write to even '
528 raise error.ProgrammingError('server should only write to even '
479 'numbered streams; %d is not even' %
529 'numbered streams; %d is not even' %
480 stream.streamid)
530 stream.streamid)
481
531
482 class serverreactor(object):
532 class serverreactor(object):
483 """Holds state of a server handling frame-based protocol requests.
533 """Holds state of a server handling frame-based protocol requests.
484
534
485 This class is the "brain" of the unified frame-based protocol server
535 This class is the "brain" of the unified frame-based protocol server
486 component. While the protocol is stateless from the perspective of
536 component. While the protocol is stateless from the perspective of
487 requests/commands, something needs to track which frames have been
537 requests/commands, something needs to track which frames have been
488 received, what frames to expect, etc. This class is that thing.
538 received, what frames to expect, etc. This class is that thing.
489
539
490 Instances are modeled as a state machine of sorts. Instances are also
540 Instances are modeled as a state machine of sorts. Instances are also
491 reactionary to external events. The point of this class is to encapsulate
541 reactionary to external events. The point of this class is to encapsulate
492 the state of the connection and the exchange of frames, not to perform
542 the state of the connection and the exchange of frames, not to perform
493 work. Instead, callers tell this class when something occurs, like a
543 work. Instead, callers tell this class when something occurs, like a
494 frame arriving. If that activity is worthy of a follow-up action (say
544 frame arriving. If that activity is worthy of a follow-up action (say
495 *run a command*), the return value of that handler will say so.
545 *run a command*), the return value of that handler will say so.
496
546
497 I/O and CPU intensive operations are purposefully delegated outside of
547 I/O and CPU intensive operations are purposefully delegated outside of
498 this class.
548 this class.
499
549
500 Consumers are expected to tell instances when events occur. They do so by
550 Consumers are expected to tell instances when events occur. They do so by
501 calling the various ``on*`` methods. These methods return a 2-tuple
551 calling the various ``on*`` methods. These methods return a 2-tuple
502 describing any follow-up action(s) to take. The first element is the
552 describing any follow-up action(s) to take. The first element is the
503 name of an action to perform. The second is a data structure (usually
553 name of an action to perform. The second is a data structure (usually
504 a dict) specific to that action that contains more information. e.g.
554 a dict) specific to that action that contains more information. e.g.
505 if the server wants to send frames back to the client, the data structure
555 if the server wants to send frames back to the client, the data structure
506 will contain a reference to those frames.
556 will contain a reference to those frames.
507
557
508 Valid actions that consumers can be instructed to take are:
558 Valid actions that consumers can be instructed to take are:
509
559
510 sendframes
560 sendframes
511 Indicates that frames should be sent to the client. The ``framegen``
561 Indicates that frames should be sent to the client. The ``framegen``
512 key contains a generator of frames that should be sent. The server
562 key contains a generator of frames that should be sent. The server
513 assumes that all frames are sent to the client.
563 assumes that all frames are sent to the client.
514
564
515 error
565 error
516 Indicates that an error occurred. Consumer should probably abort.
566 Indicates that an error occurred. Consumer should probably abort.
517
567
518 runcommand
568 runcommand
519 Indicates that the consumer should run a wire protocol command. Details
569 Indicates that the consumer should run a wire protocol command. Details
520 of the command to run are given in the data structure.
570 of the command to run are given in the data structure.
521
571
522 wantframe
572 wantframe
523 Indicates that nothing of interest happened and the server is waiting on
573 Indicates that nothing of interest happened and the server is waiting on
524 more frames from the client before anything interesting can be done.
574 more frames from the client before anything interesting can be done.
525
575
526 noop
576 noop
527 Indicates no additional action is required.
577 Indicates no additional action is required.
528
578
529 Known Issues
579 Known Issues
530 ------------
580 ------------
531
581
532 There are no limits to the number of partially received commands or their
582 There are no limits to the number of partially received commands or their
533 size. A malicious client could stream command request data and exhaust the
583 size. A malicious client could stream command request data and exhaust the
534 server's memory.
584 server's memory.
535
585
536 Partially received commands are not acted upon when end of input is
586 Partially received commands are not acted upon when end of input is
537 reached. Should the server error if it receives a partial request?
587 reached. Should the server error if it receives a partial request?
538 Should the client send a message to abort a partially transmitted request
588 Should the client send a message to abort a partially transmitted request
539 to facilitate graceful shutdown?
589 to facilitate graceful shutdown?
540
590
541 Active requests that haven't been responded to aren't tracked. This means
591 Active requests that haven't been responded to aren't tracked. This means
542 that if we receive a command and instruct its dispatch, another command
592 that if we receive a command and instruct its dispatch, another command
543 with its request ID can come in over the wire and there will be a race
593 with its request ID can come in over the wire and there will be a race
544 between who responds to what.
594 between who responds to what.
545 """
595 """
546
596
547 def __init__(self, deferoutput=False):
597 def __init__(self, deferoutput=False):
548 """Construct a new server reactor.
598 """Construct a new server reactor.
549
599
550 ``deferoutput`` can be used to indicate that no output frames should be
600 ``deferoutput`` can be used to indicate that no output frames should be
551 instructed to be sent until input has been exhausted. In this mode,
601 instructed to be sent until input has been exhausted. In this mode,
552 events that would normally generate output frames (such as a command
602 events that would normally generate output frames (such as a command
553 response being ready) will instead defer instructing the consumer to
603 response being ready) will instead defer instructing the consumer to
554 send those frames. This is useful for half-duplex transports where the
604 send those frames. This is useful for half-duplex transports where the
555 sender cannot receive until all data has been transmitted.
605 sender cannot receive until all data has been transmitted.
556 """
606 """
557 self._deferoutput = deferoutput
607 self._deferoutput = deferoutput
558 self._state = 'idle'
608 self._state = 'idle'
559 self._nextoutgoingstreamid = 2
609 self._nextoutgoingstreamid = 2
560 self._bufferedframegens = []
610 self._bufferedframegens = []
561 # stream id -> stream instance for all active streams from the client.
611 # stream id -> stream instance for all active streams from the client.
562 self._incomingstreams = {}
612 self._incomingstreams = {}
563 self._outgoingstreams = {}
613 self._outgoingstreams = {}
564 # request id -> dict of commands that are actively being received.
614 # request id -> dict of commands that are actively being received.
565 self._receivingcommands = {}
615 self._receivingcommands = {}
566 # Request IDs that have been received and are actively being processed.
616 # Request IDs that have been received and are actively being processed.
567 # Once all output for a request has been sent, it is removed from this
617 # Once all output for a request has been sent, it is removed from this
568 # set.
618 # set.
569 self._activecommands = set()
619 self._activecommands = set()
570
620
571 def onframerecv(self, frame):
621 def onframerecv(self, frame):
572 """Process a frame that has been received off the wire.
622 """Process a frame that has been received off the wire.
573
623
574 Returns a dict with an ``action`` key that details what action,
624 Returns a dict with an ``action`` key that details what action,
575 if any, the consumer should take next.
625 if any, the consumer should take next.
576 """
626 """
577 if not frame.streamid % 2:
627 if not frame.streamid % 2:
578 self._state = 'errored'
628 self._state = 'errored'
579 return self._makeerrorresult(
629 return self._makeerrorresult(
580 _('received frame with even numbered stream ID: %d') %
630 _('received frame with even numbered stream ID: %d') %
581 frame.streamid)
631 frame.streamid)
582
632
583 if frame.streamid not in self._incomingstreams:
633 if frame.streamid not in self._incomingstreams:
584 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
634 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
585 self._state = 'errored'
635 self._state = 'errored'
586 return self._makeerrorresult(
636 return self._makeerrorresult(
587 _('received frame on unknown inactive stream without '
637 _('received frame on unknown inactive stream without '
588 'beginning of stream flag set'))
638 'beginning of stream flag set'))
589
639
590 self._incomingstreams[frame.streamid] = stream(frame.streamid)
640 self._incomingstreams[frame.streamid] = stream(frame.streamid)
591
641
592 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
642 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
593 # TODO handle decoding frames
643 # TODO handle decoding frames
594 self._state = 'errored'
644 self._state = 'errored'
595 raise error.ProgrammingError('support for decoding stream payloads '
645 raise error.ProgrammingError('support for decoding stream payloads '
596 'not yet implemented')
646 'not yet implemented')
597
647
598 if frame.streamflags & STREAM_FLAG_END_STREAM:
648 if frame.streamflags & STREAM_FLAG_END_STREAM:
599 del self._incomingstreams[frame.streamid]
649 del self._incomingstreams[frame.streamid]
600
650
601 handlers = {
651 handlers = {
602 'idle': self._onframeidle,
652 'idle': self._onframeidle,
603 'command-receiving': self._onframecommandreceiving,
653 'command-receiving': self._onframecommandreceiving,
604 'errored': self._onframeerrored,
654 'errored': self._onframeerrored,
605 }
655 }
606
656
607 meth = handlers.get(self._state)
657 meth = handlers.get(self._state)
608 if not meth:
658 if not meth:
609 raise error.ProgrammingError('unhandled state: %s' % self._state)
659 raise error.ProgrammingError('unhandled state: %s' % self._state)
610
660
611 return meth(frame)
661 return meth(frame)
612
662
613 def oncommandresponseready(self, stream, requestid, data):
663 def oncommandresponseready(self, stream, requestid, data):
614 """Signal that a bytes response is ready to be sent to the client.
664 """Signal that a bytes response is ready to be sent to the client.
615
665
616 The raw bytes response is passed as an argument.
666 The raw bytes response is passed as an argument.
617 """
667 """
618 ensureserverstream(stream)
668 ensureserverstream(stream)
619
669
620 def sendframes():
670 def sendframes():
621 for frame in createcommandresponseframesfrombytes(stream, requestid,
671 for frame in createcommandresponseframesfrombytes(stream, requestid,
622 data):
672 data):
623 yield frame
673 yield frame
624
674
625 self._activecommands.remove(requestid)
675 self._activecommands.remove(requestid)
626
676
627 result = sendframes()
677 result = sendframes()
628
678
629 if self._deferoutput:
679 if self._deferoutput:
630 self._bufferedframegens.append(result)
680 self._bufferedframegens.append(result)
631 return 'noop', {}
681 return 'noop', {}
632 else:
682 else:
633 return 'sendframes', {
683 return 'sendframes', {
634 'framegen': result,
684 'framegen': result,
635 }
685 }
636
686
687 def oncommandresponsereadygen(self, stream, requestid, gen):
688 """Signal that a bytes response is ready, with data as a generator."""
689 ensureserverstream(stream)
690
691 def sendframes():
692 for frame in createbytesresponseframesfromgen(stream, requestid,
693 gen):
694 yield frame
695
696 self._activecommands.remove(requestid)
697
698 return self._handlesendframes(sendframes())
699
637 def oninputeof(self):
700 def oninputeof(self):
638 """Signals that end of input has been received.
701 """Signals that end of input has been received.
639
702
640 No more frames will be received. All pending activity should be
703 No more frames will be received. All pending activity should be
641 completed.
704 completed.
642 """
705 """
643 # TODO should we do anything about in-flight commands?
706 # TODO should we do anything about in-flight commands?
644
707
645 if not self._deferoutput or not self._bufferedframegens:
708 if not self._deferoutput or not self._bufferedframegens:
646 return 'noop', {}
709 return 'noop', {}
647
710
648 # If we buffered all our responses, emit those.
711 # If we buffered all our responses, emit those.
649 def makegen():
712 def makegen():
650 for gen in self._bufferedframegens:
713 for gen in self._bufferedframegens:
651 for frame in gen:
714 for frame in gen:
652 yield frame
715 yield frame
653
716
654 return 'sendframes', {
717 return 'sendframes', {
655 'framegen': makegen(),
718 'framegen': makegen(),
656 }
719 }
657
720
721 def _handlesendframes(self, framegen):
722 if self._deferoutput:
723 self._bufferedframegens.append(framegen)
724 return 'noop', {}
725 else:
726 return 'sendframes', {
727 'framegen': framegen,
728 }
729
658 def onservererror(self, stream, requestid, msg):
730 def onservererror(self, stream, requestid, msg):
659 ensureserverstream(stream)
731 ensureserverstream(stream)
660
732
661 return 'sendframes', {
733 def sendframes():
662 'framegen': createerrorframe(stream, requestid, msg,
734 for frame in createerrorframe(stream, requestid, msg,
663 errtype='server'),
735 errtype='server'):
664 }
736 yield frame
737
738 self._activecommands.remove(requestid)
739
740 return self._handlesendframes(sendframes())
741
742 def oncommanderror(self, stream, requestid, message, args=None):
743 """Called when a command encountered an error before sending output."""
744 ensureserverstream(stream)
745
746 def sendframes():
747 for frame in createcommanderrorresponse(stream, requestid, message,
748 args):
749 yield frame
750
751 self._activecommands.remove(requestid)
752
753 return self._handlesendframes(sendframes())
665
754
666 def makeoutputstream(self):
755 def makeoutputstream(self):
667 """Create a stream to be used for sending data to the client."""
756 """Create a stream to be used for sending data to the client."""
668 streamid = self._nextoutgoingstreamid
757 streamid = self._nextoutgoingstreamid
669 self._nextoutgoingstreamid += 2
758 self._nextoutgoingstreamid += 2
670
759
671 s = stream(streamid)
760 s = stream(streamid)
672 self._outgoingstreams[streamid] = s
761 self._outgoingstreams[streamid] = s
673
762
674 return s
763 return s
675
764
676 def _makeerrorresult(self, msg):
765 def _makeerrorresult(self, msg):
677 return 'error', {
766 return 'error', {
678 'message': msg,
767 'message': msg,
679 }
768 }
680
769
681 def _makeruncommandresult(self, requestid):
770 def _makeruncommandresult(self, requestid):
682 entry = self._receivingcommands[requestid]
771 entry = self._receivingcommands[requestid]
683
772
684 if not entry['requestdone']:
773 if not entry['requestdone']:
685 self._state = 'errored'
774 self._state = 'errored'
686 raise error.ProgrammingError('should not be called without '
775 raise error.ProgrammingError('should not be called without '
687 'requestdone set')
776 'requestdone set')
688
777
689 del self._receivingcommands[requestid]
778 del self._receivingcommands[requestid]
690
779
691 if self._receivingcommands:
780 if self._receivingcommands:
692 self._state = 'command-receiving'
781 self._state = 'command-receiving'
693 else:
782 else:
694 self._state = 'idle'
783 self._state = 'idle'
695
784
696 # Decode the payloads as CBOR.
785 # Decode the payloads as CBOR.
697 entry['payload'].seek(0)
786 entry['payload'].seek(0)
698 request = cbor.load(entry['payload'])
787 request = cbor.load(entry['payload'])
699
788
700 if b'name' not in request:
789 if b'name' not in request:
701 self._state = 'errored'
790 self._state = 'errored'
702 return self._makeerrorresult(
791 return self._makeerrorresult(
703 _('command request missing "name" field'))
792 _('command request missing "name" field'))
704
793
705 if b'args' not in request:
794 if b'args' not in request:
706 request[b'args'] = {}
795 request[b'args'] = {}
707
796
708 assert requestid not in self._activecommands
797 assert requestid not in self._activecommands
709 self._activecommands.add(requestid)
798 self._activecommands.add(requestid)
710
799
711 return 'runcommand', {
800 return 'runcommand', {
712 'requestid': requestid,
801 'requestid': requestid,
713 'command': request[b'name'],
802 'command': request[b'name'],
714 'args': request[b'args'],
803 'args': request[b'args'],
715 'data': entry['data'].getvalue() if entry['data'] else None,
804 'data': entry['data'].getvalue() if entry['data'] else None,
716 }
805 }
717
806
718 def _makewantframeresult(self):
807 def _makewantframeresult(self):
719 return 'wantframe', {
808 return 'wantframe', {
720 'state': self._state,
809 'state': self._state,
721 }
810 }
722
811
723 def _validatecommandrequestframe(self, frame):
812 def _validatecommandrequestframe(self, frame):
724 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
813 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
725 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
814 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
726
815
727 if new and continuation:
816 if new and continuation:
728 self._state = 'errored'
817 self._state = 'errored'
729 return self._makeerrorresult(
818 return self._makeerrorresult(
730 _('received command request frame with both new and '
819 _('received command request frame with both new and '
731 'continuation flags set'))
820 'continuation flags set'))
732
821
733 if not new and not continuation:
822 if not new and not continuation:
734 self._state = 'errored'
823 self._state = 'errored'
735 return self._makeerrorresult(
824 return self._makeerrorresult(
736 _('received command request frame with neither new nor '
825 _('received command request frame with neither new nor '
737 'continuation flags set'))
826 'continuation flags set'))
738
827
739 def _onframeidle(self, frame):
828 def _onframeidle(self, frame):
740 # The only frame type that should be received in this state is a
829 # The only frame type that should be received in this state is a
741 # command request.
830 # command request.
742 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
831 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
743 self._state = 'errored'
832 self._state = 'errored'
744 return self._makeerrorresult(
833 return self._makeerrorresult(
745 _('expected command request frame; got %d') % frame.typeid)
834 _('expected command request frame; got %d') % frame.typeid)
746
835
747 res = self._validatecommandrequestframe(frame)
836 res = self._validatecommandrequestframe(frame)
748 if res:
837 if res:
749 return res
838 return res
750
839
751 if frame.requestid in self._receivingcommands:
840 if frame.requestid in self._receivingcommands:
752 self._state = 'errored'
841 self._state = 'errored'
753 return self._makeerrorresult(
842 return self._makeerrorresult(
754 _('request with ID %d already received') % frame.requestid)
843 _('request with ID %d already received') % frame.requestid)
755
844
756 if frame.requestid in self._activecommands:
845 if frame.requestid in self._activecommands:
757 self._state = 'errored'
846 self._state = 'errored'
758 return self._makeerrorresult(
847 return self._makeerrorresult(
759 _('request with ID %d is already active') % frame.requestid)
848 _('request with ID %d is already active') % frame.requestid)
760
849
761 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
850 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
762 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
851 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
763 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
852 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
764
853
765 if not new:
854 if not new:
766 self._state = 'errored'
855 self._state = 'errored'
767 return self._makeerrorresult(
856 return self._makeerrorresult(
768 _('received command request frame without new flag set'))
857 _('received command request frame without new flag set'))
769
858
770 payload = util.bytesio()
859 payload = util.bytesio()
771 payload.write(frame.payload)
860 payload.write(frame.payload)
772
861
773 self._receivingcommands[frame.requestid] = {
862 self._receivingcommands[frame.requestid] = {
774 'payload': payload,
863 'payload': payload,
775 'data': None,
864 'data': None,
776 'requestdone': not moreframes,
865 'requestdone': not moreframes,
777 'expectingdata': bool(expectingdata),
866 'expectingdata': bool(expectingdata),
778 }
867 }
779
868
780 # This is the final frame for this request. Dispatch it.
869 # This is the final frame for this request. Dispatch it.
781 if not moreframes and not expectingdata:
870 if not moreframes and not expectingdata:
782 return self._makeruncommandresult(frame.requestid)
871 return self._makeruncommandresult(frame.requestid)
783
872
784 assert moreframes or expectingdata
873 assert moreframes or expectingdata
785 self._state = 'command-receiving'
874 self._state = 'command-receiving'
786 return self._makewantframeresult()
875 return self._makewantframeresult()
787
876
788 def _onframecommandreceiving(self, frame):
877 def _onframecommandreceiving(self, frame):
789 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
878 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
790 # Process new command requests as such.
879 # Process new command requests as such.
791 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
880 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
792 return self._onframeidle(frame)
881 return self._onframeidle(frame)
793
882
794 res = self._validatecommandrequestframe(frame)
883 res = self._validatecommandrequestframe(frame)
795 if res:
884 if res:
796 return res
885 return res
797
886
798 # All other frames should be related to a command that is currently
887 # All other frames should be related to a command that is currently
799 # receiving but is not active.
888 # receiving but is not active.
800 if frame.requestid in self._activecommands:
889 if frame.requestid in self._activecommands:
801 self._state = 'errored'
890 self._state = 'errored'
802 return self._makeerrorresult(
891 return self._makeerrorresult(
803 _('received frame for request that is still active: %d') %
892 _('received frame for request that is still active: %d') %
804 frame.requestid)
893 frame.requestid)
805
894
806 if frame.requestid not in self._receivingcommands:
895 if frame.requestid not in self._receivingcommands:
807 self._state = 'errored'
896 self._state = 'errored'
808 return self._makeerrorresult(
897 return self._makeerrorresult(
809 _('received frame for request that is not receiving: %d') %
898 _('received frame for request that is not receiving: %d') %
810 frame.requestid)
899 frame.requestid)
811
900
812 entry = self._receivingcommands[frame.requestid]
901 entry = self._receivingcommands[frame.requestid]
813
902
814 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
903 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
815 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
904 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
816 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
905 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
817
906
818 if entry['requestdone']:
907 if entry['requestdone']:
819 self._state = 'errored'
908 self._state = 'errored'
820 return self._makeerrorresult(
909 return self._makeerrorresult(
821 _('received command request frame when request frames '
910 _('received command request frame when request frames '
822 'were supposedly done'))
911 'were supposedly done'))
823
912
824 if expectingdata != entry['expectingdata']:
913 if expectingdata != entry['expectingdata']:
825 self._state = 'errored'
914 self._state = 'errored'
826 return self._makeerrorresult(
915 return self._makeerrorresult(
827 _('mismatch between expect data flag and previous frame'))
916 _('mismatch between expect data flag and previous frame'))
828
917
829 entry['payload'].write(frame.payload)
918 entry['payload'].write(frame.payload)
830
919
831 if not moreframes:
920 if not moreframes:
832 entry['requestdone'] = True
921 entry['requestdone'] = True
833
922
834 if not moreframes and not expectingdata:
923 if not moreframes and not expectingdata:
835 return self._makeruncommandresult(frame.requestid)
924 return self._makeruncommandresult(frame.requestid)
836
925
837 return self._makewantframeresult()
926 return self._makewantframeresult()
838
927
839 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
928 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
840 if not entry['expectingdata']:
929 if not entry['expectingdata']:
841 self._state = 'errored'
930 self._state = 'errored'
842 return self._makeerrorresult(_(
931 return self._makeerrorresult(_(
843 'received command data frame for request that is not '
932 'received command data frame for request that is not '
844 'expecting data: %d') % frame.requestid)
933 'expecting data: %d') % frame.requestid)
845
934
846 if entry['data'] is None:
935 if entry['data'] is None:
847 entry['data'] = util.bytesio()
936 entry['data'] = util.bytesio()
848
937
849 return self._handlecommanddataframe(frame, entry)
938 return self._handlecommanddataframe(frame, entry)
850 else:
939 else:
851 self._state = 'errored'
940 self._state = 'errored'
852 return self._makeerrorresult(_(
941 return self._makeerrorresult(_(
853 'received unexpected frame type: %d') % frame.typeid)
942 'received unexpected frame type: %d') % frame.typeid)
854
943
855 def _handlecommanddataframe(self, frame, entry):
944 def _handlecommanddataframe(self, frame, entry):
856 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
945 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
857
946
858 # TODO support streaming data instead of buffering it.
947 # TODO support streaming data instead of buffering it.
859 entry['data'].write(frame.payload)
948 entry['data'].write(frame.payload)
860
949
861 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
950 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
862 return self._makewantframeresult()
951 return self._makewantframeresult()
863 elif frame.flags & FLAG_COMMAND_DATA_EOS:
952 elif frame.flags & FLAG_COMMAND_DATA_EOS:
864 entry['data'].seek(0)
953 entry['data'].seek(0)
865 return self._makeruncommandresult(frame.requestid)
954 return self._makeruncommandresult(frame.requestid)
866 else:
955 else:
867 self._state = 'errored'
956 self._state = 'errored'
868 return self._makeerrorresult(_('command data frame without '
957 return self._makeerrorresult(_('command data frame without '
869 'flags'))
958 'flags'))
870
959
871 def _onframeerrored(self, frame):
960 def _onframeerrored(self, frame):
872 return self._makeerrorresult(_('server already errored'))
961 return self._makeerrorresult(_('server already errored'))
873
962
874 class commandrequest(object):
963 class commandrequest(object):
875 """Represents a request to run a command."""
964 """Represents a request to run a command."""
876
965
877 def __init__(self, requestid, name, args, datafh=None):
966 def __init__(self, requestid, name, args, datafh=None):
878 self.requestid = requestid
967 self.requestid = requestid
879 self.name = name
968 self.name = name
880 self.args = args
969 self.args = args
881 self.datafh = datafh
970 self.datafh = datafh
882 self.state = 'pending'
971 self.state = 'pending'
883
972
884 class clientreactor(object):
973 class clientreactor(object):
885 """Holds state of a client issuing frame-based protocol requests.
974 """Holds state of a client issuing frame-based protocol requests.
886
975
887 This is like ``serverreactor`` but for client-side state.
976 This is like ``serverreactor`` but for client-side state.
888
977
889 Each instance is bound to the lifetime of a connection. For persistent
978 Each instance is bound to the lifetime of a connection. For persistent
890 connection transports using e.g. TCP sockets and speaking the raw
979 connection transports using e.g. TCP sockets and speaking the raw
891 framing protocol, there will be a single instance for the lifetime of
980 framing protocol, there will be a single instance for the lifetime of
892 the TCP socket. For transports where there are multiple discrete
981 the TCP socket. For transports where there are multiple discrete
893 interactions (say tunneled within in HTTP request), there will be a
982 interactions (say tunneled within in HTTP request), there will be a
894 separate instance for each distinct interaction.
983 separate instance for each distinct interaction.
895 """
984 """
896 def __init__(self, hasmultiplesend=False, buffersends=True):
985 def __init__(self, hasmultiplesend=False, buffersends=True):
897 """Create a new instance.
986 """Create a new instance.
898
987
899 ``hasmultiplesend`` indicates whether multiple sends are supported
988 ``hasmultiplesend`` indicates whether multiple sends are supported
900 by the transport. When True, it is possible to send commands immediately
989 by the transport. When True, it is possible to send commands immediately
901 instead of buffering until the caller signals an intent to finish a
990 instead of buffering until the caller signals an intent to finish a
902 send operation.
991 send operation.
903
992
904 ``buffercommands`` indicates whether sends should be buffered until the
993 ``buffercommands`` indicates whether sends should be buffered until the
905 last request has been issued.
994 last request has been issued.
906 """
995 """
907 self._hasmultiplesend = hasmultiplesend
996 self._hasmultiplesend = hasmultiplesend
908 self._buffersends = buffersends
997 self._buffersends = buffersends
909
998
910 self._canissuecommands = True
999 self._canissuecommands = True
911 self._cansend = True
1000 self._cansend = True
912
1001
913 self._nextrequestid = 1
1002 self._nextrequestid = 1
914 # We only support a single outgoing stream for now.
1003 # We only support a single outgoing stream for now.
915 self._outgoingstream = stream(1)
1004 self._outgoingstream = stream(1)
916 self._pendingrequests = collections.deque()
1005 self._pendingrequests = collections.deque()
917 self._activerequests = {}
1006 self._activerequests = {}
918 self._incomingstreams = {}
1007 self._incomingstreams = {}
919
1008
920 def callcommand(self, name, args, datafh=None):
1009 def callcommand(self, name, args, datafh=None):
921 """Request that a command be executed.
1010 """Request that a command be executed.
922
1011
923 Receives the command name, a dict of arguments to pass to the command,
1012 Receives the command name, a dict of arguments to pass to the command,
924 and an optional file object containing the raw data for the command.
1013 and an optional file object containing the raw data for the command.
925
1014
926 Returns a 3-tuple of (request, action, action data).
1015 Returns a 3-tuple of (request, action, action data).
927 """
1016 """
928 if not self._canissuecommands:
1017 if not self._canissuecommands:
929 raise error.ProgrammingError('cannot issue new commands')
1018 raise error.ProgrammingError('cannot issue new commands')
930
1019
931 requestid = self._nextrequestid
1020 requestid = self._nextrequestid
932 self._nextrequestid += 2
1021 self._nextrequestid += 2
933
1022
934 request = commandrequest(requestid, name, args, datafh=datafh)
1023 request = commandrequest(requestid, name, args, datafh=datafh)
935
1024
936 if self._buffersends:
1025 if self._buffersends:
937 self._pendingrequests.append(request)
1026 self._pendingrequests.append(request)
938 return request, 'noop', {}
1027 return request, 'noop', {}
939 else:
1028 else:
940 if not self._cansend:
1029 if not self._cansend:
941 raise error.ProgrammingError('sends cannot be performed on '
1030 raise error.ProgrammingError('sends cannot be performed on '
942 'this instance')
1031 'this instance')
943
1032
944 if not self._hasmultiplesend:
1033 if not self._hasmultiplesend:
945 self._cansend = False
1034 self._cansend = False
946 self._canissuecommands = False
1035 self._canissuecommands = False
947
1036
948 return request, 'sendframes', {
1037 return request, 'sendframes', {
949 'framegen': self._makecommandframes(request),
1038 'framegen': self._makecommandframes(request),
950 }
1039 }
951
1040
952 def flushcommands(self):
1041 def flushcommands(self):
953 """Request that all queued commands be sent.
1042 """Request that all queued commands be sent.
954
1043
955 If any commands are buffered, this will instruct the caller to send
1044 If any commands are buffered, this will instruct the caller to send
956 them over the wire. If no commands are buffered it instructs the client
1045 them over the wire. If no commands are buffered it instructs the client
957 to no-op.
1046 to no-op.
958
1047
959 If instances aren't configured for multiple sends, no new command
1048 If instances aren't configured for multiple sends, no new command
960 requests are allowed after this is called.
1049 requests are allowed after this is called.
961 """
1050 """
962 if not self._pendingrequests:
1051 if not self._pendingrequests:
963 return 'noop', {}
1052 return 'noop', {}
964
1053
965 if not self._cansend:
1054 if not self._cansend:
966 raise error.ProgrammingError('sends cannot be performed on this '
1055 raise error.ProgrammingError('sends cannot be performed on this '
967 'instance')
1056 'instance')
968
1057
969 # If the instance only allows sending once, mark that we have fired
1058 # If the instance only allows sending once, mark that we have fired
970 # our one shot.
1059 # our one shot.
971 if not self._hasmultiplesend:
1060 if not self._hasmultiplesend:
972 self._canissuecommands = False
1061 self._canissuecommands = False
973 self._cansend = False
1062 self._cansend = False
974
1063
975 def makeframes():
1064 def makeframes():
976 while self._pendingrequests:
1065 while self._pendingrequests:
977 request = self._pendingrequests.popleft()
1066 request = self._pendingrequests.popleft()
978 for frame in self._makecommandframes(request):
1067 for frame in self._makecommandframes(request):
979 yield frame
1068 yield frame
980
1069
981 return 'sendframes', {
1070 return 'sendframes', {
982 'framegen': makeframes(),
1071 'framegen': makeframes(),
983 }
1072 }
984
1073
985 def _makecommandframes(self, request):
1074 def _makecommandframes(self, request):
986 """Emit frames to issue a command request.
1075 """Emit frames to issue a command request.
987
1076
988 As a side-effect, update request accounting to reflect its changed
1077 As a side-effect, update request accounting to reflect its changed
989 state.
1078 state.
990 """
1079 """
991 self._activerequests[request.requestid] = request
1080 self._activerequests[request.requestid] = request
992 request.state = 'sending'
1081 request.state = 'sending'
993
1082
994 res = createcommandframes(self._outgoingstream,
1083 res = createcommandframes(self._outgoingstream,
995 request.requestid,
1084 request.requestid,
996 request.name,
1085 request.name,
997 request.args,
1086 request.args,
998 request.datafh)
1087 request.datafh)
999
1088
1000 for frame in res:
1089 for frame in res:
1001 yield frame
1090 yield frame
1002
1091
1003 request.state = 'sent'
1092 request.state = 'sent'
1004
1093
1005 def onframerecv(self, frame):
1094 def onframerecv(self, frame):
1006 """Process a frame that has been received off the wire.
1095 """Process a frame that has been received off the wire.
1007
1096
1008 Returns a 2-tuple of (action, meta) describing further action the
1097 Returns a 2-tuple of (action, meta) describing further action the
1009 caller needs to take as a result of receiving this frame.
1098 caller needs to take as a result of receiving this frame.
1010 """
1099 """
1011 if frame.streamid % 2:
1100 if frame.streamid % 2:
1012 return 'error', {
1101 return 'error', {
1013 'message': (
1102 'message': (
1014 _('received frame with odd numbered stream ID: %d') %
1103 _('received frame with odd numbered stream ID: %d') %
1015 frame.streamid),
1104 frame.streamid),
1016 }
1105 }
1017
1106
1018 if frame.streamid not in self._incomingstreams:
1107 if frame.streamid not in self._incomingstreams:
1019 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1108 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1020 return 'error', {
1109 return 'error', {
1021 'message': _('received frame on unknown stream '
1110 'message': _('received frame on unknown stream '
1022 'without beginning of stream flag set'),
1111 'without beginning of stream flag set'),
1023 }
1112 }
1024
1113
1025 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1114 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1026
1115
1027 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1116 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1028 raise error.ProgrammingError('support for decoding stream '
1117 raise error.ProgrammingError('support for decoding stream '
1029 'payloads not yet implemneted')
1118 'payloads not yet implemneted')
1030
1119
1031 if frame.streamflags & STREAM_FLAG_END_STREAM:
1120 if frame.streamflags & STREAM_FLAG_END_STREAM:
1032 del self._incomingstreams[frame.streamid]
1121 del self._incomingstreams[frame.streamid]
1033
1122
1034 if frame.requestid not in self._activerequests:
1123 if frame.requestid not in self._activerequests:
1035 return 'error', {
1124 return 'error', {
1036 'message': (_('received frame for inactive request ID: %d') %
1125 'message': (_('received frame for inactive request ID: %d') %
1037 frame.requestid),
1126 frame.requestid),
1038 }
1127 }
1039
1128
1040 request = self._activerequests[frame.requestid]
1129 request = self._activerequests[frame.requestid]
1041 request.state = 'receiving'
1130 request.state = 'receiving'
1042
1131
1043 handlers = {
1132 handlers = {
1044 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1133 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1045 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1134 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1046 }
1135 }
1047
1136
1048 meth = handlers.get(frame.typeid)
1137 meth = handlers.get(frame.typeid)
1049 if not meth:
1138 if not meth:
1050 raise error.ProgrammingError('unhandled frame type: %d' %
1139 raise error.ProgrammingError('unhandled frame type: %d' %
1051 frame.typeid)
1140 frame.typeid)
1052
1141
1053 return meth(request, frame)
1142 return meth(request, frame)
1054
1143
1055 def _oncommandresponseframe(self, request, frame):
1144 def _oncommandresponseframe(self, request, frame):
1056 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1145 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1057 request.state = 'received'
1146 request.state = 'received'
1058 del self._activerequests[request.requestid]
1147 del self._activerequests[request.requestid]
1059
1148
1060 return 'responsedata', {
1149 return 'responsedata', {
1061 'request': request,
1150 'request': request,
1062 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1151 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1063 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1152 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1064 'data': frame.payload,
1153 'data': frame.payload,
1065 }
1154 }
1066
1155
1067 def _onerrorresponseframe(self, request, frame):
1156 def _onerrorresponseframe(self, request, frame):
1068 request.state = 'errored'
1157 request.state = 'errored'
1069 del self._activerequests[request.requestid]
1158 del self._activerequests[request.requestid]
1070
1159
1071 # The payload should be a CBOR map.
1160 # The payload should be a CBOR map.
1072 m = cbor.loads(frame.payload)
1161 m = cbor.loads(frame.payload)
1073
1162
1074 return 'error', {
1163 return 'error', {
1075 'request': request,
1164 'request': request,
1076 'type': m['type'],
1165 'type': m['type'],
1077 'message': m['message'],
1166 'message': m['message'],
1078 }
1167 }
@@ -1,227 +1,243
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 from .node import (
8 from .node import (
9 bin,
9 bin,
10 hex,
10 hex,
11 )
11 )
12 from .thirdparty.zope import (
12 from .thirdparty.zope import (
13 interface as zi,
13 interface as zi,
14 )
14 )
15
15
16 # Names of the SSH protocol implementations.
16 # Names of the SSH protocol implementations.
17 SSHV1 = 'ssh-v1'
17 SSHV1 = 'ssh-v1'
18 # These are advertised over the wire. Increment the counters at the end
18 # These are advertised over the wire. Increment the counters at the end
19 # to reflect BC breakages.
19 # to reflect BC breakages.
20 SSHV2 = 'exp-ssh-v2-0001'
20 SSHV2 = 'exp-ssh-v2-0001'
21 HTTP_WIREPROTO_V2 = 'exp-http-v2-0001'
21 HTTP_WIREPROTO_V2 = 'exp-http-v2-0001'
22
22
23 # All available wire protocol transports.
23 # All available wire protocol transports.
24 TRANSPORTS = {
24 TRANSPORTS = {
25 SSHV1: {
25 SSHV1: {
26 'transport': 'ssh',
26 'transport': 'ssh',
27 'version': 1,
27 'version': 1,
28 },
28 },
29 SSHV2: {
29 SSHV2: {
30 'transport': 'ssh',
30 'transport': 'ssh',
31 # TODO mark as version 2 once all commands are implemented.
31 # TODO mark as version 2 once all commands are implemented.
32 'version': 1,
32 'version': 1,
33 },
33 },
34 'http-v1': {
34 'http-v1': {
35 'transport': 'http',
35 'transport': 'http',
36 'version': 1,
36 'version': 1,
37 },
37 },
38 HTTP_WIREPROTO_V2: {
38 HTTP_WIREPROTO_V2: {
39 'transport': 'http',
39 'transport': 'http',
40 'version': 2,
40 'version': 2,
41 }
41 }
42 }
42 }
43
43
44 class bytesresponse(object):
44 class bytesresponse(object):
45 """A wire protocol response consisting of raw bytes."""
45 """A wire protocol response consisting of raw bytes."""
46 def __init__(self, data):
46 def __init__(self, data):
47 self.data = data
47 self.data = data
48
48
49 class ooberror(object):
49 class ooberror(object):
50 """wireproto reply: failure of a batch of operation
50 """wireproto reply: failure of a batch of operation
51
51
52 Something failed during a batch call. The error message is stored in
52 Something failed during a batch call. The error message is stored in
53 `self.message`.
53 `self.message`.
54 """
54 """
55 def __init__(self, message):
55 def __init__(self, message):
56 self.message = message
56 self.message = message
57
57
58 class pushres(object):
58 class pushres(object):
59 """wireproto reply: success with simple integer return
59 """wireproto reply: success with simple integer return
60
60
61 The call was successful and returned an integer contained in `self.res`.
61 The call was successful and returned an integer contained in `self.res`.
62 """
62 """
63 def __init__(self, res, output):
63 def __init__(self, res, output):
64 self.res = res
64 self.res = res
65 self.output = output
65 self.output = output
66
66
67 class pusherr(object):
67 class pusherr(object):
68 """wireproto reply: failure
68 """wireproto reply: failure
69
69
70 The call failed. The `self.res` attribute contains the error message.
70 The call failed. The `self.res` attribute contains the error message.
71 """
71 """
72 def __init__(self, res, output):
72 def __init__(self, res, output):
73 self.res = res
73 self.res = res
74 self.output = output
74 self.output = output
75
75
76 class streamres(object):
76 class streamres(object):
77 """wireproto reply: binary stream
77 """wireproto reply: binary stream
78
78
79 The call was successful and the result is a stream.
79 The call was successful and the result is a stream.
80
80
81 Accepts a generator containing chunks of data to be sent to the client.
81 Accepts a generator containing chunks of data to be sent to the client.
82
82
83 ``prefer_uncompressed`` indicates that the data is expected to be
83 ``prefer_uncompressed`` indicates that the data is expected to be
84 uncompressable and that the stream should therefore use the ``none``
84 uncompressable and that the stream should therefore use the ``none``
85 engine.
85 engine.
86 """
86 """
87 def __init__(self, gen=None, prefer_uncompressed=False):
87 def __init__(self, gen=None, prefer_uncompressed=False):
88 self.gen = gen
88 self.gen = gen
89 self.prefer_uncompressed = prefer_uncompressed
89 self.prefer_uncompressed = prefer_uncompressed
90
90
91 class streamreslegacy(object):
91 class streamreslegacy(object):
92 """wireproto reply: uncompressed binary stream
92 """wireproto reply: uncompressed binary stream
93
93
94 The call was successful and the result is a stream.
94 The call was successful and the result is a stream.
95
95
96 Accepts a generator containing chunks of data to be sent to the client.
96 Accepts a generator containing chunks of data to be sent to the client.
97
97
98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
99 using the application/mercurial-0.1 media type.
99 using the application/mercurial-0.1 media type.
100 """
100 """
101 def __init__(self, gen=None):
101 def __init__(self, gen=None):
102 self.gen = gen
102 self.gen = gen
103
103
104 class cborresponse(object):
104 class cborresponse(object):
105 """Encode the response value as CBOR."""
105 """Encode the response value as CBOR."""
106 def __init__(self, v):
106 def __init__(self, v):
107 self.value = v
107 self.value = v
108
108
109 class v2errorresponse(object):
110 """Represents a command error for version 2 transports."""
111 def __init__(self, message, args=None):
112 self.message = message
113 self.args = args
114
115 class v2streamingresponse(object):
116 """A response whose data is supplied by a generator.
117
118 The generator can either consist of data structures to CBOR
119 encode or a stream of already-encoded bytes.
120 """
121 def __init__(self, gen, compressible=True):
122 self.gen = gen
123 self.compressible = compressible
124
109 # list of nodes encoding / decoding
125 # list of nodes encoding / decoding
110 def decodelist(l, sep=' '):
126 def decodelist(l, sep=' '):
111 if l:
127 if l:
112 return [bin(v) for v in l.split(sep)]
128 return [bin(v) for v in l.split(sep)]
113 return []
129 return []
114
130
115 def encodelist(l, sep=' '):
131 def encodelist(l, sep=' '):
116 try:
132 try:
117 return sep.join(map(hex, l))
133 return sep.join(map(hex, l))
118 except TypeError:
134 except TypeError:
119 raise
135 raise
120
136
121 # batched call argument encoding
137 # batched call argument encoding
122
138
123 def escapebatcharg(plain):
139 def escapebatcharg(plain):
124 return (plain
140 return (plain
125 .replace(':', ':c')
141 .replace(':', ':c')
126 .replace(',', ':o')
142 .replace(',', ':o')
127 .replace(';', ':s')
143 .replace(';', ':s')
128 .replace('=', ':e'))
144 .replace('=', ':e'))
129
145
130 def unescapebatcharg(escaped):
146 def unescapebatcharg(escaped):
131 return (escaped
147 return (escaped
132 .replace(':e', '=')
148 .replace(':e', '=')
133 .replace(':s', ';')
149 .replace(':s', ';')
134 .replace(':o', ',')
150 .replace(':o', ',')
135 .replace(':c', ':'))
151 .replace(':c', ':'))
136
152
137 # mapping of options accepted by getbundle and their types
153 # mapping of options accepted by getbundle and their types
138 #
154 #
139 # Meant to be extended by extensions. It is extensions responsibility to ensure
155 # Meant to be extended by extensions. It is extensions responsibility to ensure
140 # such options are properly processed in exchange.getbundle.
156 # such options are properly processed in exchange.getbundle.
141 #
157 #
142 # supported types are:
158 # supported types are:
143 #
159 #
144 # :nodes: list of binary nodes
160 # :nodes: list of binary nodes
145 # :csv: list of comma-separated values
161 # :csv: list of comma-separated values
146 # :scsv: list of comma-separated values return as set
162 # :scsv: list of comma-separated values return as set
147 # :plain: string with no transformation needed.
163 # :plain: string with no transformation needed.
148 GETBUNDLE_ARGUMENTS = {
164 GETBUNDLE_ARGUMENTS = {
149 'heads': 'nodes',
165 'heads': 'nodes',
150 'bookmarks': 'boolean',
166 'bookmarks': 'boolean',
151 'common': 'nodes',
167 'common': 'nodes',
152 'obsmarkers': 'boolean',
168 'obsmarkers': 'boolean',
153 'phases': 'boolean',
169 'phases': 'boolean',
154 'bundlecaps': 'scsv',
170 'bundlecaps': 'scsv',
155 'listkeys': 'csv',
171 'listkeys': 'csv',
156 'cg': 'boolean',
172 'cg': 'boolean',
157 'cbattempted': 'boolean',
173 'cbattempted': 'boolean',
158 'stream': 'boolean',
174 'stream': 'boolean',
159 }
175 }
160
176
161 class baseprotocolhandler(zi.Interface):
177 class baseprotocolhandler(zi.Interface):
162 """Abstract base class for wire protocol handlers.
178 """Abstract base class for wire protocol handlers.
163
179
164 A wire protocol handler serves as an interface between protocol command
180 A wire protocol handler serves as an interface between protocol command
165 handlers and the wire protocol transport layer. Protocol handlers provide
181 handlers and the wire protocol transport layer. Protocol handlers provide
166 methods to read command arguments, redirect stdio for the duration of
182 methods to read command arguments, redirect stdio for the duration of
167 the request, handle response types, etc.
183 the request, handle response types, etc.
168 """
184 """
169
185
170 name = zi.Attribute(
186 name = zi.Attribute(
171 """The name of the protocol implementation.
187 """The name of the protocol implementation.
172
188
173 Used for uniquely identifying the transport type.
189 Used for uniquely identifying the transport type.
174 """)
190 """)
175
191
176 def getargs(args):
192 def getargs(args):
177 """return the value for arguments in <args>
193 """return the value for arguments in <args>
178
194
179 For version 1 transports, returns a list of values in the same
195 For version 1 transports, returns a list of values in the same
180 order they appear in ``args``. For version 2 transports, returns
196 order they appear in ``args``. For version 2 transports, returns
181 a dict mapping argument name to value.
197 a dict mapping argument name to value.
182 """
198 """
183
199
184 def getprotocaps():
200 def getprotocaps():
185 """Returns the list of protocol-level capabilities of client
201 """Returns the list of protocol-level capabilities of client
186
202
187 Returns a list of capabilities as declared by the client for
203 Returns a list of capabilities as declared by the client for
188 the current request (or connection for stateful protocol handlers)."""
204 the current request (or connection for stateful protocol handlers)."""
189
205
190 def getpayload():
206 def getpayload():
191 """Provide a generator for the raw payload.
207 """Provide a generator for the raw payload.
192
208
193 The caller is responsible for ensuring that the full payload is
209 The caller is responsible for ensuring that the full payload is
194 processed.
210 processed.
195 """
211 """
196
212
197 def mayberedirectstdio():
213 def mayberedirectstdio():
198 """Context manager to possibly redirect stdio.
214 """Context manager to possibly redirect stdio.
199
215
200 The context manager yields a file-object like object that receives
216 The context manager yields a file-object like object that receives
201 stdout and stderr output when the context manager is active. Or it
217 stdout and stderr output when the context manager is active. Or it
202 yields ``None`` if no I/O redirection occurs.
218 yields ``None`` if no I/O redirection occurs.
203
219
204 The intent of this context manager is to capture stdio output
220 The intent of this context manager is to capture stdio output
205 so it may be sent in the response. Some transports support streaming
221 so it may be sent in the response. Some transports support streaming
206 stdio to the client in real time. For these transports, stdio output
222 stdio to the client in real time. For these transports, stdio output
207 won't be captured.
223 won't be captured.
208 """
224 """
209
225
210 def client():
226 def client():
211 """Returns a string representation of this client (as bytes)."""
227 """Returns a string representation of this client (as bytes)."""
212
228
213 def addcapabilities(repo, caps):
229 def addcapabilities(repo, caps):
214 """Adds advertised capabilities specific to this protocol.
230 """Adds advertised capabilities specific to this protocol.
215
231
216 Receives the list of capabilities collected so far.
232 Receives the list of capabilities collected so far.
217
233
218 Returns a list of capabilities. The passed in argument can be returned.
234 Returns a list of capabilities. The passed in argument can be returned.
219 """
235 """
220
236
221 def checkperm(perm):
237 def checkperm(perm):
222 """Validate that the client has permissions to perform a request.
238 """Validate that the client has permissions to perform a request.
223
239
224 The argument is the permission required to proceed. If the client
240 The argument is the permission required to proceed. If the client
225 doesn't have that permission, the exception should raise or abort
241 doesn't have that permission, the exception should raise or abort
226 in a protocol specific manner.
242 in a protocol specific manner.
227 """
243 """
@@ -1,480 +1,489
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10
10
11 from .i18n import _
11 from .i18n import _
12 from .thirdparty import (
12 from .thirdparty import (
13 cbor,
13 cbor,
14 )
14 )
15 from .thirdparty.zope import (
15 from .thirdparty.zope import (
16 interface as zi,
16 interface as zi,
17 )
17 )
18 from . import (
18 from . import (
19 encoding,
19 encoding,
20 error,
20 error,
21 pycompat,
21 pycompat,
22 streamclone,
22 streamclone,
23 util,
23 util,
24 wireproto,
24 wireproto,
25 wireprotoframing,
25 wireprotoframing,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28
28
29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30
30
31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32
32
33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 from .hgweb import common as hgwebcommon
34 from .hgweb import common as hgwebcommon
35
35
36 # URL space looks like: <permissions>/<command>, where <permission> can
36 # URL space looks like: <permissions>/<command>, where <permission> can
37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38
38
39 # Root URL does nothing meaningful... yet.
39 # Root URL does nothing meaningful... yet.
40 if not urlparts:
40 if not urlparts:
41 res.status = b'200 OK'
41 res.status = b'200 OK'
42 res.headers[b'Content-Type'] = b'text/plain'
42 res.headers[b'Content-Type'] = b'text/plain'
43 res.setbodybytes(_('HTTP version 2 API handler'))
43 res.setbodybytes(_('HTTP version 2 API handler'))
44 return
44 return
45
45
46 if len(urlparts) == 1:
46 if len(urlparts) == 1:
47 res.status = b'404 Not Found'
47 res.status = b'404 Not Found'
48 res.headers[b'Content-Type'] = b'text/plain'
48 res.headers[b'Content-Type'] = b'text/plain'
49 res.setbodybytes(_('do not know how to process %s\n') %
49 res.setbodybytes(_('do not know how to process %s\n') %
50 req.dispatchpath)
50 req.dispatchpath)
51 return
51 return
52
52
53 permission, command = urlparts[0:2]
53 permission, command = urlparts[0:2]
54
54
55 if permission not in (b'ro', b'rw'):
55 if permission not in (b'ro', b'rw'):
56 res.status = b'404 Not Found'
56 res.status = b'404 Not Found'
57 res.headers[b'Content-Type'] = b'text/plain'
57 res.headers[b'Content-Type'] = b'text/plain'
58 res.setbodybytes(_('unknown permission: %s') % permission)
58 res.setbodybytes(_('unknown permission: %s') % permission)
59 return
59 return
60
60
61 if req.method != 'POST':
61 if req.method != 'POST':
62 res.status = b'405 Method Not Allowed'
62 res.status = b'405 Method Not Allowed'
63 res.headers[b'Allow'] = b'POST'
63 res.headers[b'Allow'] = b'POST'
64 res.setbodybytes(_('commands require POST requests'))
64 res.setbodybytes(_('commands require POST requests'))
65 return
65 return
66
66
67 # At some point we'll want to use our own API instead of recycling the
67 # At some point we'll want to use our own API instead of recycling the
68 # behavior of version 1 of the wire protocol...
68 # behavior of version 1 of the wire protocol...
69 # TODO return reasonable responses - not responses that overload the
69 # TODO return reasonable responses - not responses that overload the
70 # HTTP status line message for error reporting.
70 # HTTP status line message for error reporting.
71 try:
71 try:
72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 except hgwebcommon.ErrorResponse as e:
73 except hgwebcommon.ErrorResponse as e:
74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 for k, v in e.headers:
75 for k, v in e.headers:
76 res.headers[k] = v
76 res.headers[k] = v
77 res.setbodybytes('permission denied')
77 res.setbodybytes('permission denied')
78 return
78 return
79
79
80 # We have a special endpoint to reflect the request back at the client.
80 # We have a special endpoint to reflect the request back at the client.
81 if command == b'debugreflect':
81 if command == b'debugreflect':
82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 return
83 return
84
84
85 # Extra commands that we handle that aren't really wire protocol
85 # Extra commands that we handle that aren't really wire protocol
86 # commands. Think extra hard before making this hackery available to
86 # commands. Think extra hard before making this hackery available to
87 # extension.
87 # extension.
88 extracommands = {'multirequest'}
88 extracommands = {'multirequest'}
89
89
90 if command not in wireproto.commandsv2 and command not in extracommands:
90 if command not in wireproto.commandsv2 and command not in extracommands:
91 res.status = b'404 Not Found'
91 res.status = b'404 Not Found'
92 res.headers[b'Content-Type'] = b'text/plain'
92 res.headers[b'Content-Type'] = b'text/plain'
93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 return
94 return
95
95
96 repo = rctx.repo
96 repo = rctx.repo
97 ui = repo.ui
97 ui = repo.ui
98
98
99 proto = httpv2protocolhandler(req, ui)
99 proto = httpv2protocolhandler(req, ui)
100
100
101 if (not wireproto.commandsv2.commandavailable(command, proto)
101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 and command not in extracommands):
102 and command not in extracommands):
103 res.status = b'404 Not Found'
103 res.status = b'404 Not Found'
104 res.headers[b'Content-Type'] = b'text/plain'
104 res.headers[b'Content-Type'] = b'text/plain'
105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 return
106 return
107
107
108 # TODO consider cases where proxies may add additional Accept headers.
108 # TODO consider cases where proxies may add additional Accept headers.
109 if req.headers.get(b'Accept') != FRAMINGTYPE:
109 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 res.status = b'406 Not Acceptable'
110 res.status = b'406 Not Acceptable'
111 res.headers[b'Content-Type'] = b'text/plain'
111 res.headers[b'Content-Type'] = b'text/plain'
112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 % FRAMINGTYPE)
113 % FRAMINGTYPE)
114 return
114 return
115
115
116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 res.status = b'415 Unsupported Media Type'
117 res.status = b'415 Unsupported Media Type'
118 # TODO we should send a response with appropriate media type,
118 # TODO we should send a response with appropriate media type,
119 # since client does Accept it.
119 # since client does Accept it.
120 res.headers[b'Content-Type'] = b'text/plain'
120 res.headers[b'Content-Type'] = b'text/plain'
121 res.setbodybytes(_('client MUST send Content-Type header with '
121 res.setbodybytes(_('client MUST send Content-Type header with '
122 'value: %s\n') % FRAMINGTYPE)
122 'value: %s\n') % FRAMINGTYPE)
123 return
123 return
124
124
125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126
126
127 def _processhttpv2reflectrequest(ui, repo, req, res):
127 def _processhttpv2reflectrequest(ui, repo, req, res):
128 """Reads unified frame protocol request and dumps out state to client.
128 """Reads unified frame protocol request and dumps out state to client.
129
129
130 This special endpoint can be used to help debug the wire protocol.
130 This special endpoint can be used to help debug the wire protocol.
131
131
132 Instead of routing the request through the normal dispatch mechanism,
132 Instead of routing the request through the normal dispatch mechanism,
133 we instead read all frames, decode them, and feed them into our state
133 we instead read all frames, decode them, and feed them into our state
134 tracker. We then dump the log of all that activity back out to the
134 tracker. We then dump the log of all that activity back out to the
135 client.
135 client.
136 """
136 """
137 import json
137 import json
138
138
139 # Reflection APIs have a history of being abused, accidentally disclosing
139 # Reflection APIs have a history of being abused, accidentally disclosing
140 # sensitive data, etc. So we have a config knob.
140 # sensitive data, etc. So we have a config knob.
141 if not ui.configbool('experimental', 'web.api.debugreflect'):
141 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 res.status = b'404 Not Found'
142 res.status = b'404 Not Found'
143 res.headers[b'Content-Type'] = b'text/plain'
143 res.headers[b'Content-Type'] = b'text/plain'
144 res.setbodybytes(_('debugreflect service not available'))
144 res.setbodybytes(_('debugreflect service not available'))
145 return
145 return
146
146
147 # We assume we have a unified framing protocol request body.
147 # We assume we have a unified framing protocol request body.
148
148
149 reactor = wireprotoframing.serverreactor()
149 reactor = wireprotoframing.serverreactor()
150 states = []
150 states = []
151
151
152 while True:
152 while True:
153 frame = wireprotoframing.readframe(req.bodyfh)
153 frame = wireprotoframing.readframe(req.bodyfh)
154
154
155 if not frame:
155 if not frame:
156 states.append(b'received: <no frame>')
156 states.append(b'received: <no frame>')
157 break
157 break
158
158
159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 frame.requestid,
160 frame.requestid,
161 frame.payload))
161 frame.payload))
162
162
163 action, meta = reactor.onframerecv(frame)
163 action, meta = reactor.onframerecv(frame)
164 states.append(json.dumps((action, meta), sort_keys=True,
164 states.append(json.dumps((action, meta), sort_keys=True,
165 separators=(', ', ': ')))
165 separators=(', ', ': ')))
166
166
167 action, meta = reactor.oninputeof()
167 action, meta = reactor.oninputeof()
168 meta['action'] = action
168 meta['action'] = action
169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170
170
171 res.status = b'200 OK'
171 res.status = b'200 OK'
172 res.headers[b'Content-Type'] = b'text/plain'
172 res.headers[b'Content-Type'] = b'text/plain'
173 res.setbodybytes(b'\n'.join(states))
173 res.setbodybytes(b'\n'.join(states))
174
174
175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 """Post-validation handler for HTTPv2 requests.
176 """Post-validation handler for HTTPv2 requests.
177
177
178 Called when the HTTP request contains unified frame-based protocol
178 Called when the HTTP request contains unified frame-based protocol
179 frames for evaluation.
179 frames for evaluation.
180 """
180 """
181 # TODO Some HTTP clients are full duplex and can receive data before
181 # TODO Some HTTP clients are full duplex and can receive data before
182 # the entire request is transmitted. Figure out a way to indicate support
182 # the entire request is transmitted. Figure out a way to indicate support
183 # for that so we can opt into full duplex mode.
183 # for that so we can opt into full duplex mode.
184 reactor = wireprotoframing.serverreactor(deferoutput=True)
184 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 seencommand = False
185 seencommand = False
186
186
187 outstream = reactor.makeoutputstream()
187 outstream = reactor.makeoutputstream()
188
188
189 while True:
189 while True:
190 frame = wireprotoframing.readframe(req.bodyfh)
190 frame = wireprotoframing.readframe(req.bodyfh)
191 if not frame:
191 if not frame:
192 break
192 break
193
193
194 action, meta = reactor.onframerecv(frame)
194 action, meta = reactor.onframerecv(frame)
195
195
196 if action == 'wantframe':
196 if action == 'wantframe':
197 # Need more data before we can do anything.
197 # Need more data before we can do anything.
198 continue
198 continue
199 elif action == 'runcommand':
199 elif action == 'runcommand':
200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 reqcommand, reactor, outstream,
201 reqcommand, reactor, outstream,
202 meta, issubsequent=seencommand)
202 meta, issubsequent=seencommand)
203
203
204 if sentoutput:
204 if sentoutput:
205 return
205 return
206
206
207 seencommand = True
207 seencommand = True
208
208
209 elif action == 'error':
209 elif action == 'error':
210 # TODO define proper error mechanism.
210 # TODO define proper error mechanism.
211 res.status = b'200 OK'
211 res.status = b'200 OK'
212 res.headers[b'Content-Type'] = b'text/plain'
212 res.headers[b'Content-Type'] = b'text/plain'
213 res.setbodybytes(meta['message'] + b'\n')
213 res.setbodybytes(meta['message'] + b'\n')
214 return
214 return
215 else:
215 else:
216 raise error.ProgrammingError(
216 raise error.ProgrammingError(
217 'unhandled action from frame processor: %s' % action)
217 'unhandled action from frame processor: %s' % action)
218
218
219 action, meta = reactor.oninputeof()
219 action, meta = reactor.oninputeof()
220 if action == 'sendframes':
220 if action == 'sendframes':
221 # We assume we haven't started sending the response yet. If we're
221 # We assume we haven't started sending the response yet. If we're
222 # wrong, the response type will raise an exception.
222 # wrong, the response type will raise an exception.
223 res.status = b'200 OK'
223 res.status = b'200 OK'
224 res.headers[b'Content-Type'] = FRAMINGTYPE
224 res.headers[b'Content-Type'] = FRAMINGTYPE
225 res.setbodygen(meta['framegen'])
225 res.setbodygen(meta['framegen'])
226 elif action == 'noop':
226 elif action == 'noop':
227 pass
227 pass
228 else:
228 else:
229 raise error.ProgrammingError('unhandled action from frame processor: %s'
229 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 % action)
230 % action)
231
231
232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 outstream, command, issubsequent):
233 outstream, command, issubsequent):
234 """Dispatch a wire protocol command made from HTTPv2 requests.
234 """Dispatch a wire protocol command made from HTTPv2 requests.
235
235
236 The authenticated permission (``authedperm``) along with the original
236 The authenticated permission (``authedperm``) along with the original
237 command from the URL (``reqcommand``) are passed in.
237 command from the URL (``reqcommand``) are passed in.
238 """
238 """
239 # We already validated that the session has permissions to perform the
239 # We already validated that the session has permissions to perform the
240 # actions in ``authedperm``. In the unified frame protocol, the canonical
240 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 # command to run is expressed in a frame. However, the URL also requested
241 # command to run is expressed in a frame. However, the URL also requested
242 # to run a specific command. We need to be careful that the command we
242 # to run a specific command. We need to be careful that the command we
243 # run doesn't have permissions requirements greater than what was granted
243 # run doesn't have permissions requirements greater than what was granted
244 # by ``authedperm``.
244 # by ``authedperm``.
245 #
245 #
246 # Our rule for this is we only allow one command per HTTP request and
246 # Our rule for this is we only allow one command per HTTP request and
247 # that command must match the command in the URL. However, we make
247 # that command must match the command in the URL. However, we make
248 # an exception for the ``multirequest`` URL. This URL is allowed to
248 # an exception for the ``multirequest`` URL. This URL is allowed to
249 # execute multiple commands. We double check permissions of each command
249 # execute multiple commands. We double check permissions of each command
250 # as it is invoked to ensure there is no privilege escalation.
250 # as it is invoked to ensure there is no privilege escalation.
251 # TODO consider allowing multiple commands to regular command URLs
251 # TODO consider allowing multiple commands to regular command URLs
252 # iff each command is the same.
252 # iff each command is the same.
253
253
254 proto = httpv2protocolhandler(req, ui, args=command['args'])
254 proto = httpv2protocolhandler(req, ui, args=command['args'])
255
255
256 if reqcommand == b'multirequest':
256 if reqcommand == b'multirequest':
257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 # TODO proper error mechanism
258 # TODO proper error mechanism
259 res.status = b'200 OK'
259 res.status = b'200 OK'
260 res.headers[b'Content-Type'] = b'text/plain'
260 res.headers[b'Content-Type'] = b'text/plain'
261 res.setbodybytes(_('wire protocol command not available: %s') %
261 res.setbodybytes(_('wire protocol command not available: %s') %
262 command['command'])
262 command['command'])
263 return True
263 return True
264
264
265 # TODO don't use assert here, since it may be elided by -O.
265 # TODO don't use assert here, since it may be elided by -O.
266 assert authedperm in (b'ro', b'rw')
266 assert authedperm in (b'ro', b'rw')
267 wirecommand = wireproto.commandsv2[command['command']]
267 wirecommand = wireproto.commandsv2[command['command']]
268 assert wirecommand.permission in ('push', 'pull')
268 assert wirecommand.permission in ('push', 'pull')
269
269
270 if authedperm == b'ro' and wirecommand.permission != 'pull':
270 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 # TODO proper error mechanism
271 # TODO proper error mechanism
272 res.status = b'403 Forbidden'
272 res.status = b'403 Forbidden'
273 res.headers[b'Content-Type'] = b'text/plain'
273 res.headers[b'Content-Type'] = b'text/plain'
274 res.setbodybytes(_('insufficient permissions to execute '
274 res.setbodybytes(_('insufficient permissions to execute '
275 'command: %s') % command['command'])
275 'command: %s') % command['command'])
276 return True
276 return True
277
277
278 # TODO should we also call checkperm() here? Maybe not if we're going
278 # TODO should we also call checkperm() here? Maybe not if we're going
279 # to overhaul that API. The granted scope from the URL check should
279 # to overhaul that API. The granted scope from the URL check should
280 # be good enough.
280 # be good enough.
281
281
282 else:
282 else:
283 # Don't allow multiple commands outside of ``multirequest`` URL.
283 # Don't allow multiple commands outside of ``multirequest`` URL.
284 if issubsequent:
284 if issubsequent:
285 # TODO proper error mechanism
285 # TODO proper error mechanism
286 res.status = b'200 OK'
286 res.status = b'200 OK'
287 res.headers[b'Content-Type'] = b'text/plain'
287 res.headers[b'Content-Type'] = b'text/plain'
288 res.setbodybytes(_('multiple commands cannot be issued to this '
288 res.setbodybytes(_('multiple commands cannot be issued to this '
289 'URL'))
289 'URL'))
290 return True
290 return True
291
291
292 if reqcommand != command['command']:
292 if reqcommand != command['command']:
293 # TODO define proper error mechanism
293 # TODO define proper error mechanism
294 res.status = b'200 OK'
294 res.status = b'200 OK'
295 res.headers[b'Content-Type'] = b'text/plain'
295 res.headers[b'Content-Type'] = b'text/plain'
296 res.setbodybytes(_('command in frame must match command in URL'))
296 res.setbodybytes(_('command in frame must match command in URL'))
297 return True
297 return True
298
298
299 rsp = wireproto.dispatch(repo, proto, command['command'])
299 rsp = wireproto.dispatch(repo, proto, command['command'])
300
300
301 res.status = b'200 OK'
301 res.status = b'200 OK'
302 res.headers[b'Content-Type'] = FRAMINGTYPE
302 res.headers[b'Content-Type'] = FRAMINGTYPE
303
303
304 if isinstance(rsp, wireprototypes.cborresponse):
304 if isinstance(rsp, wireprototypes.cborresponse):
305 encoded = cbor.dumps(rsp.value, canonical=True)
305 encoded = cbor.dumps(rsp.value, canonical=True)
306 action, meta = reactor.oncommandresponseready(outstream,
306 action, meta = reactor.oncommandresponseready(outstream,
307 command['requestid'],
307 command['requestid'],
308 encoded)
308 encoded)
309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 command['requestid'],
312 rsp.gen)
313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 action, meta = reactor.oncommanderror(outstream,
315 command['requestid'],
316 rsp.message,
317 rsp.args)
309 else:
318 else:
310 action, meta = reactor.onservererror(
319 action, meta = reactor.onservererror(
311 _('unhandled response type from wire proto command'))
320 _('unhandled response type from wire proto command'))
312
321
313 if action == 'sendframes':
322 if action == 'sendframes':
314 res.setbodygen(meta['framegen'])
323 res.setbodygen(meta['framegen'])
315 return True
324 return True
316 elif action == 'noop':
325 elif action == 'noop':
317 return False
326 return False
318 else:
327 else:
319 raise error.ProgrammingError('unhandled event from reactor: %s' %
328 raise error.ProgrammingError('unhandled event from reactor: %s' %
320 action)
329 action)
321
330
322 @zi.implementer(wireprototypes.baseprotocolhandler)
331 @zi.implementer(wireprototypes.baseprotocolhandler)
323 class httpv2protocolhandler(object):
332 class httpv2protocolhandler(object):
324 def __init__(self, req, ui, args=None):
333 def __init__(self, req, ui, args=None):
325 self._req = req
334 self._req = req
326 self._ui = ui
335 self._ui = ui
327 self._args = args
336 self._args = args
328
337
329 @property
338 @property
330 def name(self):
339 def name(self):
331 return HTTP_WIREPROTO_V2
340 return HTTP_WIREPROTO_V2
332
341
333 def getargs(self, args):
342 def getargs(self, args):
334 data = {}
343 data = {}
335 for k, typ in args.items():
344 for k, typ in args.items():
336 if k == '*':
345 if k == '*':
337 raise NotImplementedError('do not support * args')
346 raise NotImplementedError('do not support * args')
338 elif k in self._args:
347 elif k in self._args:
339 # TODO consider validating value types.
348 # TODO consider validating value types.
340 data[k] = self._args[k]
349 data[k] = self._args[k]
341
350
342 return data
351 return data
343
352
344 def getprotocaps(self):
353 def getprotocaps(self):
345 # Protocol capabilities are currently not implemented for HTTP V2.
354 # Protocol capabilities are currently not implemented for HTTP V2.
346 return set()
355 return set()
347
356
348 def getpayload(self):
357 def getpayload(self):
349 raise NotImplementedError
358 raise NotImplementedError
350
359
351 @contextlib.contextmanager
360 @contextlib.contextmanager
352 def mayberedirectstdio(self):
361 def mayberedirectstdio(self):
353 raise NotImplementedError
362 raise NotImplementedError
354
363
355 def client(self):
364 def client(self):
356 raise NotImplementedError
365 raise NotImplementedError
357
366
358 def addcapabilities(self, repo, caps):
367 def addcapabilities(self, repo, caps):
359 return caps
368 return caps
360
369
361 def checkperm(self, perm):
370 def checkperm(self, perm):
362 raise NotImplementedError
371 raise NotImplementedError
363
372
364 def httpv2apidescriptor(req, repo):
373 def httpv2apidescriptor(req, repo):
365 proto = httpv2protocolhandler(req, repo.ui)
374 proto = httpv2protocolhandler(req, repo.ui)
366
375
367 return _capabilitiesv2(repo, proto)
376 return _capabilitiesv2(repo, proto)
368
377
369 def _capabilitiesv2(repo, proto):
378 def _capabilitiesv2(repo, proto):
370 """Obtain the set of capabilities for version 2 transports.
379 """Obtain the set of capabilities for version 2 transports.
371
380
372 These capabilities are distinct from the capabilities for version 1
381 These capabilities are distinct from the capabilities for version 1
373 transports.
382 transports.
374 """
383 """
375 compression = []
384 compression = []
376 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
385 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
377 compression.append({
386 compression.append({
378 b'name': engine.wireprotosupport().name,
387 b'name': engine.wireprotosupport().name,
379 })
388 })
380
389
381 caps = {
390 caps = {
382 'commands': {},
391 'commands': {},
383 'compression': compression,
392 'compression': compression,
384 'framingmediatypes': [FRAMINGTYPE],
393 'framingmediatypes': [FRAMINGTYPE],
385 }
394 }
386
395
387 for command, entry in wireproto.commandsv2.items():
396 for command, entry in wireproto.commandsv2.items():
388 caps['commands'][command] = {
397 caps['commands'][command] = {
389 'args': entry.args,
398 'args': entry.args,
390 'permissions': [entry.permission],
399 'permissions': [entry.permission],
391 }
400 }
392
401
393 if streamclone.allowservergeneration(repo):
402 if streamclone.allowservergeneration(repo):
394 caps['rawrepoformats'] = sorted(repo.requirements &
403 caps['rawrepoformats'] = sorted(repo.requirements &
395 repo.supportedformats)
404 repo.supportedformats)
396
405
397 return proto.addcapabilities(repo, caps)
406 return proto.addcapabilities(repo, caps)
398
407
399 def wireprotocommand(*args, **kwargs):
408 def wireprotocommand(*args, **kwargs):
400 def register(func):
409 def register(func):
401 return wireproto.wireprotocommand(
410 return wireproto.wireprotocommand(
402 *args, transportpolicy=wireproto.POLICY_V2_ONLY, **kwargs)(func)
411 *args, transportpolicy=wireproto.POLICY_V2_ONLY, **kwargs)(func)
403
412
404 return register
413 return register
405
414
406 @wireprotocommand('branchmap', permission='pull')
415 @wireprotocommand('branchmap', permission='pull')
407 def branchmapv2(repo, proto):
416 def branchmapv2(repo, proto):
408 branchmap = {encoding.fromlocal(k): v
417 branchmap = {encoding.fromlocal(k): v
409 for k, v in repo.branchmap().iteritems()}
418 for k, v in repo.branchmap().iteritems()}
410
419
411 return wireprototypes.cborresponse(branchmap)
420 return wireprototypes.cborresponse(branchmap)
412
421
413 @wireprotocommand('capabilities', permission='pull')
422 @wireprotocommand('capabilities', permission='pull')
414 def capabilitiesv2(repo, proto):
423 def capabilitiesv2(repo, proto):
415 caps = _capabilitiesv2(repo, proto)
424 caps = _capabilitiesv2(repo, proto)
416
425
417 return wireprototypes.cborresponse(caps)
426 return wireprototypes.cborresponse(caps)
418
427
419 @wireprotocommand('heads',
428 @wireprotocommand('heads',
420 args={
429 args={
421 'publiconly': False,
430 'publiconly': False,
422 },
431 },
423 permission='pull')
432 permission='pull')
424 def headsv2(repo, proto, publiconly=False):
433 def headsv2(repo, proto, publiconly=False):
425 if publiconly:
434 if publiconly:
426 repo = repo.filtered('immutable')
435 repo = repo.filtered('immutable')
427
436
428 return wireprototypes.cborresponse(repo.heads())
437 return wireprototypes.cborresponse(repo.heads())
429
438
430 @wireprotocommand('known',
439 @wireprotocommand('known',
431 args={
440 args={
432 'nodes': [b'deadbeef'],
441 'nodes': [b'deadbeef'],
433 },
442 },
434 permission='pull')
443 permission='pull')
435 def knownv2(repo, proto, nodes=None):
444 def knownv2(repo, proto, nodes=None):
436 nodes = nodes or []
445 nodes = nodes or []
437 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
446 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
438 return wireprototypes.cborresponse(result)
447 return wireprototypes.cborresponse(result)
439
448
440 @wireprotocommand('listkeys',
449 @wireprotocommand('listkeys',
441 args={
450 args={
442 'namespace': b'ns',
451 'namespace': b'ns',
443 },
452 },
444 permission='pull')
453 permission='pull')
445 def listkeysv2(repo, proto, namespace=None):
454 def listkeysv2(repo, proto, namespace=None):
446 keys = repo.listkeys(encoding.tolocal(namespace))
455 keys = repo.listkeys(encoding.tolocal(namespace))
447 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
456 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
448 for k, v in keys.iteritems()}
457 for k, v in keys.iteritems()}
449
458
450 return wireprototypes.cborresponse(keys)
459 return wireprototypes.cborresponse(keys)
451
460
452 @wireprotocommand('lookup',
461 @wireprotocommand('lookup',
453 args={
462 args={
454 'key': b'foo',
463 'key': b'foo',
455 },
464 },
456 permission='pull')
465 permission='pull')
457 def lookupv2(repo, proto, key):
466 def lookupv2(repo, proto, key):
458 key = encoding.tolocal(key)
467 key = encoding.tolocal(key)
459
468
460 # TODO handle exception.
469 # TODO handle exception.
461 node = repo.lookup(key)
470 node = repo.lookup(key)
462
471
463 return wireprototypes.cborresponse(node)
472 return wireprototypes.cborresponse(node)
464
473
465 @wireprotocommand('pushkey',
474 @wireprotocommand('pushkey',
466 args={
475 args={
467 'namespace': b'ns',
476 'namespace': b'ns',
468 'key': b'key',
477 'key': b'key',
469 'old': b'old',
478 'old': b'old',
470 'new': b'new',
479 'new': b'new',
471 },
480 },
472 permission='push')
481 permission='push')
473 def pushkeyv2(repo, proto, namespace, key, old, new):
482 def pushkeyv2(repo, proto, namespace, key, old, new):
474 # TODO handle ui output redirection
483 # TODO handle ui output redirection
475 r = repo.pushkey(encoding.tolocal(namespace),
484 r = repo.pushkey(encoding.tolocal(namespace),
476 encoding.tolocal(key),
485 encoding.tolocal(key),
477 encoding.tolocal(old),
486 encoding.tolocal(old),
478 encoding.tolocal(new))
487 encoding.tolocal(new))
479
488
480 return wireprototypes.cborresponse(r)
489 return wireprototypes.cborresponse(r)
General Comments 0
You need to be logged in to leave comments. Login now