##// END OF EJS Templates
wireproto: start to associate frame generation with a stream...
Gregory Szorc -
r37303:3ed34454 default
parent child Browse files
Show More
@@ -1,720 +1,732
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import struct
14 import struct
15
15
16 from .i18n import _
16 from .i18n import _
17 from .thirdparty import (
17 from .thirdparty import (
18 attr,
18 attr,
19 )
19 )
20 from . import (
20 from . import (
21 error,
21 error,
22 util,
22 util,
23 )
23 )
24 from .utils import (
24 from .utils import (
25 stringutil,
25 stringutil,
26 )
26 )
27
27
28 FRAME_HEADER_SIZE = 6
28 FRAME_HEADER_SIZE = 6
29 DEFAULT_MAX_FRAME_SIZE = 32768
29 DEFAULT_MAX_FRAME_SIZE = 32768
30
30
31 FRAME_TYPE_COMMAND_NAME = 0x01
31 FRAME_TYPE_COMMAND_NAME = 0x01
32 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
32 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
33 FRAME_TYPE_COMMAND_DATA = 0x03
33 FRAME_TYPE_COMMAND_DATA = 0x03
34 FRAME_TYPE_BYTES_RESPONSE = 0x04
34 FRAME_TYPE_BYTES_RESPONSE = 0x04
35 FRAME_TYPE_ERROR_RESPONSE = 0x05
35 FRAME_TYPE_ERROR_RESPONSE = 0x05
36 FRAME_TYPE_TEXT_OUTPUT = 0x06
36 FRAME_TYPE_TEXT_OUTPUT = 0x06
37
37
38 FRAME_TYPES = {
38 FRAME_TYPES = {
39 b'command-name': FRAME_TYPE_COMMAND_NAME,
39 b'command-name': FRAME_TYPE_COMMAND_NAME,
40 b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
40 b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
41 b'command-data': FRAME_TYPE_COMMAND_DATA,
41 b'command-data': FRAME_TYPE_COMMAND_DATA,
42 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
42 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
43 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
43 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
44 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
44 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
45 }
45 }
46
46
47 FLAG_COMMAND_NAME_EOS = 0x01
47 FLAG_COMMAND_NAME_EOS = 0x01
48 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
48 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
49 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
49 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
50
50
51 FLAGS_COMMAND = {
51 FLAGS_COMMAND = {
52 b'eos': FLAG_COMMAND_NAME_EOS,
52 b'eos': FLAG_COMMAND_NAME_EOS,
53 b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
53 b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
54 b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
54 b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
55 }
55 }
56
56
57 FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
57 FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
58 FLAG_COMMAND_ARGUMENT_EOA = 0x02
58 FLAG_COMMAND_ARGUMENT_EOA = 0x02
59
59
60 FLAGS_COMMAND_ARGUMENT = {
60 FLAGS_COMMAND_ARGUMENT = {
61 b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
61 b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
62 b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
62 b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
63 }
63 }
64
64
65 FLAG_COMMAND_DATA_CONTINUATION = 0x01
65 FLAG_COMMAND_DATA_CONTINUATION = 0x01
66 FLAG_COMMAND_DATA_EOS = 0x02
66 FLAG_COMMAND_DATA_EOS = 0x02
67
67
68 FLAGS_COMMAND_DATA = {
68 FLAGS_COMMAND_DATA = {
69 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
69 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
70 b'eos': FLAG_COMMAND_DATA_EOS,
70 b'eos': FLAG_COMMAND_DATA_EOS,
71 }
71 }
72
72
73 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
73 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
74 FLAG_BYTES_RESPONSE_EOS = 0x02
74 FLAG_BYTES_RESPONSE_EOS = 0x02
75
75
76 FLAGS_BYTES_RESPONSE = {
76 FLAGS_BYTES_RESPONSE = {
77 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
77 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
78 b'eos': FLAG_BYTES_RESPONSE_EOS,
78 b'eos': FLAG_BYTES_RESPONSE_EOS,
79 }
79 }
80
80
81 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
81 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
82 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
82 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
83
83
84 FLAGS_ERROR_RESPONSE = {
84 FLAGS_ERROR_RESPONSE = {
85 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
85 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
86 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
86 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
87 }
87 }
88
88
89 # Maps frame types to their available flags.
89 # Maps frame types to their available flags.
90 FRAME_TYPE_FLAGS = {
90 FRAME_TYPE_FLAGS = {
91 FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
91 FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
92 FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
92 FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
94 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
94 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
95 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
95 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
96 FRAME_TYPE_TEXT_OUTPUT: {},
96 FRAME_TYPE_TEXT_OUTPUT: {},
97 }
97 }
98
98
99 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
99 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
100
100
101 @attr.s(slots=True)
101 @attr.s(slots=True)
102 class frameheader(object):
102 class frameheader(object):
103 """Represents the data in a frame header."""
103 """Represents the data in a frame header."""
104
104
105 length = attr.ib()
105 length = attr.ib()
106 requestid = attr.ib()
106 requestid = attr.ib()
107 typeid = attr.ib()
107 typeid = attr.ib()
108 flags = attr.ib()
108 flags = attr.ib()
109
109
110 @attr.s(slots=True)
110 @attr.s(slots=True)
111 class frame(object):
111 class frame(object):
112 """Represents a parsed frame."""
112 """Represents a parsed frame."""
113
113
114 requestid = attr.ib()
114 requestid = attr.ib()
115 typeid = attr.ib()
115 typeid = attr.ib()
116 flags = attr.ib()
116 flags = attr.ib()
117 payload = attr.ib()
117 payload = attr.ib()
118
118
119 def makeframe(requestid, typeid, flags, payload):
119 def makeframe(requestid, typeid, flags, payload):
120 """Assemble a frame into a byte array."""
120 """Assemble a frame into a byte array."""
121 # TODO assert size of payload.
121 # TODO assert size of payload.
122 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
122 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
123
123
124 # 24 bits length
124 # 24 bits length
125 # 16 bits request id
125 # 16 bits request id
126 # 4 bits type
126 # 4 bits type
127 # 4 bits flags
127 # 4 bits flags
128
128
129 l = struct.pack(r'<I', len(payload))
129 l = struct.pack(r'<I', len(payload))
130 frame[0:3] = l[0:3]
130 frame[0:3] = l[0:3]
131 struct.pack_into(r'<H', frame, 3, requestid)
131 struct.pack_into(r'<H', frame, 3, requestid)
132 frame[5] = (typeid << 4) | flags
132 frame[5] = (typeid << 4) | flags
133 frame[6:] = payload
133 frame[6:] = payload
134
134
135 return frame
135 return frame
136
136
137 def makeframefromhumanstring(s):
137 def makeframefromhumanstring(s):
138 """Create a frame from a human readable string
138 """Create a frame from a human readable string
139
139
140 Strings have the form:
140 Strings have the form:
141
141
142 <request-id> <type> <flags> <payload>
142 <request-id> <type> <flags> <payload>
143
143
144 This can be used by user-facing applications and tests for creating
144 This can be used by user-facing applications and tests for creating
145 frames easily without having to type out a bunch of constants.
145 frames easily without having to type out a bunch of constants.
146
146
147 Request ID is an integer.
147 Request ID is an integer.
148
148
149 Frame type and flags can be specified by integer or named constant.
149 Frame type and flags can be specified by integer or named constant.
150
150
151 Flags can be delimited by `|` to bitwise OR them together.
151 Flags can be delimited by `|` to bitwise OR them together.
152 """
152 """
153 requestid, frametype, frameflags, payload = s.split(b' ', 3)
153 requestid, frametype, frameflags, payload = s.split(b' ', 3)
154
154
155 requestid = int(requestid)
155 requestid = int(requestid)
156
156
157 if frametype in FRAME_TYPES:
157 if frametype in FRAME_TYPES:
158 frametype = FRAME_TYPES[frametype]
158 frametype = FRAME_TYPES[frametype]
159 else:
159 else:
160 frametype = int(frametype)
160 frametype = int(frametype)
161
161
162 finalflags = 0
162 finalflags = 0
163 validflags = FRAME_TYPE_FLAGS[frametype]
163 validflags = FRAME_TYPE_FLAGS[frametype]
164 for flag in frameflags.split(b'|'):
164 for flag in frameflags.split(b'|'):
165 if flag in validflags:
165 if flag in validflags:
166 finalflags |= validflags[flag]
166 finalflags |= validflags[flag]
167 else:
167 else:
168 finalflags |= int(flag)
168 finalflags |= int(flag)
169
169
170 payload = stringutil.unescapestr(payload)
170 payload = stringutil.unescapestr(payload)
171
171
172 return makeframe(requestid=requestid, typeid=frametype,
172 return makeframe(requestid=requestid, typeid=frametype,
173 flags=finalflags, payload=payload)
173 flags=finalflags, payload=payload)
174
174
175 def parseheader(data):
175 def parseheader(data):
176 """Parse a unified framing protocol frame header from a buffer.
176 """Parse a unified framing protocol frame header from a buffer.
177
177
178 The header is expected to be in the buffer at offset 0 and the
178 The header is expected to be in the buffer at offset 0 and the
179 buffer is expected to be large enough to hold a full header.
179 buffer is expected to be large enough to hold a full header.
180 """
180 """
181 # 24 bits payload length (little endian)
181 # 24 bits payload length (little endian)
182 # 4 bits frame type
182 # 4 bits frame type
183 # 4 bits frame flags
183 # 4 bits frame flags
184 # ... payload
184 # ... payload
185 framelength = data[0] + 256 * data[1] + 16384 * data[2]
185 framelength = data[0] + 256 * data[1] + 16384 * data[2]
186 requestid = struct.unpack_from(r'<H', data, 3)[0]
186 requestid = struct.unpack_from(r'<H', data, 3)[0]
187 typeflags = data[5]
187 typeflags = data[5]
188
188
189 frametype = (typeflags & 0xf0) >> 4
189 frametype = (typeflags & 0xf0) >> 4
190 frameflags = typeflags & 0x0f
190 frameflags = typeflags & 0x0f
191
191
192 return frameheader(framelength, requestid, frametype, frameflags)
192 return frameheader(framelength, requestid, frametype, frameflags)
193
193
194 def readframe(fh):
194 def readframe(fh):
195 """Read a unified framing protocol frame from a file object.
195 """Read a unified framing protocol frame from a file object.
196
196
197 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
197 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
198 None if no frame is available. May raise if a malformed frame is
198 None if no frame is available. May raise if a malformed frame is
199 seen.
199 seen.
200 """
200 """
201 header = bytearray(FRAME_HEADER_SIZE)
201 header = bytearray(FRAME_HEADER_SIZE)
202
202
203 readcount = fh.readinto(header)
203 readcount = fh.readinto(header)
204
204
205 if readcount == 0:
205 if readcount == 0:
206 return None
206 return None
207
207
208 if readcount != FRAME_HEADER_SIZE:
208 if readcount != FRAME_HEADER_SIZE:
209 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
209 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
210 (readcount, header))
210 (readcount, header))
211
211
212 h = parseheader(header)
212 h = parseheader(header)
213
213
214 payload = fh.read(h.length)
214 payload = fh.read(h.length)
215 if len(payload) != h.length:
215 if len(payload) != h.length:
216 raise error.Abort(_('frame length error: expected %d; got %d') %
216 raise error.Abort(_('frame length error: expected %d; got %d') %
217 (h.length, len(payload)))
217 (h.length, len(payload)))
218
218
219 return frame(h.requestid, h.typeid, h.flags, payload)
219 return frame(h.requestid, h.typeid, h.flags, payload)
220
220
221 def createcommandframes(requestid, cmd, args, datafh=None):
221 def createcommandframes(stream, requestid, cmd, args, datafh=None):
222 """Create frames necessary to transmit a request to run a command.
222 """Create frames necessary to transmit a request to run a command.
223
223
224 This is a generator of bytearrays. Each item represents a frame
224 This is a generator of bytearrays. Each item represents a frame
225 ready to be sent over the wire to a peer.
225 ready to be sent over the wire to a peer.
226 """
226 """
227 flags = 0
227 flags = 0
228 if args:
228 if args:
229 flags |= FLAG_COMMAND_NAME_HAVE_ARGS
229 flags |= FLAG_COMMAND_NAME_HAVE_ARGS
230 if datafh:
230 if datafh:
231 flags |= FLAG_COMMAND_NAME_HAVE_DATA
231 flags |= FLAG_COMMAND_NAME_HAVE_DATA
232
232
233 if not flags:
233 if not flags:
234 flags |= FLAG_COMMAND_NAME_EOS
234 flags |= FLAG_COMMAND_NAME_EOS
235
235
236 yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
236 yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
237 flags=flags, payload=cmd)
237 flags=flags, payload=cmd)
238
238
239 for i, k in enumerate(sorted(args)):
239 for i, k in enumerate(sorted(args)):
240 v = args[k]
240 v = args[k]
241 last = i == len(args) - 1
241 last = i == len(args) - 1
242
242
243 # TODO handle splitting of argument values across frames.
243 # TODO handle splitting of argument values across frames.
244 payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
244 payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
245 offset = 0
245 offset = 0
246 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
246 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
247 offset += ARGUMENT_FRAME_HEADER.size
247 offset += ARGUMENT_FRAME_HEADER.size
248 payload[offset:offset + len(k)] = k
248 payload[offset:offset + len(k)] = k
249 offset += len(k)
249 offset += len(k)
250 payload[offset:offset + len(v)] = v
250 payload[offset:offset + len(v)] = v
251
251
252 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
252 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
253 yield makeframe(requestid=requestid,
253 yield stream.makeframe(requestid=requestid,
254 typeid=FRAME_TYPE_COMMAND_ARGUMENT,
254 typeid=FRAME_TYPE_COMMAND_ARGUMENT,
255 flags=flags,
255 flags=flags,
256 payload=payload)
256 payload=payload)
257
257
258 if datafh:
258 if datafh:
259 while True:
259 while True:
260 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
260 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
261
261
262 done = False
262 done = False
263 if len(data) == DEFAULT_MAX_FRAME_SIZE:
263 if len(data) == DEFAULT_MAX_FRAME_SIZE:
264 flags = FLAG_COMMAND_DATA_CONTINUATION
264 flags = FLAG_COMMAND_DATA_CONTINUATION
265 else:
265 else:
266 flags = FLAG_COMMAND_DATA_EOS
266 flags = FLAG_COMMAND_DATA_EOS
267 assert datafh.read(1) == b''
267 assert datafh.read(1) == b''
268 done = True
268 done = True
269
269
270 yield makeframe(requestid=requestid,
270 yield stream.makeframe(requestid=requestid,
271 typeid=FRAME_TYPE_COMMAND_DATA,
271 typeid=FRAME_TYPE_COMMAND_DATA,
272 flags=flags,
272 flags=flags,
273 payload=data)
273 payload=data)
274
274
275 if done:
275 if done:
276 break
276 break
277
277
278 def createbytesresponseframesfrombytes(requestid, data,
278 def createbytesresponseframesfrombytes(stream, requestid, data,
279 maxframesize=DEFAULT_MAX_FRAME_SIZE):
279 maxframesize=DEFAULT_MAX_FRAME_SIZE):
280 """Create a raw frame to send a bytes response from static bytes input.
280 """Create a raw frame to send a bytes response from static bytes input.
281
281
282 Returns a generator of bytearrays.
282 Returns a generator of bytearrays.
283 """
283 """
284
284
285 # Simple case of a single frame.
285 # Simple case of a single frame.
286 if len(data) <= maxframesize:
286 if len(data) <= maxframesize:
287 yield makeframe(requestid=requestid,
287 yield stream.makeframe(requestid=requestid,
288 typeid=FRAME_TYPE_BYTES_RESPONSE,
288 typeid=FRAME_TYPE_BYTES_RESPONSE,
289 flags=FLAG_BYTES_RESPONSE_EOS,
289 flags=FLAG_BYTES_RESPONSE_EOS,
290 payload=data)
290 payload=data)
291 return
291 return
292
292
293 offset = 0
293 offset = 0
294 while True:
294 while True:
295 chunk = data[offset:offset + maxframesize]
295 chunk = data[offset:offset + maxframesize]
296 offset += len(chunk)
296 offset += len(chunk)
297 done = offset == len(data)
297 done = offset == len(data)
298
298
299 if done:
299 if done:
300 flags = FLAG_BYTES_RESPONSE_EOS
300 flags = FLAG_BYTES_RESPONSE_EOS
301 else:
301 else:
302 flags = FLAG_BYTES_RESPONSE_CONTINUATION
302 flags = FLAG_BYTES_RESPONSE_CONTINUATION
303
303
304 yield makeframe(requestid=requestid,
304 yield stream.makeframe(requestid=requestid,
305 typeid=FRAME_TYPE_BYTES_RESPONSE,
305 typeid=FRAME_TYPE_BYTES_RESPONSE,
306 flags=flags,
306 flags=flags,
307 payload=chunk)
307 payload=chunk)
308
308
309 if done:
309 if done:
310 break
310 break
311
311
312 def createerrorframe(requestid, msg, protocol=False, application=False):
312 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
313 # TODO properly handle frame size limits.
313 # TODO properly handle frame size limits.
314 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
314 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
315
315
316 flags = 0
316 flags = 0
317 if protocol:
317 if protocol:
318 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
318 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
319 if application:
319 if application:
320 flags |= FLAG_ERROR_RESPONSE_APPLICATION
320 flags |= FLAG_ERROR_RESPONSE_APPLICATION
321
321
322 yield makeframe(requestid=requestid,
322 yield stream.makeframe(requestid=requestid,
323 typeid=FRAME_TYPE_ERROR_RESPONSE,
323 typeid=FRAME_TYPE_ERROR_RESPONSE,
324 flags=flags,
324 flags=flags,
325 payload=msg)
325 payload=msg)
326
326
327 def createtextoutputframe(requestid, atoms):
327 def createtextoutputframe(stream, requestid, atoms):
328 """Create a text output frame to render text to people.
328 """Create a text output frame to render text to people.
329
329
330 ``atoms`` is a 3-tuple of (formatting string, args, labels).
330 ``atoms`` is a 3-tuple of (formatting string, args, labels).
331
331
332 The formatting string contains ``%s`` tokens to be replaced by the
332 The formatting string contains ``%s`` tokens to be replaced by the
333 corresponding indexed entry in ``args``. ``labels`` is an iterable of
333 corresponding indexed entry in ``args``. ``labels`` is an iterable of
334 formatters to be applied at rendering time. In terms of the ``ui``
334 formatters to be applied at rendering time. In terms of the ``ui``
335 class, each atom corresponds to a ``ui.write()``.
335 class, each atom corresponds to a ``ui.write()``.
336 """
336 """
337 bytesleft = DEFAULT_MAX_FRAME_SIZE
337 bytesleft = DEFAULT_MAX_FRAME_SIZE
338 atomchunks = []
338 atomchunks = []
339
339
340 for (formatting, args, labels) in atoms:
340 for (formatting, args, labels) in atoms:
341 if len(args) > 255:
341 if len(args) > 255:
342 raise ValueError('cannot use more than 255 formatting arguments')
342 raise ValueError('cannot use more than 255 formatting arguments')
343 if len(labels) > 255:
343 if len(labels) > 255:
344 raise ValueError('cannot use more than 255 labels')
344 raise ValueError('cannot use more than 255 labels')
345
345
346 # TODO look for localstr, other types here?
346 # TODO look for localstr, other types here?
347
347
348 if not isinstance(formatting, bytes):
348 if not isinstance(formatting, bytes):
349 raise ValueError('must use bytes formatting strings')
349 raise ValueError('must use bytes formatting strings')
350 for arg in args:
350 for arg in args:
351 if not isinstance(arg, bytes):
351 if not isinstance(arg, bytes):
352 raise ValueError('must use bytes for arguments')
352 raise ValueError('must use bytes for arguments')
353 for label in labels:
353 for label in labels:
354 if not isinstance(label, bytes):
354 if not isinstance(label, bytes):
355 raise ValueError('must use bytes for labels')
355 raise ValueError('must use bytes for labels')
356
356
357 # Formatting string must be UTF-8.
357 # Formatting string must be UTF-8.
358 formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
358 formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
359
359
360 # Arguments must be UTF-8.
360 # Arguments must be UTF-8.
361 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
361 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
362
362
363 # Labels must be ASCII.
363 # Labels must be ASCII.
364 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
364 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
365 for l in labels]
365 for l in labels]
366
366
367 if len(formatting) > 65535:
367 if len(formatting) > 65535:
368 raise ValueError('formatting string cannot be longer than 64k')
368 raise ValueError('formatting string cannot be longer than 64k')
369
369
370 if any(len(a) > 65535 for a in args):
370 if any(len(a) > 65535 for a in args):
371 raise ValueError('argument string cannot be longer than 64k')
371 raise ValueError('argument string cannot be longer than 64k')
372
372
373 if any(len(l) > 255 for l in labels):
373 if any(len(l) > 255 for l in labels):
374 raise ValueError('label string cannot be longer than 255 bytes')
374 raise ValueError('label string cannot be longer than 255 bytes')
375
375
376 chunks = [
376 chunks = [
377 struct.pack(r'<H', len(formatting)),
377 struct.pack(r'<H', len(formatting)),
378 struct.pack(r'<BB', len(labels), len(args)),
378 struct.pack(r'<BB', len(labels), len(args)),
379 struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
379 struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
380 struct.pack(r'<' + r'H' * len(args), *map(len, args)),
380 struct.pack(r'<' + r'H' * len(args), *map(len, args)),
381 ]
381 ]
382 chunks.append(formatting)
382 chunks.append(formatting)
383 chunks.extend(labels)
383 chunks.extend(labels)
384 chunks.extend(args)
384 chunks.extend(args)
385
385
386 atom = b''.join(chunks)
386 atom = b''.join(chunks)
387 atomchunks.append(atom)
387 atomchunks.append(atom)
388 bytesleft -= len(atom)
388 bytesleft -= len(atom)
389
389
390 if bytesleft < 0:
390 if bytesleft < 0:
391 raise ValueError('cannot encode data in a single frame')
391 raise ValueError('cannot encode data in a single frame')
392
392
393 yield makeframe(requestid=requestid,
393 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_TEXT_OUTPUT,
394 typeid=FRAME_TYPE_TEXT_OUTPUT,
395 flags=0,
395 flags=0,
396 payload=b''.join(atomchunks))
396 payload=b''.join(atomchunks))
397
398 class stream(object):
399 """Represents a logical unidirectional series of frames."""
400
401 def makeframe(self, requestid, typeid, flags, payload):
402 """Create a frame to be sent out over this stream.
403
404 Only returns the frame instance. Does not actually send it.
405 """
406 return makeframe(requestid, typeid, flags, payload)
397
407
398 class serverreactor(object):
408 class serverreactor(object):
399 """Holds state of a server handling frame-based protocol requests.
409 """Holds state of a server handling frame-based protocol requests.
400
410
401 This class is the "brain" of the unified frame-based protocol server
411 This class is the "brain" of the unified frame-based protocol server
402 component. While the protocol is stateless from the perspective of
412 component. While the protocol is stateless from the perspective of
403 requests/commands, something needs to track which frames have been
413 requests/commands, something needs to track which frames have been
404 received, what frames to expect, etc. This class is that thing.
414 received, what frames to expect, etc. This class is that thing.
405
415
406 Instances are modeled as a state machine of sorts. Instances are also
416 Instances are modeled as a state machine of sorts. Instances are also
407 reactionary to external events. The point of this class is to encapsulate
417 reactionary to external events. The point of this class is to encapsulate
408 the state of the connection and the exchange of frames, not to perform
418 the state of the connection and the exchange of frames, not to perform
409 work. Instead, callers tell this class when something occurs, like a
419 work. Instead, callers tell this class when something occurs, like a
410 frame arriving. If that activity is worthy of a follow-up action (say
420 frame arriving. If that activity is worthy of a follow-up action (say
411 *run a command*), the return value of that handler will say so.
421 *run a command*), the return value of that handler will say so.
412
422
413 I/O and CPU intensive operations are purposefully delegated outside of
423 I/O and CPU intensive operations are purposefully delegated outside of
414 this class.
424 this class.
415
425
416 Consumers are expected to tell instances when events occur. They do so by
426 Consumers are expected to tell instances when events occur. They do so by
417 calling the various ``on*`` methods. These methods return a 2-tuple
427 calling the various ``on*`` methods. These methods return a 2-tuple
418 describing any follow-up action(s) to take. The first element is the
428 describing any follow-up action(s) to take. The first element is the
419 name of an action to perform. The second is a data structure (usually
429 name of an action to perform. The second is a data structure (usually
420 a dict) specific to that action that contains more information. e.g.
430 a dict) specific to that action that contains more information. e.g.
421 if the server wants to send frames back to the client, the data structure
431 if the server wants to send frames back to the client, the data structure
422 will contain a reference to those frames.
432 will contain a reference to those frames.
423
433
424 Valid actions that consumers can be instructed to take are:
434 Valid actions that consumers can be instructed to take are:
425
435
426 sendframes
436 sendframes
427 Indicates that frames should be sent to the client. The ``framegen``
437 Indicates that frames should be sent to the client. The ``framegen``
428 key contains a generator of frames that should be sent. The server
438 key contains a generator of frames that should be sent. The server
429 assumes that all frames are sent to the client.
439 assumes that all frames are sent to the client.
430
440
431 error
441 error
432 Indicates that an error occurred. Consumer should probably abort.
442 Indicates that an error occurred. Consumer should probably abort.
433
443
434 runcommand
444 runcommand
435 Indicates that the consumer should run a wire protocol command. Details
445 Indicates that the consumer should run a wire protocol command. Details
436 of the command to run are given in the data structure.
446 of the command to run are given in the data structure.
437
447
438 wantframe
448 wantframe
439 Indicates that nothing of interest happened and the server is waiting on
449 Indicates that nothing of interest happened and the server is waiting on
440 more frames from the client before anything interesting can be done.
450 more frames from the client before anything interesting can be done.
441
451
442 noop
452 noop
443 Indicates no additional action is required.
453 Indicates no additional action is required.
444
454
445 Known Issues
455 Known Issues
446 ------------
456 ------------
447
457
448 There are no limits to the number of partially received commands or their
458 There are no limits to the number of partially received commands or their
449 size. A malicious client could stream command request data and exhaust the
459 size. A malicious client could stream command request data and exhaust the
450 server's memory.
460 server's memory.
451
461
452 Partially received commands are not acted upon when end of input is
462 Partially received commands are not acted upon when end of input is
453 reached. Should the server error if it receives a partial request?
463 reached. Should the server error if it receives a partial request?
454 Should the client send a message to abort a partially transmitted request
464 Should the client send a message to abort a partially transmitted request
455 to facilitate graceful shutdown?
465 to facilitate graceful shutdown?
456
466
457 Active requests that haven't been responded to aren't tracked. This means
467 Active requests that haven't been responded to aren't tracked. This means
458 that if we receive a command and instruct its dispatch, another command
468 that if we receive a command and instruct its dispatch, another command
459 with its request ID can come in over the wire and there will be a race
469 with its request ID can come in over the wire and there will be a race
460 between who responds to what.
470 between who responds to what.
461 """
471 """
462
472
463 def __init__(self, deferoutput=False):
473 def __init__(self, deferoutput=False):
464 """Construct a new server reactor.
474 """Construct a new server reactor.
465
475
466 ``deferoutput`` can be used to indicate that no output frames should be
476 ``deferoutput`` can be used to indicate that no output frames should be
467 instructed to be sent until input has been exhausted. In this mode,
477 instructed to be sent until input has been exhausted. In this mode,
468 events that would normally generate output frames (such as a command
478 events that would normally generate output frames (such as a command
469 response being ready) will instead defer instructing the consumer to
479 response being ready) will instead defer instructing the consumer to
470 send those frames. This is useful for half-duplex transports where the
480 send those frames. This is useful for half-duplex transports where the
471 sender cannot receive until all data has been transmitted.
481 sender cannot receive until all data has been transmitted.
472 """
482 """
473 self._deferoutput = deferoutput
483 self._deferoutput = deferoutput
474 self._state = 'idle'
484 self._state = 'idle'
475 self._bufferedframegens = []
485 self._bufferedframegens = []
476 # request id -> dict of commands that are actively being received.
486 # request id -> dict of commands that are actively being received.
477 self._receivingcommands = {}
487 self._receivingcommands = {}
478 # Request IDs that have been received and are actively being processed.
488 # Request IDs that have been received and are actively being processed.
479 # Once all output for a request has been sent, it is removed from this
489 # Once all output for a request has been sent, it is removed from this
480 # set.
490 # set.
481 self._activecommands = set()
491 self._activecommands = set()
482
492
483 def onframerecv(self, frame):
493 def onframerecv(self, frame):
484 """Process a frame that has been received off the wire.
494 """Process a frame that has been received off the wire.
485
495
486 Returns a dict with an ``action`` key that details what action,
496 Returns a dict with an ``action`` key that details what action,
487 if any, the consumer should take next.
497 if any, the consumer should take next.
488 """
498 """
489 handlers = {
499 handlers = {
490 'idle': self._onframeidle,
500 'idle': self._onframeidle,
491 'command-receiving': self._onframecommandreceiving,
501 'command-receiving': self._onframecommandreceiving,
492 'errored': self._onframeerrored,
502 'errored': self._onframeerrored,
493 }
503 }
494
504
495 meth = handlers.get(self._state)
505 meth = handlers.get(self._state)
496 if not meth:
506 if not meth:
497 raise error.ProgrammingError('unhandled state: %s' % self._state)
507 raise error.ProgrammingError('unhandled state: %s' % self._state)
498
508
499 return meth(frame)
509 return meth(frame)
500
510
501 def onbytesresponseready(self, requestid, data):
511 def onbytesresponseready(self, stream, requestid, data):
502 """Signal that a bytes response is ready to be sent to the client.
512 """Signal that a bytes response is ready to be sent to the client.
503
513
504 The raw bytes response is passed as an argument.
514 The raw bytes response is passed as an argument.
505 """
515 """
506 def sendframes():
516 def sendframes():
507 for frame in createbytesresponseframesfrombytes(requestid, data):
517 for frame in createbytesresponseframesfrombytes(stream, requestid,
518 data):
508 yield frame
519 yield frame
509
520
510 self._activecommands.remove(requestid)
521 self._activecommands.remove(requestid)
511
522
512 result = sendframes()
523 result = sendframes()
513
524
514 if self._deferoutput:
525 if self._deferoutput:
515 self._bufferedframegens.append(result)
526 self._bufferedframegens.append(result)
516 return 'noop', {}
527 return 'noop', {}
517 else:
528 else:
518 return 'sendframes', {
529 return 'sendframes', {
519 'framegen': result,
530 'framegen': result,
520 }
531 }
521
532
522 def oninputeof(self):
533 def oninputeof(self):
523 """Signals that end of input has been received.
534 """Signals that end of input has been received.
524
535
525 No more frames will be received. All pending activity should be
536 No more frames will be received. All pending activity should be
526 completed.
537 completed.
527 """
538 """
528 # TODO should we do anything about in-flight commands?
539 # TODO should we do anything about in-flight commands?
529
540
530 if not self._deferoutput or not self._bufferedframegens:
541 if not self._deferoutput or not self._bufferedframegens:
531 return 'noop', {}
542 return 'noop', {}
532
543
533 # If we buffered all our responses, emit those.
544 # If we buffered all our responses, emit those.
534 def makegen():
545 def makegen():
535 for gen in self._bufferedframegens:
546 for gen in self._bufferedframegens:
536 for frame in gen:
547 for frame in gen:
537 yield frame
548 yield frame
538
549
539 return 'sendframes', {
550 return 'sendframes', {
540 'framegen': makegen(),
551 'framegen': makegen(),
541 }
552 }
542
553
543 def onapplicationerror(self, requestid, msg):
554 def onapplicationerror(self, stream, requestid, msg):
544 return 'sendframes', {
555 return 'sendframes', {
545 'framegen': createerrorframe(requestid, msg, application=True),
556 'framegen': createerrorframe(stream, requestid, msg,
557 application=True),
546 }
558 }
547
559
548 def _makeerrorresult(self, msg):
560 def _makeerrorresult(self, msg):
549 return 'error', {
561 return 'error', {
550 'message': msg,
562 'message': msg,
551 }
563 }
552
564
553 def _makeruncommandresult(self, requestid):
565 def _makeruncommandresult(self, requestid):
554 entry = self._receivingcommands[requestid]
566 entry = self._receivingcommands[requestid]
555 del self._receivingcommands[requestid]
567 del self._receivingcommands[requestid]
556
568
557 if self._receivingcommands:
569 if self._receivingcommands:
558 self._state = 'command-receiving'
570 self._state = 'command-receiving'
559 else:
571 else:
560 self._state = 'idle'
572 self._state = 'idle'
561
573
562 assert requestid not in self._activecommands
574 assert requestid not in self._activecommands
563 self._activecommands.add(requestid)
575 self._activecommands.add(requestid)
564
576
565 return 'runcommand', {
577 return 'runcommand', {
566 'requestid': requestid,
578 'requestid': requestid,
567 'command': entry['command'],
579 'command': entry['command'],
568 'args': entry['args'],
580 'args': entry['args'],
569 'data': entry['data'].getvalue() if entry['data'] else None,
581 'data': entry['data'].getvalue() if entry['data'] else None,
570 }
582 }
571
583
572 def _makewantframeresult(self):
584 def _makewantframeresult(self):
573 return 'wantframe', {
585 return 'wantframe', {
574 'state': self._state,
586 'state': self._state,
575 }
587 }
576
588
577 def _onframeidle(self, frame):
589 def _onframeidle(self, frame):
578 # The only frame type that should be received in this state is a
590 # The only frame type that should be received in this state is a
579 # command request.
591 # command request.
580 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
592 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
581 self._state = 'errored'
593 self._state = 'errored'
582 return self._makeerrorresult(
594 return self._makeerrorresult(
583 _('expected command frame; got %d') % frame.typeid)
595 _('expected command frame; got %d') % frame.typeid)
584
596
585 if frame.requestid in self._receivingcommands:
597 if frame.requestid in self._receivingcommands:
586 self._state = 'errored'
598 self._state = 'errored'
587 return self._makeerrorresult(
599 return self._makeerrorresult(
588 _('request with ID %d already received') % frame.requestid)
600 _('request with ID %d already received') % frame.requestid)
589
601
590 if frame.requestid in self._activecommands:
602 if frame.requestid in self._activecommands:
591 self._state = 'errored'
603 self._state = 'errored'
592 return self._makeerrorresult((
604 return self._makeerrorresult((
593 _('request with ID %d is already active') % frame.requestid))
605 _('request with ID %d is already active') % frame.requestid))
594
606
595 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
607 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
596 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
608 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
597
609
598 self._receivingcommands[frame.requestid] = {
610 self._receivingcommands[frame.requestid] = {
599 'command': frame.payload,
611 'command': frame.payload,
600 'args': {},
612 'args': {},
601 'data': None,
613 'data': None,
602 'expectingargs': expectingargs,
614 'expectingargs': expectingargs,
603 'expectingdata': expectingdata,
615 'expectingdata': expectingdata,
604 }
616 }
605
617
606 if frame.flags & FLAG_COMMAND_NAME_EOS:
618 if frame.flags & FLAG_COMMAND_NAME_EOS:
607 return self._makeruncommandresult(frame.requestid)
619 return self._makeruncommandresult(frame.requestid)
608
620
609 if expectingargs or expectingdata:
621 if expectingargs or expectingdata:
610 self._state = 'command-receiving'
622 self._state = 'command-receiving'
611 return self._makewantframeresult()
623 return self._makewantframeresult()
612 else:
624 else:
613 self._state = 'errored'
625 self._state = 'errored'
614 return self._makeerrorresult(_('missing frame flags on '
626 return self._makeerrorresult(_('missing frame flags on '
615 'command frame'))
627 'command frame'))
616
628
617 def _onframecommandreceiving(self, frame):
629 def _onframecommandreceiving(self, frame):
618 # It could be a new command request. Process it as such.
630 # It could be a new command request. Process it as such.
619 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
631 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
620 return self._onframeidle(frame)
632 return self._onframeidle(frame)
621
633
622 # All other frames should be related to a command that is currently
634 # All other frames should be related to a command that is currently
623 # receiving but is not active.
635 # receiving but is not active.
624 if frame.requestid in self._activecommands:
636 if frame.requestid in self._activecommands:
625 self._state = 'errored'
637 self._state = 'errored'
626 return self._makeerrorresult(
638 return self._makeerrorresult(
627 _('received frame for request that is still active: %d') %
639 _('received frame for request that is still active: %d') %
628 frame.requestid)
640 frame.requestid)
629
641
630 if frame.requestid not in self._receivingcommands:
642 if frame.requestid not in self._receivingcommands:
631 self._state = 'errored'
643 self._state = 'errored'
632 return self._makeerrorresult(
644 return self._makeerrorresult(
633 _('received frame for request that is not receiving: %d') %
645 _('received frame for request that is not receiving: %d') %
634 frame.requestid)
646 frame.requestid)
635
647
636 entry = self._receivingcommands[frame.requestid]
648 entry = self._receivingcommands[frame.requestid]
637
649
638 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
650 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
639 if not entry['expectingargs']:
651 if not entry['expectingargs']:
640 self._state = 'errored'
652 self._state = 'errored'
641 return self._makeerrorresult(_(
653 return self._makeerrorresult(_(
642 'received command argument frame for request that is not '
654 'received command argument frame for request that is not '
643 'expecting arguments: %d') % frame.requestid)
655 'expecting arguments: %d') % frame.requestid)
644
656
645 return self._handlecommandargsframe(frame, entry)
657 return self._handlecommandargsframe(frame, entry)
646
658
647 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
659 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
648 if not entry['expectingdata']:
660 if not entry['expectingdata']:
649 self._state = 'errored'
661 self._state = 'errored'
650 return self._makeerrorresult(_(
662 return self._makeerrorresult(_(
651 'received command data frame for request that is not '
663 'received command data frame for request that is not '
652 'expecting data: %d') % frame.requestid)
664 'expecting data: %d') % frame.requestid)
653
665
654 if entry['data'] is None:
666 if entry['data'] is None:
655 entry['data'] = util.bytesio()
667 entry['data'] = util.bytesio()
656
668
657 return self._handlecommanddataframe(frame, entry)
669 return self._handlecommanddataframe(frame, entry)
658
670
659 def _handlecommandargsframe(self, frame, entry):
671 def _handlecommandargsframe(self, frame, entry):
660 # The frame and state of command should have already been validated.
672 # The frame and state of command should have already been validated.
661 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
673 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
662
674
663 offset = 0
675 offset = 0
664 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
676 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
665 offset += ARGUMENT_FRAME_HEADER.size
677 offset += ARGUMENT_FRAME_HEADER.size
666
678
667 # The argument name MUST fit inside the frame.
679 # The argument name MUST fit inside the frame.
668 argname = bytes(frame.payload[offset:offset + namesize])
680 argname = bytes(frame.payload[offset:offset + namesize])
669 offset += namesize
681 offset += namesize
670
682
671 if len(argname) != namesize:
683 if len(argname) != namesize:
672 self._state = 'errored'
684 self._state = 'errored'
673 return self._makeerrorresult(_('malformed argument frame: '
685 return self._makeerrorresult(_('malformed argument frame: '
674 'partial argument name'))
686 'partial argument name'))
675
687
676 argvalue = bytes(frame.payload[offset:])
688 argvalue = bytes(frame.payload[offset:])
677
689
678 # Argument value spans multiple frames. Record our active state
690 # Argument value spans multiple frames. Record our active state
679 # and wait for the next frame.
691 # and wait for the next frame.
680 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
692 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
681 raise error.ProgrammingError('not yet implemented')
693 raise error.ProgrammingError('not yet implemented')
682
694
683 # Common case: the argument value is completely contained in this
695 # Common case: the argument value is completely contained in this
684 # frame.
696 # frame.
685
697
686 if len(argvalue) != valuesize:
698 if len(argvalue) != valuesize:
687 self._state = 'errored'
699 self._state = 'errored'
688 return self._makeerrorresult(_('malformed argument frame: '
700 return self._makeerrorresult(_('malformed argument frame: '
689 'partial argument value'))
701 'partial argument value'))
690
702
691 entry['args'][argname] = argvalue
703 entry['args'][argname] = argvalue
692
704
693 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
705 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
694 if entry['expectingdata']:
706 if entry['expectingdata']:
695 # TODO signal request to run a command once we don't
707 # TODO signal request to run a command once we don't
696 # buffer data frames.
708 # buffer data frames.
697 return self._makewantframeresult()
709 return self._makewantframeresult()
698 else:
710 else:
699 return self._makeruncommandresult(frame.requestid)
711 return self._makeruncommandresult(frame.requestid)
700 else:
712 else:
701 return self._makewantframeresult()
713 return self._makewantframeresult()
702
714
703 def _handlecommanddataframe(self, frame, entry):
715 def _handlecommanddataframe(self, frame, entry):
704 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
716 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
705
717
706 # TODO support streaming data instead of buffering it.
718 # TODO support streaming data instead of buffering it.
707 entry['data'].write(frame.payload)
719 entry['data'].write(frame.payload)
708
720
709 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
721 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
710 return self._makewantframeresult()
722 return self._makewantframeresult()
711 elif frame.flags & FLAG_COMMAND_DATA_EOS:
723 elif frame.flags & FLAG_COMMAND_DATA_EOS:
712 entry['data'].seek(0)
724 entry['data'].seek(0)
713 return self._makeruncommandresult(frame.requestid)
725 return self._makeruncommandresult(frame.requestid)
714 else:
726 else:
715 self._state = 'errored'
727 self._state = 'errored'
716 return self._makeerrorresult(_('command data frame without '
728 return self._makeerrorresult(_('command data frame without '
717 'flags'))
729 'flags'))
718
730
719 def _onframeerrored(self, frame):
731 def _onframeerrored(self, frame):
720 return self._makeerrorresult(_('server already errored'))
732 return self._makeerrorresult(_('server already errored'))
@@ -1,1047 +1,1049
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10 import struct
10 import struct
11 import sys
11 import sys
12 import threading
12 import threading
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 encoding,
16 encoding,
17 error,
17 error,
18 hook,
18 hook,
19 pycompat,
19 pycompat,
20 util,
20 util,
21 wireproto,
21 wireproto,
22 wireprotoframing,
22 wireprotoframing,
23 wireprototypes,
23 wireprototypes,
24 )
24 )
25 from .utils import (
25 from .utils import (
26 procutil,
26 procutil,
27 )
27 )
28
28
29 stringio = util.stringio
29 stringio = util.stringio
30
30
31 urlerr = util.urlerr
31 urlerr = util.urlerr
32 urlreq = util.urlreq
32 urlreq = util.urlreq
33
33
34 HTTP_OK = 200
34 HTTP_OK = 200
35
35
36 HGTYPE = 'application/mercurial-0.1'
36 HGTYPE = 'application/mercurial-0.1'
37 HGTYPE2 = 'application/mercurial-0.2'
37 HGTYPE2 = 'application/mercurial-0.2'
38 HGERRTYPE = 'application/hg-error'
38 HGERRTYPE = 'application/hg-error'
39 FRAMINGTYPE = b'application/mercurial-exp-framing-0002'
39 FRAMINGTYPE = b'application/mercurial-exp-framing-0002'
40
40
41 HTTPV2 = wireprototypes.HTTPV2
41 HTTPV2 = wireprototypes.HTTPV2
42 SSHV1 = wireprototypes.SSHV1
42 SSHV1 = wireprototypes.SSHV1
43 SSHV2 = wireprototypes.SSHV2
43 SSHV2 = wireprototypes.SSHV2
44
44
45 def decodevaluefromheaders(req, headerprefix):
45 def decodevaluefromheaders(req, headerprefix):
46 """Decode a long value from multiple HTTP request headers.
46 """Decode a long value from multiple HTTP request headers.
47
47
48 Returns the value as a bytes, not a str.
48 Returns the value as a bytes, not a str.
49 """
49 """
50 chunks = []
50 chunks = []
51 i = 1
51 i = 1
52 while True:
52 while True:
53 v = req.headers.get(b'%s-%d' % (headerprefix, i))
53 v = req.headers.get(b'%s-%d' % (headerprefix, i))
54 if v is None:
54 if v is None:
55 break
55 break
56 chunks.append(pycompat.bytesurl(v))
56 chunks.append(pycompat.bytesurl(v))
57 i += 1
57 i += 1
58
58
59 return ''.join(chunks)
59 return ''.join(chunks)
60
60
61 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
61 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
62 def __init__(self, req, ui, checkperm):
62 def __init__(self, req, ui, checkperm):
63 self._req = req
63 self._req = req
64 self._ui = ui
64 self._ui = ui
65 self._checkperm = checkperm
65 self._checkperm = checkperm
66
66
67 @property
67 @property
68 def name(self):
68 def name(self):
69 return 'http-v1'
69 return 'http-v1'
70
70
71 def getargs(self, args):
71 def getargs(self, args):
72 knownargs = self._args()
72 knownargs = self._args()
73 data = {}
73 data = {}
74 keys = args.split()
74 keys = args.split()
75 for k in keys:
75 for k in keys:
76 if k == '*':
76 if k == '*':
77 star = {}
77 star = {}
78 for key in knownargs.keys():
78 for key in knownargs.keys():
79 if key != 'cmd' and key not in keys:
79 if key != 'cmd' and key not in keys:
80 star[key] = knownargs[key][0]
80 star[key] = knownargs[key][0]
81 data['*'] = star
81 data['*'] = star
82 else:
82 else:
83 data[k] = knownargs[k][0]
83 data[k] = knownargs[k][0]
84 return [data[k] for k in keys]
84 return [data[k] for k in keys]
85
85
86 def _args(self):
86 def _args(self):
87 args = self._req.qsparams.asdictoflists()
87 args = self._req.qsparams.asdictoflists()
88 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
88 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
89 if postlen:
89 if postlen:
90 args.update(urlreq.parseqs(
90 args.update(urlreq.parseqs(
91 self._req.bodyfh.read(postlen), keep_blank_values=True))
91 self._req.bodyfh.read(postlen), keep_blank_values=True))
92 return args
92 return args
93
93
94 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
94 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
95 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
95 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
96 return args
96 return args
97
97
98 def forwardpayload(self, fp):
98 def forwardpayload(self, fp):
99 # Existing clients *always* send Content-Length.
99 # Existing clients *always* send Content-Length.
100 length = int(self._req.headers[b'Content-Length'])
100 length = int(self._req.headers[b'Content-Length'])
101
101
102 # If httppostargs is used, we need to read Content-Length
102 # If httppostargs is used, we need to read Content-Length
103 # minus the amount that was consumed by args.
103 # minus the amount that was consumed by args.
104 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
104 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
105 for s in util.filechunkiter(self._req.bodyfh, limit=length):
105 for s in util.filechunkiter(self._req.bodyfh, limit=length):
106 fp.write(s)
106 fp.write(s)
107
107
108 @contextlib.contextmanager
108 @contextlib.contextmanager
109 def mayberedirectstdio(self):
109 def mayberedirectstdio(self):
110 oldout = self._ui.fout
110 oldout = self._ui.fout
111 olderr = self._ui.ferr
111 olderr = self._ui.ferr
112
112
113 out = util.stringio()
113 out = util.stringio()
114
114
115 try:
115 try:
116 self._ui.fout = out
116 self._ui.fout = out
117 self._ui.ferr = out
117 self._ui.ferr = out
118 yield out
118 yield out
119 finally:
119 finally:
120 self._ui.fout = oldout
120 self._ui.fout = oldout
121 self._ui.ferr = olderr
121 self._ui.ferr = olderr
122
122
123 def client(self):
123 def client(self):
124 return 'remote:%s:%s:%s' % (
124 return 'remote:%s:%s:%s' % (
125 self._req.urlscheme,
125 self._req.urlscheme,
126 urlreq.quote(self._req.remotehost or ''),
126 urlreq.quote(self._req.remotehost or ''),
127 urlreq.quote(self._req.remoteuser or ''))
127 urlreq.quote(self._req.remoteuser or ''))
128
128
129 def addcapabilities(self, repo, caps):
129 def addcapabilities(self, repo, caps):
130 caps.append(b'batch')
130 caps.append(b'batch')
131
131
132 caps.append('httpheader=%d' %
132 caps.append('httpheader=%d' %
133 repo.ui.configint('server', 'maxhttpheaderlen'))
133 repo.ui.configint('server', 'maxhttpheaderlen'))
134 if repo.ui.configbool('experimental', 'httppostargs'):
134 if repo.ui.configbool('experimental', 'httppostargs'):
135 caps.append('httppostargs')
135 caps.append('httppostargs')
136
136
137 # FUTURE advertise 0.2rx once support is implemented
137 # FUTURE advertise 0.2rx once support is implemented
138 # FUTURE advertise minrx and mintx after consulting config option
138 # FUTURE advertise minrx and mintx after consulting config option
139 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
139 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
140
140
141 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
141 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
142 if compengines:
142 if compengines:
143 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
143 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
144 for e in compengines)
144 for e in compengines)
145 caps.append('compression=%s' % comptypes)
145 caps.append('compression=%s' % comptypes)
146
146
147 return caps
147 return caps
148
148
149 def checkperm(self, perm):
149 def checkperm(self, perm):
150 return self._checkperm(perm)
150 return self._checkperm(perm)
151
151
152 # This method exists mostly so that extensions like remotefilelog can
152 # This method exists mostly so that extensions like remotefilelog can
153 # disable a kludgey legacy method only over http. As of early 2018,
153 # disable a kludgey legacy method only over http. As of early 2018,
154 # there are no other known users, so with any luck we can discard this
154 # there are no other known users, so with any luck we can discard this
155 # hook if remotefilelog becomes a first-party extension.
155 # hook if remotefilelog becomes a first-party extension.
156 def iscmd(cmd):
156 def iscmd(cmd):
157 return cmd in wireproto.commands
157 return cmd in wireproto.commands
158
158
159 def handlewsgirequest(rctx, req, res, checkperm):
159 def handlewsgirequest(rctx, req, res, checkperm):
160 """Possibly process a wire protocol request.
160 """Possibly process a wire protocol request.
161
161
162 If the current request is a wire protocol request, the request is
162 If the current request is a wire protocol request, the request is
163 processed by this function.
163 processed by this function.
164
164
165 ``req`` is a ``parsedrequest`` instance.
165 ``req`` is a ``parsedrequest`` instance.
166 ``res`` is a ``wsgiresponse`` instance.
166 ``res`` is a ``wsgiresponse`` instance.
167
167
168 Returns a bool indicating if the request was serviced. If set, the caller
168 Returns a bool indicating if the request was serviced. If set, the caller
169 should stop processing the request, as a response has already been issued.
169 should stop processing the request, as a response has already been issued.
170 """
170 """
171 # Avoid cycle involving hg module.
171 # Avoid cycle involving hg module.
172 from .hgweb import common as hgwebcommon
172 from .hgweb import common as hgwebcommon
173
173
174 repo = rctx.repo
174 repo = rctx.repo
175
175
176 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
176 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
177 # string parameter. If it isn't present, this isn't a wire protocol
177 # string parameter. If it isn't present, this isn't a wire protocol
178 # request.
178 # request.
179 if 'cmd' not in req.qsparams:
179 if 'cmd' not in req.qsparams:
180 return False
180 return False
181
181
182 cmd = req.qsparams['cmd']
182 cmd = req.qsparams['cmd']
183
183
184 # The "cmd" request parameter is used by both the wire protocol and hgweb.
184 # The "cmd" request parameter is used by both the wire protocol and hgweb.
185 # While not all wire protocol commands are available for all transports,
185 # While not all wire protocol commands are available for all transports,
186 # if we see a "cmd" value that resembles a known wire protocol command, we
186 # if we see a "cmd" value that resembles a known wire protocol command, we
187 # route it to a protocol handler. This is better than routing possible
187 # route it to a protocol handler. This is better than routing possible
188 # wire protocol requests to hgweb because it prevents hgweb from using
188 # wire protocol requests to hgweb because it prevents hgweb from using
189 # known wire protocol commands and it is less confusing for machine
189 # known wire protocol commands and it is less confusing for machine
190 # clients.
190 # clients.
191 if not iscmd(cmd):
191 if not iscmd(cmd):
192 return False
192 return False
193
193
194 # The "cmd" query string argument is only valid on the root path of the
194 # The "cmd" query string argument is only valid on the root path of the
195 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
195 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
196 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
196 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
197 # in this case. We send an HTTP 404 for backwards compatibility reasons.
197 # in this case. We send an HTTP 404 for backwards compatibility reasons.
198 if req.dispatchpath:
198 if req.dispatchpath:
199 res.status = hgwebcommon.statusmessage(404)
199 res.status = hgwebcommon.statusmessage(404)
200 res.headers['Content-Type'] = HGTYPE
200 res.headers['Content-Type'] = HGTYPE
201 # TODO This is not a good response to issue for this request. This
201 # TODO This is not a good response to issue for this request. This
202 # is mostly for BC for now.
202 # is mostly for BC for now.
203 res.setbodybytes('0\n%s\n' % b'Not Found')
203 res.setbodybytes('0\n%s\n' % b'Not Found')
204 return True
204 return True
205
205
206 proto = httpv1protocolhandler(req, repo.ui,
206 proto = httpv1protocolhandler(req, repo.ui,
207 lambda perm: checkperm(rctx, req, perm))
207 lambda perm: checkperm(rctx, req, perm))
208
208
209 # The permissions checker should be the only thing that can raise an
209 # The permissions checker should be the only thing that can raise an
210 # ErrorResponse. It is kind of a layer violation to catch an hgweb
210 # ErrorResponse. It is kind of a layer violation to catch an hgweb
211 # exception here. So consider refactoring into a exception type that
211 # exception here. So consider refactoring into a exception type that
212 # is associated with the wire protocol.
212 # is associated with the wire protocol.
213 try:
213 try:
214 _callhttp(repo, req, res, proto, cmd)
214 _callhttp(repo, req, res, proto, cmd)
215 except hgwebcommon.ErrorResponse as e:
215 except hgwebcommon.ErrorResponse as e:
216 for k, v in e.headers:
216 for k, v in e.headers:
217 res.headers[k] = v
217 res.headers[k] = v
218 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
218 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
219 # TODO This response body assumes the failed command was
219 # TODO This response body assumes the failed command was
220 # "unbundle." That assumption is not always valid.
220 # "unbundle." That assumption is not always valid.
221 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
221 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
222
222
223 return True
223 return True
224
224
225 def handlewsgiapirequest(rctx, req, res, checkperm):
225 def handlewsgiapirequest(rctx, req, res, checkperm):
226 """Handle requests to /api/*."""
226 """Handle requests to /api/*."""
227 assert req.dispatchparts[0] == b'api'
227 assert req.dispatchparts[0] == b'api'
228
228
229 repo = rctx.repo
229 repo = rctx.repo
230
230
231 # This whole URL space is experimental for now. But we want to
231 # This whole URL space is experimental for now. But we want to
232 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
232 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
233 if not repo.ui.configbool('experimental', 'web.apiserver'):
233 if not repo.ui.configbool('experimental', 'web.apiserver'):
234 res.status = b'404 Not Found'
234 res.status = b'404 Not Found'
235 res.headers[b'Content-Type'] = b'text/plain'
235 res.headers[b'Content-Type'] = b'text/plain'
236 res.setbodybytes(_('Experimental API server endpoint not enabled'))
236 res.setbodybytes(_('Experimental API server endpoint not enabled'))
237 return
237 return
238
238
239 # The URL space is /api/<protocol>/*. The structure of URLs under varies
239 # The URL space is /api/<protocol>/*. The structure of URLs under varies
240 # by <protocol>.
240 # by <protocol>.
241
241
242 # Registered APIs are made available via config options of the name of
242 # Registered APIs are made available via config options of the name of
243 # the protocol.
243 # the protocol.
244 availableapis = set()
244 availableapis = set()
245 for k, v in API_HANDLERS.items():
245 for k, v in API_HANDLERS.items():
246 section, option = v['config']
246 section, option = v['config']
247 if repo.ui.configbool(section, option):
247 if repo.ui.configbool(section, option):
248 availableapis.add(k)
248 availableapis.add(k)
249
249
250 # Requests to /api/ list available APIs.
250 # Requests to /api/ list available APIs.
251 if req.dispatchparts == [b'api']:
251 if req.dispatchparts == [b'api']:
252 res.status = b'200 OK'
252 res.status = b'200 OK'
253 res.headers[b'Content-Type'] = b'text/plain'
253 res.headers[b'Content-Type'] = b'text/plain'
254 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
254 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
255 'one of the following:\n')]
255 'one of the following:\n')]
256 if availableapis:
256 if availableapis:
257 lines.extend(sorted(availableapis))
257 lines.extend(sorted(availableapis))
258 else:
258 else:
259 lines.append(_('(no available APIs)\n'))
259 lines.append(_('(no available APIs)\n'))
260 res.setbodybytes(b'\n'.join(lines))
260 res.setbodybytes(b'\n'.join(lines))
261 return
261 return
262
262
263 proto = req.dispatchparts[1]
263 proto = req.dispatchparts[1]
264
264
265 if proto not in API_HANDLERS:
265 if proto not in API_HANDLERS:
266 res.status = b'404 Not Found'
266 res.status = b'404 Not Found'
267 res.headers[b'Content-Type'] = b'text/plain'
267 res.headers[b'Content-Type'] = b'text/plain'
268 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
268 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
269 proto, b', '.join(sorted(availableapis))))
269 proto, b', '.join(sorted(availableapis))))
270 return
270 return
271
271
272 if proto not in availableapis:
272 if proto not in availableapis:
273 res.status = b'404 Not Found'
273 res.status = b'404 Not Found'
274 res.headers[b'Content-Type'] = b'text/plain'
274 res.headers[b'Content-Type'] = b'text/plain'
275 res.setbodybytes(_('API %s not enabled\n') % proto)
275 res.setbodybytes(_('API %s not enabled\n') % proto)
276 return
276 return
277
277
278 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
278 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
279 req.dispatchparts[2:])
279 req.dispatchparts[2:])
280
280
281 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
281 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
282 from .hgweb import common as hgwebcommon
282 from .hgweb import common as hgwebcommon
283
283
284 # URL space looks like: <permissions>/<command>, where <permission> can
284 # URL space looks like: <permissions>/<command>, where <permission> can
285 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
285 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
286
286
287 # Root URL does nothing meaningful... yet.
287 # Root URL does nothing meaningful... yet.
288 if not urlparts:
288 if not urlparts:
289 res.status = b'200 OK'
289 res.status = b'200 OK'
290 res.headers[b'Content-Type'] = b'text/plain'
290 res.headers[b'Content-Type'] = b'text/plain'
291 res.setbodybytes(_('HTTP version 2 API handler'))
291 res.setbodybytes(_('HTTP version 2 API handler'))
292 return
292 return
293
293
294 if len(urlparts) == 1:
294 if len(urlparts) == 1:
295 res.status = b'404 Not Found'
295 res.status = b'404 Not Found'
296 res.headers[b'Content-Type'] = b'text/plain'
296 res.headers[b'Content-Type'] = b'text/plain'
297 res.setbodybytes(_('do not know how to process %s\n') %
297 res.setbodybytes(_('do not know how to process %s\n') %
298 req.dispatchpath)
298 req.dispatchpath)
299 return
299 return
300
300
301 permission, command = urlparts[0:2]
301 permission, command = urlparts[0:2]
302
302
303 if permission not in (b'ro', b'rw'):
303 if permission not in (b'ro', b'rw'):
304 res.status = b'404 Not Found'
304 res.status = b'404 Not Found'
305 res.headers[b'Content-Type'] = b'text/plain'
305 res.headers[b'Content-Type'] = b'text/plain'
306 res.setbodybytes(_('unknown permission: %s') % permission)
306 res.setbodybytes(_('unknown permission: %s') % permission)
307 return
307 return
308
308
309 if req.method != 'POST':
309 if req.method != 'POST':
310 res.status = b'405 Method Not Allowed'
310 res.status = b'405 Method Not Allowed'
311 res.headers[b'Allow'] = b'POST'
311 res.headers[b'Allow'] = b'POST'
312 res.setbodybytes(_('commands require POST requests'))
312 res.setbodybytes(_('commands require POST requests'))
313 return
313 return
314
314
315 # At some point we'll want to use our own API instead of recycling the
315 # At some point we'll want to use our own API instead of recycling the
316 # behavior of version 1 of the wire protocol...
316 # behavior of version 1 of the wire protocol...
317 # TODO return reasonable responses - not responses that overload the
317 # TODO return reasonable responses - not responses that overload the
318 # HTTP status line message for error reporting.
318 # HTTP status line message for error reporting.
319 try:
319 try:
320 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
320 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
321 except hgwebcommon.ErrorResponse as e:
321 except hgwebcommon.ErrorResponse as e:
322 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
322 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
323 for k, v in e.headers:
323 for k, v in e.headers:
324 res.headers[k] = v
324 res.headers[k] = v
325 res.setbodybytes('permission denied')
325 res.setbodybytes('permission denied')
326 return
326 return
327
327
328 # We have a special endpoint to reflect the request back at the client.
328 # We have a special endpoint to reflect the request back at the client.
329 if command == b'debugreflect':
329 if command == b'debugreflect':
330 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
330 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
331 return
331 return
332
332
333 # Extra commands that we handle that aren't really wire protocol
333 # Extra commands that we handle that aren't really wire protocol
334 # commands. Think extra hard before making this hackery available to
334 # commands. Think extra hard before making this hackery available to
335 # extension.
335 # extension.
336 extracommands = {'multirequest'}
336 extracommands = {'multirequest'}
337
337
338 if command not in wireproto.commands and command not in extracommands:
338 if command not in wireproto.commands and command not in extracommands:
339 res.status = b'404 Not Found'
339 res.status = b'404 Not Found'
340 res.headers[b'Content-Type'] = b'text/plain'
340 res.headers[b'Content-Type'] = b'text/plain'
341 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
341 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
342 return
342 return
343
343
344 repo = rctx.repo
344 repo = rctx.repo
345 ui = repo.ui
345 ui = repo.ui
346
346
347 proto = httpv2protocolhandler(req, ui)
347 proto = httpv2protocolhandler(req, ui)
348
348
349 if (not wireproto.commands.commandavailable(command, proto)
349 if (not wireproto.commands.commandavailable(command, proto)
350 and command not in extracommands):
350 and command not in extracommands):
351 res.status = b'404 Not Found'
351 res.status = b'404 Not Found'
352 res.headers[b'Content-Type'] = b'text/plain'
352 res.headers[b'Content-Type'] = b'text/plain'
353 res.setbodybytes(_('invalid wire protocol command: %s') % command)
353 res.setbodybytes(_('invalid wire protocol command: %s') % command)
354 return
354 return
355
355
356 # TODO consider cases where proxies may add additional Accept headers.
356 # TODO consider cases where proxies may add additional Accept headers.
357 if req.headers.get(b'Accept') != FRAMINGTYPE:
357 if req.headers.get(b'Accept') != FRAMINGTYPE:
358 res.status = b'406 Not Acceptable'
358 res.status = b'406 Not Acceptable'
359 res.headers[b'Content-Type'] = b'text/plain'
359 res.headers[b'Content-Type'] = b'text/plain'
360 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
360 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
361 % FRAMINGTYPE)
361 % FRAMINGTYPE)
362 return
362 return
363
363
364 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
364 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
365 res.status = b'415 Unsupported Media Type'
365 res.status = b'415 Unsupported Media Type'
366 # TODO we should send a response with appropriate media type,
366 # TODO we should send a response with appropriate media type,
367 # since client does Accept it.
367 # since client does Accept it.
368 res.headers[b'Content-Type'] = b'text/plain'
368 res.headers[b'Content-Type'] = b'text/plain'
369 res.setbodybytes(_('client MUST send Content-Type header with '
369 res.setbodybytes(_('client MUST send Content-Type header with '
370 'value: %s\n') % FRAMINGTYPE)
370 'value: %s\n') % FRAMINGTYPE)
371 return
371 return
372
372
373 _processhttpv2request(ui, repo, req, res, permission, command, proto)
373 _processhttpv2request(ui, repo, req, res, permission, command, proto)
374
374
375 def _processhttpv2reflectrequest(ui, repo, req, res):
375 def _processhttpv2reflectrequest(ui, repo, req, res):
376 """Reads unified frame protocol request and dumps out state to client.
376 """Reads unified frame protocol request and dumps out state to client.
377
377
378 This special endpoint can be used to help debug the wire protocol.
378 This special endpoint can be used to help debug the wire protocol.
379
379
380 Instead of routing the request through the normal dispatch mechanism,
380 Instead of routing the request through the normal dispatch mechanism,
381 we instead read all frames, decode them, and feed them into our state
381 we instead read all frames, decode them, and feed them into our state
382 tracker. We then dump the log of all that activity back out to the
382 tracker. We then dump the log of all that activity back out to the
383 client.
383 client.
384 """
384 """
385 import json
385 import json
386
386
387 # Reflection APIs have a history of being abused, accidentally disclosing
387 # Reflection APIs have a history of being abused, accidentally disclosing
388 # sensitive data, etc. So we have a config knob.
388 # sensitive data, etc. So we have a config knob.
389 if not ui.configbool('experimental', 'web.api.debugreflect'):
389 if not ui.configbool('experimental', 'web.api.debugreflect'):
390 res.status = b'404 Not Found'
390 res.status = b'404 Not Found'
391 res.headers[b'Content-Type'] = b'text/plain'
391 res.headers[b'Content-Type'] = b'text/plain'
392 res.setbodybytes(_('debugreflect service not available'))
392 res.setbodybytes(_('debugreflect service not available'))
393 return
393 return
394
394
395 # We assume we have a unified framing protocol request body.
395 # We assume we have a unified framing protocol request body.
396
396
397 reactor = wireprotoframing.serverreactor()
397 reactor = wireprotoframing.serverreactor()
398 states = []
398 states = []
399
399
400 while True:
400 while True:
401 frame = wireprotoframing.readframe(req.bodyfh)
401 frame = wireprotoframing.readframe(req.bodyfh)
402
402
403 if not frame:
403 if not frame:
404 states.append(b'received: <no frame>')
404 states.append(b'received: <no frame>')
405 break
405 break
406
406
407 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
407 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
408 frame.requestid,
408 frame.requestid,
409 frame.payload))
409 frame.payload))
410
410
411 action, meta = reactor.onframerecv(frame)
411 action, meta = reactor.onframerecv(frame)
412 states.append(json.dumps((action, meta), sort_keys=True,
412 states.append(json.dumps((action, meta), sort_keys=True,
413 separators=(', ', ': ')))
413 separators=(', ', ': ')))
414
414
415 action, meta = reactor.oninputeof()
415 action, meta = reactor.oninputeof()
416 meta['action'] = action
416 meta['action'] = action
417 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
417 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
418
418
419 res.status = b'200 OK'
419 res.status = b'200 OK'
420 res.headers[b'Content-Type'] = b'text/plain'
420 res.headers[b'Content-Type'] = b'text/plain'
421 res.setbodybytes(b'\n'.join(states))
421 res.setbodybytes(b'\n'.join(states))
422
422
423 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
423 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
424 """Post-validation handler for HTTPv2 requests.
424 """Post-validation handler for HTTPv2 requests.
425
425
426 Called when the HTTP request contains unified frame-based protocol
426 Called when the HTTP request contains unified frame-based protocol
427 frames for evaluation.
427 frames for evaluation.
428 """
428 """
429 # TODO Some HTTP clients are full duplex and can receive data before
429 # TODO Some HTTP clients are full duplex and can receive data before
430 # the entire request is transmitted. Figure out a way to indicate support
430 # the entire request is transmitted. Figure out a way to indicate support
431 # for that so we can opt into full duplex mode.
431 # for that so we can opt into full duplex mode.
432 reactor = wireprotoframing.serverreactor(deferoutput=True)
432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 seencommand = False
433 seencommand = False
434
434
435 while True:
435 while True:
436 frame = wireprotoframing.readframe(req.bodyfh)
436 frame = wireprotoframing.readframe(req.bodyfh)
437 if not frame:
437 if not frame:
438 break
438 break
439
439
440 action, meta = reactor.onframerecv(frame)
440 action, meta = reactor.onframerecv(frame)
441
441
442 if action == 'wantframe':
442 if action == 'wantframe':
443 # Need more data before we can do anything.
443 # Need more data before we can do anything.
444 continue
444 continue
445 elif action == 'runcommand':
445 elif action == 'runcommand':
446 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
446 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
447 reqcommand, reactor, meta,
447 reqcommand, reactor, meta,
448 issubsequent=seencommand)
448 issubsequent=seencommand)
449
449
450 if sentoutput:
450 if sentoutput:
451 return
451 return
452
452
453 seencommand = True
453 seencommand = True
454
454
455 elif action == 'error':
455 elif action == 'error':
456 # TODO define proper error mechanism.
456 # TODO define proper error mechanism.
457 res.status = b'200 OK'
457 res.status = b'200 OK'
458 res.headers[b'Content-Type'] = b'text/plain'
458 res.headers[b'Content-Type'] = b'text/plain'
459 res.setbodybytes(meta['message'] + b'\n')
459 res.setbodybytes(meta['message'] + b'\n')
460 return
460 return
461 else:
461 else:
462 raise error.ProgrammingError(
462 raise error.ProgrammingError(
463 'unhandled action from frame processor: %s' % action)
463 'unhandled action from frame processor: %s' % action)
464
464
465 action, meta = reactor.oninputeof()
465 action, meta = reactor.oninputeof()
466 if action == 'sendframes':
466 if action == 'sendframes':
467 # We assume we haven't started sending the response yet. If we're
467 # We assume we haven't started sending the response yet. If we're
468 # wrong, the response type will raise an exception.
468 # wrong, the response type will raise an exception.
469 res.status = b'200 OK'
469 res.status = b'200 OK'
470 res.headers[b'Content-Type'] = FRAMINGTYPE
470 res.headers[b'Content-Type'] = FRAMINGTYPE
471 res.setbodygen(meta['framegen'])
471 res.setbodygen(meta['framegen'])
472 elif action == 'noop':
472 elif action == 'noop':
473 pass
473 pass
474 else:
474 else:
475 raise error.ProgrammingError('unhandled action from frame processor: %s'
475 raise error.ProgrammingError('unhandled action from frame processor: %s'
476 % action)
476 % action)
477
477
478 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
478 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
479 command, issubsequent):
479 command, issubsequent):
480 """Dispatch a wire protocol command made from HTTPv2 requests.
480 """Dispatch a wire protocol command made from HTTPv2 requests.
481
481
482 The authenticated permission (``authedperm``) along with the original
482 The authenticated permission (``authedperm``) along with the original
483 command from the URL (``reqcommand``) are passed in.
483 command from the URL (``reqcommand``) are passed in.
484 """
484 """
485 # We already validated that the session has permissions to perform the
485 # We already validated that the session has permissions to perform the
486 # actions in ``authedperm``. In the unified frame protocol, the canonical
486 # actions in ``authedperm``. In the unified frame protocol, the canonical
487 # command to run is expressed in a frame. However, the URL also requested
487 # command to run is expressed in a frame. However, the URL also requested
488 # to run a specific command. We need to be careful that the command we
488 # to run a specific command. We need to be careful that the command we
489 # run doesn't have permissions requirements greater than what was granted
489 # run doesn't have permissions requirements greater than what was granted
490 # by ``authedperm``.
490 # by ``authedperm``.
491 #
491 #
492 # Our rule for this is we only allow one command per HTTP request and
492 # Our rule for this is we only allow one command per HTTP request and
493 # that command must match the command in the URL. However, we make
493 # that command must match the command in the URL. However, we make
494 # an exception for the ``multirequest`` URL. This URL is allowed to
494 # an exception for the ``multirequest`` URL. This URL is allowed to
495 # execute multiple commands. We double check permissions of each command
495 # execute multiple commands. We double check permissions of each command
496 # as it is invoked to ensure there is no privilege escalation.
496 # as it is invoked to ensure there is no privilege escalation.
497 # TODO consider allowing multiple commands to regular command URLs
497 # TODO consider allowing multiple commands to regular command URLs
498 # iff each command is the same.
498 # iff each command is the same.
499
499
500 proto = httpv2protocolhandler(req, ui, args=command['args'])
500 proto = httpv2protocolhandler(req, ui, args=command['args'])
501
501
502 if reqcommand == b'multirequest':
502 if reqcommand == b'multirequest':
503 if not wireproto.commands.commandavailable(command['command'], proto):
503 if not wireproto.commands.commandavailable(command['command'], proto):
504 # TODO proper error mechanism
504 # TODO proper error mechanism
505 res.status = b'200 OK'
505 res.status = b'200 OK'
506 res.headers[b'Content-Type'] = b'text/plain'
506 res.headers[b'Content-Type'] = b'text/plain'
507 res.setbodybytes(_('wire protocol command not available: %s') %
507 res.setbodybytes(_('wire protocol command not available: %s') %
508 command['command'])
508 command['command'])
509 return True
509 return True
510
510
511 # TODO don't use assert here, since it may be elided by -O.
511 # TODO don't use assert here, since it may be elided by -O.
512 assert authedperm in (b'ro', b'rw')
512 assert authedperm in (b'ro', b'rw')
513 wirecommand = wireproto.commands[command['command']]
513 wirecommand = wireproto.commands[command['command']]
514 assert wirecommand.permission in ('push', 'pull')
514 assert wirecommand.permission in ('push', 'pull')
515
515
516 if authedperm == b'ro' and wirecommand.permission != 'pull':
516 if authedperm == b'ro' and wirecommand.permission != 'pull':
517 # TODO proper error mechanism
517 # TODO proper error mechanism
518 res.status = b'403 Forbidden'
518 res.status = b'403 Forbidden'
519 res.headers[b'Content-Type'] = b'text/plain'
519 res.headers[b'Content-Type'] = b'text/plain'
520 res.setbodybytes(_('insufficient permissions to execute '
520 res.setbodybytes(_('insufficient permissions to execute '
521 'command: %s') % command['command'])
521 'command: %s') % command['command'])
522 return True
522 return True
523
523
524 # TODO should we also call checkperm() here? Maybe not if we're going
524 # TODO should we also call checkperm() here? Maybe not if we're going
525 # to overhaul that API. The granted scope from the URL check should
525 # to overhaul that API. The granted scope from the URL check should
526 # be good enough.
526 # be good enough.
527
527
528 else:
528 else:
529 # Don't allow multiple commands outside of ``multirequest`` URL.
529 # Don't allow multiple commands outside of ``multirequest`` URL.
530 if issubsequent:
530 if issubsequent:
531 # TODO proper error mechanism
531 # TODO proper error mechanism
532 res.status = b'200 OK'
532 res.status = b'200 OK'
533 res.headers[b'Content-Type'] = b'text/plain'
533 res.headers[b'Content-Type'] = b'text/plain'
534 res.setbodybytes(_('multiple commands cannot be issued to this '
534 res.setbodybytes(_('multiple commands cannot be issued to this '
535 'URL'))
535 'URL'))
536 return True
536 return True
537
537
538 if reqcommand != command['command']:
538 if reqcommand != command['command']:
539 # TODO define proper error mechanism
539 # TODO define proper error mechanism
540 res.status = b'200 OK'
540 res.status = b'200 OK'
541 res.headers[b'Content-Type'] = b'text/plain'
541 res.headers[b'Content-Type'] = b'text/plain'
542 res.setbodybytes(_('command in frame must match command in URL'))
542 res.setbodybytes(_('command in frame must match command in URL'))
543 return True
543 return True
544
544
545 rsp = wireproto.dispatch(repo, proto, command['command'])
545 rsp = wireproto.dispatch(repo, proto, command['command'])
546
546
547 res.status = b'200 OK'
547 res.status = b'200 OK'
548 res.headers[b'Content-Type'] = FRAMINGTYPE
548 res.headers[b'Content-Type'] = FRAMINGTYPE
549 stream = wireprotoframing.stream()
549
550
550 if isinstance(rsp, wireprototypes.bytesresponse):
551 if isinstance(rsp, wireprototypes.bytesresponse):
551 action, meta = reactor.onbytesresponseready(command['requestid'],
552 action, meta = reactor.onbytesresponseready(stream,
553 command['requestid'],
552 rsp.data)
554 rsp.data)
553 else:
555 else:
554 action, meta = reactor.onapplicationerror(
556 action, meta = reactor.onapplicationerror(
555 _('unhandled response type from wire proto command'))
557 _('unhandled response type from wire proto command'))
556
558
557 if action == 'sendframes':
559 if action == 'sendframes':
558 res.setbodygen(meta['framegen'])
560 res.setbodygen(meta['framegen'])
559 return True
561 return True
560 elif action == 'noop':
562 elif action == 'noop':
561 return False
563 return False
562 else:
564 else:
563 raise error.ProgrammingError('unhandled event from reactor: %s' %
565 raise error.ProgrammingError('unhandled event from reactor: %s' %
564 action)
566 action)
565
567
566 # Maps API name to metadata so custom API can be registered.
568 # Maps API name to metadata so custom API can be registered.
567 API_HANDLERS = {
569 API_HANDLERS = {
568 HTTPV2: {
570 HTTPV2: {
569 'config': ('experimental', 'web.api.http-v2'),
571 'config': ('experimental', 'web.api.http-v2'),
570 'handler': _handlehttpv2request,
572 'handler': _handlehttpv2request,
571 },
573 },
572 }
574 }
573
575
574 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
576 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
575 def __init__(self, req, ui, args=None):
577 def __init__(self, req, ui, args=None):
576 self._req = req
578 self._req = req
577 self._ui = ui
579 self._ui = ui
578 self._args = args
580 self._args = args
579
581
580 @property
582 @property
581 def name(self):
583 def name(self):
582 return HTTPV2
584 return HTTPV2
583
585
584 def getargs(self, args):
586 def getargs(self, args):
585 data = {}
587 data = {}
586 for k in args.split():
588 for k in args.split():
587 if k == '*':
589 if k == '*':
588 raise NotImplementedError('do not support * args')
590 raise NotImplementedError('do not support * args')
589 else:
591 else:
590 data[k] = self._args[k]
592 data[k] = self._args[k]
591
593
592 return [data[k] for k in args.split()]
594 return [data[k] for k in args.split()]
593
595
594 def forwardpayload(self, fp):
596 def forwardpayload(self, fp):
595 raise NotImplementedError
597 raise NotImplementedError
596
598
597 @contextlib.contextmanager
599 @contextlib.contextmanager
598 def mayberedirectstdio(self):
600 def mayberedirectstdio(self):
599 raise NotImplementedError
601 raise NotImplementedError
600
602
601 def client(self):
603 def client(self):
602 raise NotImplementedError
604 raise NotImplementedError
603
605
604 def addcapabilities(self, repo, caps):
606 def addcapabilities(self, repo, caps):
605 return caps
607 return caps
606
608
607 def checkperm(self, perm):
609 def checkperm(self, perm):
608 raise NotImplementedError
610 raise NotImplementedError
609
611
610 def _httpresponsetype(ui, req, prefer_uncompressed):
612 def _httpresponsetype(ui, req, prefer_uncompressed):
611 """Determine the appropriate response type and compression settings.
613 """Determine the appropriate response type and compression settings.
612
614
613 Returns a tuple of (mediatype, compengine, engineopts).
615 Returns a tuple of (mediatype, compengine, engineopts).
614 """
616 """
615 # Determine the response media type and compression engine based
617 # Determine the response media type and compression engine based
616 # on the request parameters.
618 # on the request parameters.
617 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
619 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
618
620
619 if '0.2' in protocaps:
621 if '0.2' in protocaps:
620 # All clients are expected to support uncompressed data.
622 # All clients are expected to support uncompressed data.
621 if prefer_uncompressed:
623 if prefer_uncompressed:
622 return HGTYPE2, util._noopengine(), {}
624 return HGTYPE2, util._noopengine(), {}
623
625
624 # Default as defined by wire protocol spec.
626 # Default as defined by wire protocol spec.
625 compformats = ['zlib', 'none']
627 compformats = ['zlib', 'none']
626 for cap in protocaps:
628 for cap in protocaps:
627 if cap.startswith('comp='):
629 if cap.startswith('comp='):
628 compformats = cap[5:].split(',')
630 compformats = cap[5:].split(',')
629 break
631 break
630
632
631 # Now find an agreed upon compression format.
633 # Now find an agreed upon compression format.
632 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
634 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
633 if engine.wireprotosupport().name in compformats:
635 if engine.wireprotosupport().name in compformats:
634 opts = {}
636 opts = {}
635 level = ui.configint('server', '%slevel' % engine.name())
637 level = ui.configint('server', '%slevel' % engine.name())
636 if level is not None:
638 if level is not None:
637 opts['level'] = level
639 opts['level'] = level
638
640
639 return HGTYPE2, engine, opts
641 return HGTYPE2, engine, opts
640
642
641 # No mutually supported compression format. Fall back to the
643 # No mutually supported compression format. Fall back to the
642 # legacy protocol.
644 # legacy protocol.
643
645
644 # Don't allow untrusted settings because disabling compression or
646 # Don't allow untrusted settings because disabling compression or
645 # setting a very high compression level could lead to flooding
647 # setting a very high compression level could lead to flooding
646 # the server's network or CPU.
648 # the server's network or CPU.
647 opts = {'level': ui.configint('server', 'zliblevel')}
649 opts = {'level': ui.configint('server', 'zliblevel')}
648 return HGTYPE, util.compengines['zlib'], opts
650 return HGTYPE, util.compengines['zlib'], opts
649
651
650 def _callhttp(repo, req, res, proto, cmd):
652 def _callhttp(repo, req, res, proto, cmd):
651 # Avoid cycle involving hg module.
653 # Avoid cycle involving hg module.
652 from .hgweb import common as hgwebcommon
654 from .hgweb import common as hgwebcommon
653
655
654 def genversion2(gen, engine, engineopts):
656 def genversion2(gen, engine, engineopts):
655 # application/mercurial-0.2 always sends a payload header
657 # application/mercurial-0.2 always sends a payload header
656 # identifying the compression engine.
658 # identifying the compression engine.
657 name = engine.wireprotosupport().name
659 name = engine.wireprotosupport().name
658 assert 0 < len(name) < 256
660 assert 0 < len(name) < 256
659 yield struct.pack('B', len(name))
661 yield struct.pack('B', len(name))
660 yield name
662 yield name
661
663
662 for chunk in gen:
664 for chunk in gen:
663 yield chunk
665 yield chunk
664
666
665 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
667 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
666 if code == HTTP_OK:
668 if code == HTTP_OK:
667 res.status = '200 Script output follows'
669 res.status = '200 Script output follows'
668 else:
670 else:
669 res.status = hgwebcommon.statusmessage(code)
671 res.status = hgwebcommon.statusmessage(code)
670
672
671 res.headers['Content-Type'] = contenttype
673 res.headers['Content-Type'] = contenttype
672
674
673 if bodybytes is not None:
675 if bodybytes is not None:
674 res.setbodybytes(bodybytes)
676 res.setbodybytes(bodybytes)
675 if bodygen is not None:
677 if bodygen is not None:
676 res.setbodygen(bodygen)
678 res.setbodygen(bodygen)
677
679
678 if not wireproto.commands.commandavailable(cmd, proto):
680 if not wireproto.commands.commandavailable(cmd, proto):
679 setresponse(HTTP_OK, HGERRTYPE,
681 setresponse(HTTP_OK, HGERRTYPE,
680 _('requested wire protocol command is not available over '
682 _('requested wire protocol command is not available over '
681 'HTTP'))
683 'HTTP'))
682 return
684 return
683
685
684 proto.checkperm(wireproto.commands[cmd].permission)
686 proto.checkperm(wireproto.commands[cmd].permission)
685
687
686 rsp = wireproto.dispatch(repo, proto, cmd)
688 rsp = wireproto.dispatch(repo, proto, cmd)
687
689
688 if isinstance(rsp, bytes):
690 if isinstance(rsp, bytes):
689 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
691 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
690 elif isinstance(rsp, wireprototypes.bytesresponse):
692 elif isinstance(rsp, wireprototypes.bytesresponse):
691 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
693 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
692 elif isinstance(rsp, wireprototypes.streamreslegacy):
694 elif isinstance(rsp, wireprototypes.streamreslegacy):
693 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
695 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
694 elif isinstance(rsp, wireprototypes.streamres):
696 elif isinstance(rsp, wireprototypes.streamres):
695 gen = rsp.gen
697 gen = rsp.gen
696
698
697 # This code for compression should not be streamres specific. It
699 # This code for compression should not be streamres specific. It
698 # is here because we only compress streamres at the moment.
700 # is here because we only compress streamres at the moment.
699 mediatype, engine, engineopts = _httpresponsetype(
701 mediatype, engine, engineopts = _httpresponsetype(
700 repo.ui, req, rsp.prefer_uncompressed)
702 repo.ui, req, rsp.prefer_uncompressed)
701 gen = engine.compressstream(gen, engineopts)
703 gen = engine.compressstream(gen, engineopts)
702
704
703 if mediatype == HGTYPE2:
705 if mediatype == HGTYPE2:
704 gen = genversion2(gen, engine, engineopts)
706 gen = genversion2(gen, engine, engineopts)
705
707
706 setresponse(HTTP_OK, mediatype, bodygen=gen)
708 setresponse(HTTP_OK, mediatype, bodygen=gen)
707 elif isinstance(rsp, wireprototypes.pushres):
709 elif isinstance(rsp, wireprototypes.pushres):
708 rsp = '%d\n%s' % (rsp.res, rsp.output)
710 rsp = '%d\n%s' % (rsp.res, rsp.output)
709 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
711 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
710 elif isinstance(rsp, wireprototypes.pusherr):
712 elif isinstance(rsp, wireprototypes.pusherr):
711 rsp = '0\n%s\n' % rsp.res
713 rsp = '0\n%s\n' % rsp.res
712 res.drain = True
714 res.drain = True
713 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
715 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
714 elif isinstance(rsp, wireprototypes.ooberror):
716 elif isinstance(rsp, wireprototypes.ooberror):
715 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
717 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
716 else:
718 else:
717 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
719 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
718
720
719 def _sshv1respondbytes(fout, value):
721 def _sshv1respondbytes(fout, value):
720 """Send a bytes response for protocol version 1."""
722 """Send a bytes response for protocol version 1."""
721 fout.write('%d\n' % len(value))
723 fout.write('%d\n' % len(value))
722 fout.write(value)
724 fout.write(value)
723 fout.flush()
725 fout.flush()
724
726
725 def _sshv1respondstream(fout, source):
727 def _sshv1respondstream(fout, source):
726 write = fout.write
728 write = fout.write
727 for chunk in source.gen:
729 for chunk in source.gen:
728 write(chunk)
730 write(chunk)
729 fout.flush()
731 fout.flush()
730
732
731 def _sshv1respondooberror(fout, ferr, rsp):
733 def _sshv1respondooberror(fout, ferr, rsp):
732 ferr.write(b'%s\n-\n' % rsp)
734 ferr.write(b'%s\n-\n' % rsp)
733 ferr.flush()
735 ferr.flush()
734 fout.write(b'\n')
736 fout.write(b'\n')
735 fout.flush()
737 fout.flush()
736
738
737 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
739 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
738 """Handler for requests services via version 1 of SSH protocol."""
740 """Handler for requests services via version 1 of SSH protocol."""
739 def __init__(self, ui, fin, fout):
741 def __init__(self, ui, fin, fout):
740 self._ui = ui
742 self._ui = ui
741 self._fin = fin
743 self._fin = fin
742 self._fout = fout
744 self._fout = fout
743
745
744 @property
746 @property
745 def name(self):
747 def name(self):
746 return wireprototypes.SSHV1
748 return wireprototypes.SSHV1
747
749
748 def getargs(self, args):
750 def getargs(self, args):
749 data = {}
751 data = {}
750 keys = args.split()
752 keys = args.split()
751 for n in xrange(len(keys)):
753 for n in xrange(len(keys)):
752 argline = self._fin.readline()[:-1]
754 argline = self._fin.readline()[:-1]
753 arg, l = argline.split()
755 arg, l = argline.split()
754 if arg not in keys:
756 if arg not in keys:
755 raise error.Abort(_("unexpected parameter %r") % arg)
757 raise error.Abort(_("unexpected parameter %r") % arg)
756 if arg == '*':
758 if arg == '*':
757 star = {}
759 star = {}
758 for k in xrange(int(l)):
760 for k in xrange(int(l)):
759 argline = self._fin.readline()[:-1]
761 argline = self._fin.readline()[:-1]
760 arg, l = argline.split()
762 arg, l = argline.split()
761 val = self._fin.read(int(l))
763 val = self._fin.read(int(l))
762 star[arg] = val
764 star[arg] = val
763 data['*'] = star
765 data['*'] = star
764 else:
766 else:
765 val = self._fin.read(int(l))
767 val = self._fin.read(int(l))
766 data[arg] = val
768 data[arg] = val
767 return [data[k] for k in keys]
769 return [data[k] for k in keys]
768
770
769 def forwardpayload(self, fpout):
771 def forwardpayload(self, fpout):
770 # We initially send an empty response. This tells the client it is
772 # We initially send an empty response. This tells the client it is
771 # OK to start sending data. If a client sees any other response, it
773 # OK to start sending data. If a client sees any other response, it
772 # interprets it as an error.
774 # interprets it as an error.
773 _sshv1respondbytes(self._fout, b'')
775 _sshv1respondbytes(self._fout, b'')
774
776
775 # The file is in the form:
777 # The file is in the form:
776 #
778 #
777 # <chunk size>\n<chunk>
779 # <chunk size>\n<chunk>
778 # ...
780 # ...
779 # 0\n
781 # 0\n
780 count = int(self._fin.readline())
782 count = int(self._fin.readline())
781 while count:
783 while count:
782 fpout.write(self._fin.read(count))
784 fpout.write(self._fin.read(count))
783 count = int(self._fin.readline())
785 count = int(self._fin.readline())
784
786
785 @contextlib.contextmanager
787 @contextlib.contextmanager
786 def mayberedirectstdio(self):
788 def mayberedirectstdio(self):
787 yield None
789 yield None
788
790
789 def client(self):
791 def client(self):
790 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
792 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
791 return 'remote:ssh:' + client
793 return 'remote:ssh:' + client
792
794
793 def addcapabilities(self, repo, caps):
795 def addcapabilities(self, repo, caps):
794 caps.append(b'batch')
796 caps.append(b'batch')
795 return caps
797 return caps
796
798
797 def checkperm(self, perm):
799 def checkperm(self, perm):
798 pass
800 pass
799
801
800 class sshv2protocolhandler(sshv1protocolhandler):
802 class sshv2protocolhandler(sshv1protocolhandler):
801 """Protocol handler for version 2 of the SSH protocol."""
803 """Protocol handler for version 2 of the SSH protocol."""
802
804
803 @property
805 @property
804 def name(self):
806 def name(self):
805 return wireprototypes.SSHV2
807 return wireprototypes.SSHV2
806
808
807 def _runsshserver(ui, repo, fin, fout, ev):
809 def _runsshserver(ui, repo, fin, fout, ev):
808 # This function operates like a state machine of sorts. The following
810 # This function operates like a state machine of sorts. The following
809 # states are defined:
811 # states are defined:
810 #
812 #
811 # protov1-serving
813 # protov1-serving
812 # Server is in protocol version 1 serving mode. Commands arrive on
814 # Server is in protocol version 1 serving mode. Commands arrive on
813 # new lines. These commands are processed in this state, one command
815 # new lines. These commands are processed in this state, one command
814 # after the other.
816 # after the other.
815 #
817 #
816 # protov2-serving
818 # protov2-serving
817 # Server is in protocol version 2 serving mode.
819 # Server is in protocol version 2 serving mode.
818 #
820 #
819 # upgrade-initial
821 # upgrade-initial
820 # The server is going to process an upgrade request.
822 # The server is going to process an upgrade request.
821 #
823 #
822 # upgrade-v2-filter-legacy-handshake
824 # upgrade-v2-filter-legacy-handshake
823 # The protocol is being upgraded to version 2. The server is expecting
825 # The protocol is being upgraded to version 2. The server is expecting
824 # the legacy handshake from version 1.
826 # the legacy handshake from version 1.
825 #
827 #
826 # upgrade-v2-finish
828 # upgrade-v2-finish
827 # The upgrade to version 2 of the protocol is imminent.
829 # The upgrade to version 2 of the protocol is imminent.
828 #
830 #
829 # shutdown
831 # shutdown
830 # The server is shutting down, possibly in reaction to a client event.
832 # The server is shutting down, possibly in reaction to a client event.
831 #
833 #
832 # And here are their transitions:
834 # And here are their transitions:
833 #
835 #
834 # protov1-serving -> shutdown
836 # protov1-serving -> shutdown
835 # When server receives an empty request or encounters another
837 # When server receives an empty request or encounters another
836 # error.
838 # error.
837 #
839 #
838 # protov1-serving -> upgrade-initial
840 # protov1-serving -> upgrade-initial
839 # An upgrade request line was seen.
841 # An upgrade request line was seen.
840 #
842 #
841 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
843 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
842 # Upgrade to version 2 in progress. Server is expecting to
844 # Upgrade to version 2 in progress. Server is expecting to
843 # process a legacy handshake.
845 # process a legacy handshake.
844 #
846 #
845 # upgrade-v2-filter-legacy-handshake -> shutdown
847 # upgrade-v2-filter-legacy-handshake -> shutdown
846 # Client did not fulfill upgrade handshake requirements.
848 # Client did not fulfill upgrade handshake requirements.
847 #
849 #
848 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
850 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
849 # Client fulfilled version 2 upgrade requirements. Finishing that
851 # Client fulfilled version 2 upgrade requirements. Finishing that
850 # upgrade.
852 # upgrade.
851 #
853 #
852 # upgrade-v2-finish -> protov2-serving
854 # upgrade-v2-finish -> protov2-serving
853 # Protocol upgrade to version 2 complete. Server can now speak protocol
855 # Protocol upgrade to version 2 complete. Server can now speak protocol
854 # version 2.
856 # version 2.
855 #
857 #
856 # protov2-serving -> protov1-serving
858 # protov2-serving -> protov1-serving
857 # Ths happens by default since protocol version 2 is the same as
859 # Ths happens by default since protocol version 2 is the same as
858 # version 1 except for the handshake.
860 # version 1 except for the handshake.
859
861
860 state = 'protov1-serving'
862 state = 'protov1-serving'
861 proto = sshv1protocolhandler(ui, fin, fout)
863 proto = sshv1protocolhandler(ui, fin, fout)
862 protoswitched = False
864 protoswitched = False
863
865
864 while not ev.is_set():
866 while not ev.is_set():
865 if state == 'protov1-serving':
867 if state == 'protov1-serving':
866 # Commands are issued on new lines.
868 # Commands are issued on new lines.
867 request = fin.readline()[:-1]
869 request = fin.readline()[:-1]
868
870
869 # Empty lines signal to terminate the connection.
871 # Empty lines signal to terminate the connection.
870 if not request:
872 if not request:
871 state = 'shutdown'
873 state = 'shutdown'
872 continue
874 continue
873
875
874 # It looks like a protocol upgrade request. Transition state to
876 # It looks like a protocol upgrade request. Transition state to
875 # handle it.
877 # handle it.
876 if request.startswith(b'upgrade '):
878 if request.startswith(b'upgrade '):
877 if protoswitched:
879 if protoswitched:
878 _sshv1respondooberror(fout, ui.ferr,
880 _sshv1respondooberror(fout, ui.ferr,
879 b'cannot upgrade protocols multiple '
881 b'cannot upgrade protocols multiple '
880 b'times')
882 b'times')
881 state = 'shutdown'
883 state = 'shutdown'
882 continue
884 continue
883
885
884 state = 'upgrade-initial'
886 state = 'upgrade-initial'
885 continue
887 continue
886
888
887 available = wireproto.commands.commandavailable(request, proto)
889 available = wireproto.commands.commandavailable(request, proto)
888
890
889 # This command isn't available. Send an empty response and go
891 # This command isn't available. Send an empty response and go
890 # back to waiting for a new command.
892 # back to waiting for a new command.
891 if not available:
893 if not available:
892 _sshv1respondbytes(fout, b'')
894 _sshv1respondbytes(fout, b'')
893 continue
895 continue
894
896
895 rsp = wireproto.dispatch(repo, proto, request)
897 rsp = wireproto.dispatch(repo, proto, request)
896
898
897 if isinstance(rsp, bytes):
899 if isinstance(rsp, bytes):
898 _sshv1respondbytes(fout, rsp)
900 _sshv1respondbytes(fout, rsp)
899 elif isinstance(rsp, wireprototypes.bytesresponse):
901 elif isinstance(rsp, wireprototypes.bytesresponse):
900 _sshv1respondbytes(fout, rsp.data)
902 _sshv1respondbytes(fout, rsp.data)
901 elif isinstance(rsp, wireprototypes.streamres):
903 elif isinstance(rsp, wireprototypes.streamres):
902 _sshv1respondstream(fout, rsp)
904 _sshv1respondstream(fout, rsp)
903 elif isinstance(rsp, wireprototypes.streamreslegacy):
905 elif isinstance(rsp, wireprototypes.streamreslegacy):
904 _sshv1respondstream(fout, rsp)
906 _sshv1respondstream(fout, rsp)
905 elif isinstance(rsp, wireprototypes.pushres):
907 elif isinstance(rsp, wireprototypes.pushres):
906 _sshv1respondbytes(fout, b'')
908 _sshv1respondbytes(fout, b'')
907 _sshv1respondbytes(fout, b'%d' % rsp.res)
909 _sshv1respondbytes(fout, b'%d' % rsp.res)
908 elif isinstance(rsp, wireprototypes.pusherr):
910 elif isinstance(rsp, wireprototypes.pusherr):
909 _sshv1respondbytes(fout, rsp.res)
911 _sshv1respondbytes(fout, rsp.res)
910 elif isinstance(rsp, wireprototypes.ooberror):
912 elif isinstance(rsp, wireprototypes.ooberror):
911 _sshv1respondooberror(fout, ui.ferr, rsp.message)
913 _sshv1respondooberror(fout, ui.ferr, rsp.message)
912 else:
914 else:
913 raise error.ProgrammingError('unhandled response type from '
915 raise error.ProgrammingError('unhandled response type from '
914 'wire protocol command: %s' % rsp)
916 'wire protocol command: %s' % rsp)
915
917
916 # For now, protocol version 2 serving just goes back to version 1.
918 # For now, protocol version 2 serving just goes back to version 1.
917 elif state == 'protov2-serving':
919 elif state == 'protov2-serving':
918 state = 'protov1-serving'
920 state = 'protov1-serving'
919 continue
921 continue
920
922
921 elif state == 'upgrade-initial':
923 elif state == 'upgrade-initial':
922 # We should never transition into this state if we've switched
924 # We should never transition into this state if we've switched
923 # protocols.
925 # protocols.
924 assert not protoswitched
926 assert not protoswitched
925 assert proto.name == wireprototypes.SSHV1
927 assert proto.name == wireprototypes.SSHV1
926
928
927 # Expected: upgrade <token> <capabilities>
929 # Expected: upgrade <token> <capabilities>
928 # If we get something else, the request is malformed. It could be
930 # If we get something else, the request is malformed. It could be
929 # from a future client that has altered the upgrade line content.
931 # from a future client that has altered the upgrade line content.
930 # We treat this as an unknown command.
932 # We treat this as an unknown command.
931 try:
933 try:
932 token, caps = request.split(b' ')[1:]
934 token, caps = request.split(b' ')[1:]
933 except ValueError:
935 except ValueError:
934 _sshv1respondbytes(fout, b'')
936 _sshv1respondbytes(fout, b'')
935 state = 'protov1-serving'
937 state = 'protov1-serving'
936 continue
938 continue
937
939
938 # Send empty response if we don't support upgrading protocols.
940 # Send empty response if we don't support upgrading protocols.
939 if not ui.configbool('experimental', 'sshserver.support-v2'):
941 if not ui.configbool('experimental', 'sshserver.support-v2'):
940 _sshv1respondbytes(fout, b'')
942 _sshv1respondbytes(fout, b'')
941 state = 'protov1-serving'
943 state = 'protov1-serving'
942 continue
944 continue
943
945
944 try:
946 try:
945 caps = urlreq.parseqs(caps)
947 caps = urlreq.parseqs(caps)
946 except ValueError:
948 except ValueError:
947 _sshv1respondbytes(fout, b'')
949 _sshv1respondbytes(fout, b'')
948 state = 'protov1-serving'
950 state = 'protov1-serving'
949 continue
951 continue
950
952
951 # We don't see an upgrade request to protocol version 2. Ignore
953 # We don't see an upgrade request to protocol version 2. Ignore
952 # the upgrade request.
954 # the upgrade request.
953 wantedprotos = caps.get(b'proto', [b''])[0]
955 wantedprotos = caps.get(b'proto', [b''])[0]
954 if SSHV2 not in wantedprotos:
956 if SSHV2 not in wantedprotos:
955 _sshv1respondbytes(fout, b'')
957 _sshv1respondbytes(fout, b'')
956 state = 'protov1-serving'
958 state = 'protov1-serving'
957 continue
959 continue
958
960
959 # It looks like we can honor this upgrade request to protocol 2.
961 # It looks like we can honor this upgrade request to protocol 2.
960 # Filter the rest of the handshake protocol request lines.
962 # Filter the rest of the handshake protocol request lines.
961 state = 'upgrade-v2-filter-legacy-handshake'
963 state = 'upgrade-v2-filter-legacy-handshake'
962 continue
964 continue
963
965
964 elif state == 'upgrade-v2-filter-legacy-handshake':
966 elif state == 'upgrade-v2-filter-legacy-handshake':
965 # Client should have sent legacy handshake after an ``upgrade``
967 # Client should have sent legacy handshake after an ``upgrade``
966 # request. Expected lines:
968 # request. Expected lines:
967 #
969 #
968 # hello
970 # hello
969 # between
971 # between
970 # pairs 81
972 # pairs 81
971 # 0000...-0000...
973 # 0000...-0000...
972
974
973 ok = True
975 ok = True
974 for line in (b'hello', b'between', b'pairs 81'):
976 for line in (b'hello', b'between', b'pairs 81'):
975 request = fin.readline()[:-1]
977 request = fin.readline()[:-1]
976
978
977 if request != line:
979 if request != line:
978 _sshv1respondooberror(fout, ui.ferr,
980 _sshv1respondooberror(fout, ui.ferr,
979 b'malformed handshake protocol: '
981 b'malformed handshake protocol: '
980 b'missing %s' % line)
982 b'missing %s' % line)
981 ok = False
983 ok = False
982 state = 'shutdown'
984 state = 'shutdown'
983 break
985 break
984
986
985 if not ok:
987 if not ok:
986 continue
988 continue
987
989
988 request = fin.read(81)
990 request = fin.read(81)
989 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
991 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
990 _sshv1respondooberror(fout, ui.ferr,
992 _sshv1respondooberror(fout, ui.ferr,
991 b'malformed handshake protocol: '
993 b'malformed handshake protocol: '
992 b'missing between argument value')
994 b'missing between argument value')
993 state = 'shutdown'
995 state = 'shutdown'
994 continue
996 continue
995
997
996 state = 'upgrade-v2-finish'
998 state = 'upgrade-v2-finish'
997 continue
999 continue
998
1000
999 elif state == 'upgrade-v2-finish':
1001 elif state == 'upgrade-v2-finish':
1000 # Send the upgrade response.
1002 # Send the upgrade response.
1001 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1003 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1002 servercaps = wireproto.capabilities(repo, proto)
1004 servercaps = wireproto.capabilities(repo, proto)
1003 rsp = b'capabilities: %s' % servercaps.data
1005 rsp = b'capabilities: %s' % servercaps.data
1004 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1006 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1005 fout.flush()
1007 fout.flush()
1006
1008
1007 proto = sshv2protocolhandler(ui, fin, fout)
1009 proto = sshv2protocolhandler(ui, fin, fout)
1008 protoswitched = True
1010 protoswitched = True
1009
1011
1010 state = 'protov2-serving'
1012 state = 'protov2-serving'
1011 continue
1013 continue
1012
1014
1013 elif state == 'shutdown':
1015 elif state == 'shutdown':
1014 break
1016 break
1015
1017
1016 else:
1018 else:
1017 raise error.ProgrammingError('unhandled ssh server state: %s' %
1019 raise error.ProgrammingError('unhandled ssh server state: %s' %
1018 state)
1020 state)
1019
1021
1020 class sshserver(object):
1022 class sshserver(object):
1021 def __init__(self, ui, repo, logfh=None):
1023 def __init__(self, ui, repo, logfh=None):
1022 self._ui = ui
1024 self._ui = ui
1023 self._repo = repo
1025 self._repo = repo
1024 self._fin = ui.fin
1026 self._fin = ui.fin
1025 self._fout = ui.fout
1027 self._fout = ui.fout
1026
1028
1027 # Log write I/O to stdout and stderr if configured.
1029 # Log write I/O to stdout and stderr if configured.
1028 if logfh:
1030 if logfh:
1029 self._fout = util.makeloggingfileobject(
1031 self._fout = util.makeloggingfileobject(
1030 logfh, self._fout, 'o', logdata=True)
1032 logfh, self._fout, 'o', logdata=True)
1031 ui.ferr = util.makeloggingfileobject(
1033 ui.ferr = util.makeloggingfileobject(
1032 logfh, ui.ferr, 'e', logdata=True)
1034 logfh, ui.ferr, 'e', logdata=True)
1033
1035
1034 hook.redirect(True)
1036 hook.redirect(True)
1035 ui.fout = repo.ui.fout = ui.ferr
1037 ui.fout = repo.ui.fout = ui.ferr
1036
1038
1037 # Prevent insertion/deletion of CRs
1039 # Prevent insertion/deletion of CRs
1038 procutil.setbinary(self._fin)
1040 procutil.setbinary(self._fin)
1039 procutil.setbinary(self._fout)
1041 procutil.setbinary(self._fout)
1040
1042
1041 def serve_forever(self):
1043 def serve_forever(self):
1042 self.serveuntil(threading.Event())
1044 self.serveuntil(threading.Event())
1043 sys.exit(0)
1045 sys.exit(0)
1044
1046
1045 def serveuntil(self, ev):
1047 def serveuntil(self, ev):
1046 """Serve until a threading.Event is set."""
1048 """Serve until a threading.Event is set."""
1047 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
1049 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,639 +1,678
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 import unittest
3 import unittest
4
4
5 from mercurial import (
5 from mercurial import (
6 util,
6 util,
7 wireprotoframing as framing,
7 wireprotoframing as framing,
8 )
8 )
9
9
10 ffs = framing.makeframefromhumanstring
10 ffs = framing.makeframefromhumanstring
11
11
12 def makereactor(deferoutput=False):
12 def makereactor(deferoutput=False):
13 return framing.serverreactor(deferoutput=deferoutput)
13 return framing.serverreactor(deferoutput=deferoutput)
14
14
15 def sendframes(reactor, gen):
15 def sendframes(reactor, gen):
16 """Send a generator of frame bytearray to a reactor.
16 """Send a generator of frame bytearray to a reactor.
17
17
18 Emits a generator of results from ``onframerecv()`` calls.
18 Emits a generator of results from ``onframerecv()`` calls.
19 """
19 """
20 for frame in gen:
20 for frame in gen:
21 header = framing.parseheader(frame)
21 header = framing.parseheader(frame)
22 payload = frame[framing.FRAME_HEADER_SIZE:]
22 payload = frame[framing.FRAME_HEADER_SIZE:]
23 assert len(payload) == header.length
23 assert len(payload) == header.length
24
24
25 yield reactor.onframerecv(framing.frame(header.requestid,
25 yield reactor.onframerecv(framing.frame(header.requestid,
26 header.typeid,
26 header.typeid,
27 header.flags,
27 header.flags,
28 payload))
28 payload))
29
29
30 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
30 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
31 """Generate frames to run a command and send them to a reactor."""
31 """Generate frames to run a command and send them to a reactor."""
32 return sendframes(reactor,
32 return sendframes(reactor,
33 framing.createcommandframes(rid, cmd, args, datafh))
33 framing.createcommandframes(stream, rid, cmd, args,
34 datafh))
34
35
35 class FrameTests(unittest.TestCase):
36 class FrameTests(unittest.TestCase):
36 def testdataexactframesize(self):
37 def testdataexactframesize(self):
37 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
38 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
38
39
39 frames = list(framing.createcommandframes(1, b'command', {}, data))
40 stream = framing.stream()
41 frames = list(framing.createcommandframes(stream, 1, b'command',
42 {}, data))
40 self.assertEqual(frames, [
43 self.assertEqual(frames, [
41 ffs(b'1 command-name have-data command'),
44 ffs(b'1 command-name have-data command'),
42 ffs(b'1 command-data continuation %s' % data.getvalue()),
45 ffs(b'1 command-data continuation %s' % data.getvalue()),
43 ffs(b'1 command-data eos ')
46 ffs(b'1 command-data eos ')
44 ])
47 ])
45
48
46 def testdatamultipleframes(self):
49 def testdatamultipleframes(self):
47 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
50 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
48 frames = list(framing.createcommandframes(1, b'command', {}, data))
51
52 stream = framing.stream()
53 frames = list(framing.createcommandframes(stream, 1, b'command', {},
54 data))
49 self.assertEqual(frames, [
55 self.assertEqual(frames, [
50 ffs(b'1 command-name have-data command'),
56 ffs(b'1 command-name have-data command'),
51 ffs(b'1 command-data continuation %s' % (
57 ffs(b'1 command-data continuation %s' % (
52 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
58 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
53 ffs(b'1 command-data eos x'),
59 ffs(b'1 command-data eos x'),
54 ])
60 ])
55
61
56 def testargsanddata(self):
62 def testargsanddata(self):
57 data = util.bytesio(b'x' * 100)
63 data = util.bytesio(b'x' * 100)
58
64
59 frames = list(framing.createcommandframes(1, b'command', {
65 stream = framing.stream()
66 frames = list(framing.createcommandframes(stream, 1, b'command', {
60 b'key1': b'key1value',
67 b'key1': b'key1value',
61 b'key2': b'key2value',
68 b'key2': b'key2value',
62 b'key3': b'key3value',
69 b'key3': b'key3value',
63 }, data))
70 }, data))
64
71
65 self.assertEqual(frames, [
72 self.assertEqual(frames, [
66 ffs(b'1 command-name have-args|have-data command'),
73 ffs(b'1 command-name have-args|have-data command'),
67 ffs(br'1 command-argument 0 \x04\x00\x09\x00key1key1value'),
74 ffs(br'1 command-argument 0 \x04\x00\x09\x00key1key1value'),
68 ffs(br'1 command-argument 0 \x04\x00\x09\x00key2key2value'),
75 ffs(br'1 command-argument 0 \x04\x00\x09\x00key2key2value'),
69 ffs(br'1 command-argument eoa \x04\x00\x09\x00key3key3value'),
76 ffs(br'1 command-argument eoa \x04\x00\x09\x00key3key3value'),
70 ffs(b'1 command-data eos %s' % data.getvalue()),
77 ffs(b'1 command-data eos %s' % data.getvalue()),
71 ])
78 ])
72
79
73 def testtextoutputexcessiveargs(self):
80 def testtextoutputexcessiveargs(self):
74 """At most 255 formatting arguments are allowed."""
81 """At most 255 formatting arguments are allowed."""
75 with self.assertRaisesRegexp(ValueError,
82 with self.assertRaisesRegexp(ValueError,
76 'cannot use more than 255 formatting'):
83 'cannot use more than 255 formatting'):
77 args = [b'x' for i in range(256)]
84 args = [b'x' for i in range(256)]
78 list(framing.createtextoutputframe(1, [(b'bleh', args, [])]))
85 list(framing.createtextoutputframe(None, 1,
86 [(b'bleh', args, [])]))
79
87
80 def testtextoutputexcessivelabels(self):
88 def testtextoutputexcessivelabels(self):
81 """At most 255 labels are allowed."""
89 """At most 255 labels are allowed."""
82 with self.assertRaisesRegexp(ValueError,
90 with self.assertRaisesRegexp(ValueError,
83 'cannot use more than 255 labels'):
91 'cannot use more than 255 labels'):
84 labels = [b'l' for i in range(256)]
92 labels = [b'l' for i in range(256)]
85 list(framing.createtextoutputframe(1, [(b'bleh', [], labels)]))
93 list(framing.createtextoutputframe(None, 1,
94 [(b'bleh', [], labels)]))
86
95
87 def testtextoutputformattingstringtype(self):
96 def testtextoutputformattingstringtype(self):
88 """Formatting string must be bytes."""
97 """Formatting string must be bytes."""
89 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
98 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
90 list(framing.createtextoutputframe(1, [
99 list(framing.createtextoutputframe(None, 1, [
91 (b'foo'.decode('ascii'), [], [])]))
100 (b'foo'.decode('ascii'), [], [])]))
92
101
93 def testtextoutputargumentbytes(self):
102 def testtextoutputargumentbytes(self):
94 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
103 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
95 list(framing.createtextoutputframe(1, [
104 list(framing.createtextoutputframe(None, 1, [
96 (b'foo', [b'foo'.decode('ascii')], [])]))
105 (b'foo', [b'foo'.decode('ascii')], [])]))
97
106
98 def testtextoutputlabelbytes(self):
107 def testtextoutputlabelbytes(self):
99 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
108 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
100 list(framing.createtextoutputframe(1, [
109 list(framing.createtextoutputframe(None, 1, [
101 (b'foo', [], [b'foo'.decode('ascii')])]))
110 (b'foo', [], [b'foo'.decode('ascii')])]))
102
111
103 def testtextoutputtoolongformatstring(self):
112 def testtextoutputtoolongformatstring(self):
104 with self.assertRaisesRegexp(ValueError,
113 with self.assertRaisesRegexp(ValueError,
105 'formatting string cannot be longer than'):
114 'formatting string cannot be longer than'):
106 list(framing.createtextoutputframe(1, [
115 list(framing.createtextoutputframe(None, 1, [
107 (b'x' * 65536, [], [])]))
116 (b'x' * 65536, [], [])]))
108
117
109 def testtextoutputtoolongargumentstring(self):
118 def testtextoutputtoolongargumentstring(self):
110 with self.assertRaisesRegexp(ValueError,
119 with self.assertRaisesRegexp(ValueError,
111 'argument string cannot be longer than'):
120 'argument string cannot be longer than'):
112 list(framing.createtextoutputframe(1, [
121 list(framing.createtextoutputframe(None, 1, [
113 (b'bleh', [b'x' * 65536], [])]))
122 (b'bleh', [b'x' * 65536], [])]))
114
123
115 def testtextoutputtoolonglabelstring(self):
124 def testtextoutputtoolonglabelstring(self):
116 with self.assertRaisesRegexp(ValueError,
125 with self.assertRaisesRegexp(ValueError,
117 'label string cannot be longer than'):
126 'label string cannot be longer than'):
118 list(framing.createtextoutputframe(1, [
127 list(framing.createtextoutputframe(None, 1, [
119 (b'bleh', [], [b'x' * 65536])]))
128 (b'bleh', [], [b'x' * 65536])]))
120
129
121 def testtextoutput1simpleatom(self):
130 def testtextoutput1simpleatom(self):
122 val = list(framing.createtextoutputframe(1, [
131 stream = framing.stream()
132 val = list(framing.createtextoutputframe(stream, 1, [
123 (b'foo', [], [])]))
133 (b'foo', [], [])]))
124
134
125 self.assertEqual(val, [
135 self.assertEqual(val, [
126 ffs(br'1 text-output 0 \x03\x00\x00\x00foo'),
136 ffs(br'1 text-output 0 \x03\x00\x00\x00foo'),
127 ])
137 ])
128
138
129 def testtextoutput2simpleatoms(self):
139 def testtextoutput2simpleatoms(self):
130 val = list(framing.createtextoutputframe(1, [
140 stream = framing.stream()
141 val = list(framing.createtextoutputframe(stream, 1, [
131 (b'foo', [], []),
142 (b'foo', [], []),
132 (b'bar', [], []),
143 (b'bar', [], []),
133 ]))
144 ]))
134
145
135 self.assertEqual(val, [
146 self.assertEqual(val, [
136 ffs(br'1 text-output 0 \x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
147 ffs(br'1 text-output 0 \x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
137 ])
148 ])
138
149
139 def testtextoutput1arg(self):
150 def testtextoutput1arg(self):
140 val = list(framing.createtextoutputframe(1, [
151 stream = framing.stream()
152 val = list(framing.createtextoutputframe(stream, 1, [
141 (b'foo %s', [b'val1'], []),
153 (b'foo %s', [b'val1'], []),
142 ]))
154 ]))
143
155
144 self.assertEqual(val, [
156 self.assertEqual(val, [
145 ffs(br'1 text-output 0 \x06\x00\x00\x01\x04\x00foo %sval1'),
157 ffs(br'1 text-output 0 \x06\x00\x00\x01\x04\x00foo %sval1'),
146 ])
158 ])
147
159
148 def testtextoutput2arg(self):
160 def testtextoutput2arg(self):
149 val = list(framing.createtextoutputframe(1, [
161 stream = framing.stream()
162 val = list(framing.createtextoutputframe(stream, 1, [
150 (b'foo %s %s', [b'val', b'value'], []),
163 (b'foo %s %s', [b'val', b'value'], []),
151 ]))
164 ]))
152
165
153 self.assertEqual(val, [
166 self.assertEqual(val, [
154 ffs(br'1 text-output 0 \x09\x00\x00\x02\x03\x00\x05\x00'
167 ffs(br'1 text-output 0 \x09\x00\x00\x02\x03\x00\x05\x00'
155 br'foo %s %svalvalue'),
168 br'foo %s %svalvalue'),
156 ])
169 ])
157
170
158 def testtextoutput1label(self):
171 def testtextoutput1label(self):
159 val = list(framing.createtextoutputframe(1, [
172 stream = framing.stream()
173 val = list(framing.createtextoutputframe(stream, 1, [
160 (b'foo', [], [b'label']),
174 (b'foo', [], [b'label']),
161 ]))
175 ]))
162
176
163 self.assertEqual(val, [
177 self.assertEqual(val, [
164 ffs(br'1 text-output 0 \x03\x00\x01\x00\x05foolabel'),
178 ffs(br'1 text-output 0 \x03\x00\x01\x00\x05foolabel'),
165 ])
179 ])
166
180
167 def testargandlabel(self):
181 def testargandlabel(self):
168 val = list(framing.createtextoutputframe(1, [
182 stream = framing.stream()
183 val = list(framing.createtextoutputframe(stream, 1, [
169 (b'foo %s', [b'arg'], [b'label']),
184 (b'foo %s', [b'arg'], [b'label']),
170 ]))
185 ]))
171
186
172 self.assertEqual(val, [
187 self.assertEqual(val, [
173 ffs(br'1 text-output 0 \x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
188 ffs(br'1 text-output 0 \x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
174 ])
189 ])
175
190
176 class ServerReactorTests(unittest.TestCase):
191 class ServerReactorTests(unittest.TestCase):
177 def _sendsingleframe(self, reactor, f):
192 def _sendsingleframe(self, reactor, f):
178 results = list(sendframes(reactor, [f]))
193 results = list(sendframes(reactor, [f]))
179 self.assertEqual(len(results), 1)
194 self.assertEqual(len(results), 1)
180
195
181 return results[0]
196 return results[0]
182
197
183 def assertaction(self, res, expected):
198 def assertaction(self, res, expected):
184 self.assertIsInstance(res, tuple)
199 self.assertIsInstance(res, tuple)
185 self.assertEqual(len(res), 2)
200 self.assertEqual(len(res), 2)
186 self.assertIsInstance(res[1], dict)
201 self.assertIsInstance(res[1], dict)
187 self.assertEqual(res[0], expected)
202 self.assertEqual(res[0], expected)
188
203
189 def assertframesequal(self, frames, framestrings):
204 def assertframesequal(self, frames, framestrings):
190 expected = [ffs(s) for s in framestrings]
205 expected = [ffs(s) for s in framestrings]
191 self.assertEqual(list(frames), expected)
206 self.assertEqual(list(frames), expected)
192
207
193 def test1framecommand(self):
208 def test1framecommand(self):
194 """Receiving a command in a single frame yields request to run it."""
209 """Receiving a command in a single frame yields request to run it."""
195 reactor = makereactor()
210 reactor = makereactor()
196 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
211 stream = framing.stream()
212 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
197 self.assertEqual(len(results), 1)
213 self.assertEqual(len(results), 1)
198 self.assertaction(results[0], 'runcommand')
214 self.assertaction(results[0], 'runcommand')
199 self.assertEqual(results[0][1], {
215 self.assertEqual(results[0][1], {
200 'requestid': 1,
216 'requestid': 1,
201 'command': b'mycommand',
217 'command': b'mycommand',
202 'args': {},
218 'args': {},
203 'data': None,
219 'data': None,
204 })
220 })
205
221
206 result = reactor.oninputeof()
222 result = reactor.oninputeof()
207 self.assertaction(result, 'noop')
223 self.assertaction(result, 'noop')
208
224
209 def test1argument(self):
225 def test1argument(self):
210 reactor = makereactor()
226 reactor = makereactor()
211 results = list(sendcommandframes(reactor, 41, b'mycommand',
227 stream = framing.stream()
228 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
212 {b'foo': b'bar'}))
229 {b'foo': b'bar'}))
213 self.assertEqual(len(results), 2)
230 self.assertEqual(len(results), 2)
214 self.assertaction(results[0], 'wantframe')
231 self.assertaction(results[0], 'wantframe')
215 self.assertaction(results[1], 'runcommand')
232 self.assertaction(results[1], 'runcommand')
216 self.assertEqual(results[1][1], {
233 self.assertEqual(results[1][1], {
217 'requestid': 41,
234 'requestid': 41,
218 'command': b'mycommand',
235 'command': b'mycommand',
219 'args': {b'foo': b'bar'},
236 'args': {b'foo': b'bar'},
220 'data': None,
237 'data': None,
221 })
238 })
222
239
223 def testmultiarguments(self):
240 def testmultiarguments(self):
224 reactor = makereactor()
241 reactor = makereactor()
225 results = list(sendcommandframes(reactor, 1, b'mycommand',
242 stream = framing.stream()
243 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
226 {b'foo': b'bar', b'biz': b'baz'}))
244 {b'foo': b'bar', b'biz': b'baz'}))
227 self.assertEqual(len(results), 3)
245 self.assertEqual(len(results), 3)
228 self.assertaction(results[0], 'wantframe')
246 self.assertaction(results[0], 'wantframe')
229 self.assertaction(results[1], 'wantframe')
247 self.assertaction(results[1], 'wantframe')
230 self.assertaction(results[2], 'runcommand')
248 self.assertaction(results[2], 'runcommand')
231 self.assertEqual(results[2][1], {
249 self.assertEqual(results[2][1], {
232 'requestid': 1,
250 'requestid': 1,
233 'command': b'mycommand',
251 'command': b'mycommand',
234 'args': {b'foo': b'bar', b'biz': b'baz'},
252 'args': {b'foo': b'bar', b'biz': b'baz'},
235 'data': None,
253 'data': None,
236 })
254 })
237
255
238 def testsimplecommanddata(self):
256 def testsimplecommanddata(self):
239 reactor = makereactor()
257 reactor = makereactor()
240 results = list(sendcommandframes(reactor, 1, b'mycommand', {},
258 stream = framing.stream()
259 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
241 util.bytesio(b'data!')))
260 util.bytesio(b'data!')))
242 self.assertEqual(len(results), 2)
261 self.assertEqual(len(results), 2)
243 self.assertaction(results[0], 'wantframe')
262 self.assertaction(results[0], 'wantframe')
244 self.assertaction(results[1], 'runcommand')
263 self.assertaction(results[1], 'runcommand')
245 self.assertEqual(results[1][1], {
264 self.assertEqual(results[1][1], {
246 'requestid': 1,
265 'requestid': 1,
247 'command': b'mycommand',
266 'command': b'mycommand',
248 'args': {},
267 'args': {},
249 'data': b'data!',
268 'data': b'data!',
250 })
269 })
251
270
252 def testmultipledataframes(self):
271 def testmultipledataframes(self):
253 frames = [
272 frames = [
254 ffs(b'1 command-name have-data mycommand'),
273 ffs(b'1 command-name have-data mycommand'),
255 ffs(b'1 command-data continuation data1'),
274 ffs(b'1 command-data continuation data1'),
256 ffs(b'1 command-data continuation data2'),
275 ffs(b'1 command-data continuation data2'),
257 ffs(b'1 command-data eos data3'),
276 ffs(b'1 command-data eos data3'),
258 ]
277 ]
259
278
260 reactor = makereactor()
279 reactor = makereactor()
261 results = list(sendframes(reactor, frames))
280 results = list(sendframes(reactor, frames))
262 self.assertEqual(len(results), 4)
281 self.assertEqual(len(results), 4)
263 for i in range(3):
282 for i in range(3):
264 self.assertaction(results[i], 'wantframe')
283 self.assertaction(results[i], 'wantframe')
265 self.assertaction(results[3], 'runcommand')
284 self.assertaction(results[3], 'runcommand')
266 self.assertEqual(results[3][1], {
285 self.assertEqual(results[3][1], {
267 'requestid': 1,
286 'requestid': 1,
268 'command': b'mycommand',
287 'command': b'mycommand',
269 'args': {},
288 'args': {},
270 'data': b'data1data2data3',
289 'data': b'data1data2data3',
271 })
290 })
272
291
273 def testargumentanddata(self):
292 def testargumentanddata(self):
274 frames = [
293 frames = [
275 ffs(b'1 command-name have-args|have-data command'),
294 ffs(b'1 command-name have-args|have-data command'),
276 ffs(br'1 command-argument 0 \x03\x00\x03\x00keyval'),
295 ffs(br'1 command-argument 0 \x03\x00\x03\x00keyval'),
277 ffs(br'1 command-argument eoa \x03\x00\x03\x00foobar'),
296 ffs(br'1 command-argument eoa \x03\x00\x03\x00foobar'),
278 ffs(b'1 command-data continuation value1'),
297 ffs(b'1 command-data continuation value1'),
279 ffs(b'1 command-data eos value2'),
298 ffs(b'1 command-data eos value2'),
280 ]
299 ]
281
300
282 reactor = makereactor()
301 reactor = makereactor()
283 results = list(sendframes(reactor, frames))
302 results = list(sendframes(reactor, frames))
284
303
285 self.assertaction(results[-1], 'runcommand')
304 self.assertaction(results[-1], 'runcommand')
286 self.assertEqual(results[-1][1], {
305 self.assertEqual(results[-1][1], {
287 'requestid': 1,
306 'requestid': 1,
288 'command': b'command',
307 'command': b'command',
289 'args': {
308 'args': {
290 b'key': b'val',
309 b'key': b'val',
291 b'foo': b'bar',
310 b'foo': b'bar',
292 },
311 },
293 'data': b'value1value2',
312 'data': b'value1value2',
294 })
313 })
295
314
296 def testunexpectedcommandargument(self):
315 def testunexpectedcommandargument(self):
297 """Command argument frame when not running a command is an error."""
316 """Command argument frame when not running a command is an error."""
298 result = self._sendsingleframe(makereactor(),
317 result = self._sendsingleframe(makereactor(),
299 ffs(b'1 command-argument 0 ignored'))
318 ffs(b'1 command-argument 0 ignored'))
300 self.assertaction(result, 'error')
319 self.assertaction(result, 'error')
301 self.assertEqual(result[1], {
320 self.assertEqual(result[1], {
302 'message': b'expected command frame; got 2',
321 'message': b'expected command frame; got 2',
303 })
322 })
304
323
305 def testunexpectedcommandargumentreceiving(self):
324 def testunexpectedcommandargumentreceiving(self):
306 """Same as above but the command is receiving."""
325 """Same as above but the command is receiving."""
307 results = list(sendframes(makereactor(), [
326 results = list(sendframes(makereactor(), [
308 ffs(b'1 command-name have-data command'),
327 ffs(b'1 command-name have-data command'),
309 ffs(b'1 command-argument eoa ignored'),
328 ffs(b'1 command-argument eoa ignored'),
310 ]))
329 ]))
311
330
312 self.assertaction(results[1], 'error')
331 self.assertaction(results[1], 'error')
313 self.assertEqual(results[1][1], {
332 self.assertEqual(results[1][1], {
314 'message': b'received command argument frame for request that is '
333 'message': b'received command argument frame for request that is '
315 b'not expecting arguments: 1',
334 b'not expecting arguments: 1',
316 })
335 })
317
336
318 def testunexpectedcommanddata(self):
337 def testunexpectedcommanddata(self):
319 """Command argument frame when not running a command is an error."""
338 """Command argument frame when not running a command is an error."""
320 result = self._sendsingleframe(makereactor(),
339 result = self._sendsingleframe(makereactor(),
321 ffs(b'1 command-data 0 ignored'))
340 ffs(b'1 command-data 0 ignored'))
322 self.assertaction(result, 'error')
341 self.assertaction(result, 'error')
323 self.assertEqual(result[1], {
342 self.assertEqual(result[1], {
324 'message': b'expected command frame; got 3',
343 'message': b'expected command frame; got 3',
325 })
344 })
326
345
327 def testunexpectedcommanddatareceiving(self):
346 def testunexpectedcommanddatareceiving(self):
328 """Same as above except the command is receiving."""
347 """Same as above except the command is receiving."""
329 results = list(sendframes(makereactor(), [
348 results = list(sendframes(makereactor(), [
330 ffs(b'1 command-name have-args command'),
349 ffs(b'1 command-name have-args command'),
331 ffs(b'1 command-data eos ignored'),
350 ffs(b'1 command-data eos ignored'),
332 ]))
351 ]))
333
352
334 self.assertaction(results[1], 'error')
353 self.assertaction(results[1], 'error')
335 self.assertEqual(results[1][1], {
354 self.assertEqual(results[1][1], {
336 'message': b'received command data frame for request that is not '
355 'message': b'received command data frame for request that is not '
337 b'expecting data: 1',
356 b'expecting data: 1',
338 })
357 })
339
358
340 def testmissingcommandframeflags(self):
359 def testmissingcommandframeflags(self):
341 """Command name frame must have flags set."""
360 """Command name frame must have flags set."""
342 result = self._sendsingleframe(makereactor(),
361 result = self._sendsingleframe(makereactor(),
343 ffs(b'1 command-name 0 command'))
362 ffs(b'1 command-name 0 command'))
344 self.assertaction(result, 'error')
363 self.assertaction(result, 'error')
345 self.assertEqual(result[1], {
364 self.assertEqual(result[1], {
346 'message': b'missing frame flags on command frame',
365 'message': b'missing frame flags on command frame',
347 })
366 })
348
367
349 def testconflictingrequestidallowed(self):
368 def testconflictingrequestidallowed(self):
350 """Multiple fully serviced commands with same request ID is allowed."""
369 """Multiple fully serviced commands with same request ID is allowed."""
351 reactor = makereactor()
370 reactor = makereactor()
352 results = []
371 results = []
372 outstream = framing.stream()
353 results.append(self._sendsingleframe(
373 results.append(self._sendsingleframe(
354 reactor, ffs(b'1 command-name eos command')))
374 reactor, ffs(b'1 command-name eos command')))
355 result = reactor.onbytesresponseready(1, b'response1')
375 result = reactor.onbytesresponseready(outstream, 1, b'response1')
356 self.assertaction(result, 'sendframes')
376 self.assertaction(result, 'sendframes')
357 list(result[1]['framegen'])
377 list(result[1]['framegen'])
358 results.append(self._sendsingleframe(
378 results.append(self._sendsingleframe(
359 reactor, ffs(b'1 command-name eos command')))
379 reactor, ffs(b'1 command-name eos command')))
360 result = reactor.onbytesresponseready(1, b'response2')
380 result = reactor.onbytesresponseready(outstream, 1, b'response2')
361 self.assertaction(result, 'sendframes')
381 self.assertaction(result, 'sendframes')
362 list(result[1]['framegen'])
382 list(result[1]['framegen'])
363 results.append(self._sendsingleframe(
383 results.append(self._sendsingleframe(
364 reactor, ffs(b'1 command-name eos command')))
384 reactor, ffs(b'1 command-name eos command')))
365 result = reactor.onbytesresponseready(1, b'response3')
385 result = reactor.onbytesresponseready(outstream, 1, b'response3')
366 self.assertaction(result, 'sendframes')
386 self.assertaction(result, 'sendframes')
367 list(result[1]['framegen'])
387 list(result[1]['framegen'])
368
388
369 for i in range(3):
389 for i in range(3):
370 self.assertaction(results[i], 'runcommand')
390 self.assertaction(results[i], 'runcommand')
371 self.assertEqual(results[i][1], {
391 self.assertEqual(results[i][1], {
372 'requestid': 1,
392 'requestid': 1,
373 'command': b'command',
393 'command': b'command',
374 'args': {},
394 'args': {},
375 'data': None,
395 'data': None,
376 })
396 })
377
397
378 def testconflictingrequestid(self):
398 def testconflictingrequestid(self):
379 """Request ID for new command matching in-flight command is illegal."""
399 """Request ID for new command matching in-flight command is illegal."""
380 results = list(sendframes(makereactor(), [
400 results = list(sendframes(makereactor(), [
381 ffs(b'1 command-name have-args command'),
401 ffs(b'1 command-name have-args command'),
382 ffs(b'1 command-name eos command'),
402 ffs(b'1 command-name eos command'),
383 ]))
403 ]))
384
404
385 self.assertaction(results[0], 'wantframe')
405 self.assertaction(results[0], 'wantframe')
386 self.assertaction(results[1], 'error')
406 self.assertaction(results[1], 'error')
387 self.assertEqual(results[1][1], {
407 self.assertEqual(results[1][1], {
388 'message': b'request with ID 1 already received',
408 'message': b'request with ID 1 already received',
389 })
409 })
390
410
391 def testinterleavedcommands(self):
411 def testinterleavedcommands(self):
392 results = list(sendframes(makereactor(), [
412 results = list(sendframes(makereactor(), [
393 ffs(b'1 command-name have-args command1'),
413 ffs(b'1 command-name have-args command1'),
394 ffs(b'3 command-name have-args command3'),
414 ffs(b'3 command-name have-args command3'),
395 ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
415 ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
396 ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
416 ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
397 ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
417 ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
398 ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
418 ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
399 ]))
419 ]))
400
420
401 self.assertEqual([t[0] for t in results], [
421 self.assertEqual([t[0] for t in results], [
402 'wantframe',
422 'wantframe',
403 'wantframe',
423 'wantframe',
404 'wantframe',
424 'wantframe',
405 'wantframe',
425 'wantframe',
406 'runcommand',
426 'runcommand',
407 'runcommand',
427 'runcommand',
408 ])
428 ])
409
429
410 self.assertEqual(results[4][1], {
430 self.assertEqual(results[4][1], {
411 'requestid': 3,
431 'requestid': 3,
412 'command': 'command3',
432 'command': 'command3',
413 'args': {b'biz': b'baz', b'key': b'val'},
433 'args': {b'biz': b'baz', b'key': b'val'},
414 'data': None,
434 'data': None,
415 })
435 })
416 self.assertEqual(results[5][1], {
436 self.assertEqual(results[5][1], {
417 'requestid': 1,
437 'requestid': 1,
418 'command': 'command1',
438 'command': 'command1',
419 'args': {b'foo': b'bar', b'key1': b'val'},
439 'args': {b'foo': b'bar', b'key1': b'val'},
420 'data': None,
440 'data': None,
421 })
441 })
422
442
423 def testmissingargumentframe(self):
443 def testmissingargumentframe(self):
424 # This test attempts to test behavior when reactor has an incomplete
444 # This test attempts to test behavior when reactor has an incomplete
425 # command request waiting on argument data. But it doesn't handle that
445 # command request waiting on argument data. But it doesn't handle that
426 # scenario yet. So this test does nothing of value.
446 # scenario yet. So this test does nothing of value.
427 frames = [
447 frames = [
428 ffs(b'1 command-name have-args command'),
448 ffs(b'1 command-name have-args command'),
429 ]
449 ]
430
450
431 results = list(sendframes(makereactor(), frames))
451 results = list(sendframes(makereactor(), frames))
432 self.assertaction(results[0], 'wantframe')
452 self.assertaction(results[0], 'wantframe')
433
453
434 def testincompleteargumentname(self):
454 def testincompleteargumentname(self):
435 """Argument frame with incomplete name."""
455 """Argument frame with incomplete name."""
436 frames = [
456 frames = [
437 ffs(b'1 command-name have-args command1'),
457 ffs(b'1 command-name have-args command1'),
438 ffs(br'1 command-argument eoa \x04\x00\xde\xadfoo'),
458 ffs(br'1 command-argument eoa \x04\x00\xde\xadfoo'),
439 ]
459 ]
440
460
441 results = list(sendframes(makereactor(), frames))
461 results = list(sendframes(makereactor(), frames))
442 self.assertEqual(len(results), 2)
462 self.assertEqual(len(results), 2)
443 self.assertaction(results[0], 'wantframe')
463 self.assertaction(results[0], 'wantframe')
444 self.assertaction(results[1], 'error')
464 self.assertaction(results[1], 'error')
445 self.assertEqual(results[1][1], {
465 self.assertEqual(results[1][1], {
446 'message': b'malformed argument frame: partial argument name',
466 'message': b'malformed argument frame: partial argument name',
447 })
467 })
448
468
449 def testincompleteargumentvalue(self):
469 def testincompleteargumentvalue(self):
450 """Argument frame with incomplete value."""
470 """Argument frame with incomplete value."""
451 frames = [
471 frames = [
452 ffs(b'1 command-name have-args command'),
472 ffs(b'1 command-name have-args command'),
453 ffs(br'1 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
473 ffs(br'1 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
454 ]
474 ]
455
475
456 results = list(sendframes(makereactor(), frames))
476 results = list(sendframes(makereactor(), frames))
457 self.assertEqual(len(results), 2)
477 self.assertEqual(len(results), 2)
458 self.assertaction(results[0], 'wantframe')
478 self.assertaction(results[0], 'wantframe')
459 self.assertaction(results[1], 'error')
479 self.assertaction(results[1], 'error')
460 self.assertEqual(results[1][1], {
480 self.assertEqual(results[1][1], {
461 'message': b'malformed argument frame: partial argument value',
481 'message': b'malformed argument frame: partial argument value',
462 })
482 })
463
483
464 def testmissingcommanddataframe(self):
484 def testmissingcommanddataframe(self):
465 # The reactor doesn't currently handle partially received commands.
485 # The reactor doesn't currently handle partially received commands.
466 # So this test is failing to do anything with request 1.
486 # So this test is failing to do anything with request 1.
467 frames = [
487 frames = [
468 ffs(b'1 command-name have-data command1'),
488 ffs(b'1 command-name have-data command1'),
469 ffs(b'3 command-name eos command2'),
489 ffs(b'3 command-name eos command2'),
470 ]
490 ]
471 results = list(sendframes(makereactor(), frames))
491 results = list(sendframes(makereactor(), frames))
472 self.assertEqual(len(results), 2)
492 self.assertEqual(len(results), 2)
473 self.assertaction(results[0], 'wantframe')
493 self.assertaction(results[0], 'wantframe')
474 self.assertaction(results[1], 'runcommand')
494 self.assertaction(results[1], 'runcommand')
475
495
476 def testmissingcommanddataframeflags(self):
496 def testmissingcommanddataframeflags(self):
477 frames = [
497 frames = [
478 ffs(b'1 command-name have-data command1'),
498 ffs(b'1 command-name have-data command1'),
479 ffs(b'1 command-data 0 data'),
499 ffs(b'1 command-data 0 data'),
480 ]
500 ]
481 results = list(sendframes(makereactor(), frames))
501 results = list(sendframes(makereactor(), frames))
482 self.assertEqual(len(results), 2)
502 self.assertEqual(len(results), 2)
483 self.assertaction(results[0], 'wantframe')
503 self.assertaction(results[0], 'wantframe')
484 self.assertaction(results[1], 'error')
504 self.assertaction(results[1], 'error')
485 self.assertEqual(results[1][1], {
505 self.assertEqual(results[1][1], {
486 'message': b'command data frame without flags',
506 'message': b'command data frame without flags',
487 })
507 })
488
508
489 def testframefornonreceivingrequest(self):
509 def testframefornonreceivingrequest(self):
490 """Receiving a frame for a command that is not receiving is illegal."""
510 """Receiving a frame for a command that is not receiving is illegal."""
491 results = list(sendframes(makereactor(), [
511 results = list(sendframes(makereactor(), [
492 ffs(b'1 command-name eos command1'),
512 ffs(b'1 command-name eos command1'),
493 ffs(b'3 command-name have-data command3'),
513 ffs(b'3 command-name have-data command3'),
494 ffs(b'5 command-argument eoa ignored'),
514 ffs(b'5 command-argument eoa ignored'),
495 ]))
515 ]))
496 self.assertaction(results[2], 'error')
516 self.assertaction(results[2], 'error')
497 self.assertEqual(results[2][1], {
517 self.assertEqual(results[2][1], {
498 'message': b'received frame for request that is not receiving: 5',
518 'message': b'received frame for request that is not receiving: 5',
499 })
519 })
500
520
501 def testsimpleresponse(self):
521 def testsimpleresponse(self):
502 """Bytes response to command sends result frames."""
522 """Bytes response to command sends result frames."""
503 reactor = makereactor()
523 reactor = makereactor()
504 list(sendcommandframes(reactor, 1, b'mycommand', {}))
524 instream = framing.stream()
525 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
505
526
506 result = reactor.onbytesresponseready(1, b'response')
527 outstream = framing.stream()
528 result = reactor.onbytesresponseready(outstream, 1, b'response')
507 self.assertaction(result, 'sendframes')
529 self.assertaction(result, 'sendframes')
508 self.assertframesequal(result[1]['framegen'], [
530 self.assertframesequal(result[1]['framegen'], [
509 b'1 bytes-response eos response',
531 b'1 bytes-response eos response',
510 ])
532 ])
511
533
512 def testmultiframeresponse(self):
534 def testmultiframeresponse(self):
513 """Bytes response spanning multiple frames is handled."""
535 """Bytes response spanning multiple frames is handled."""
514 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
536 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
515 second = b'y' * 100
537 second = b'y' * 100
516
538
517 reactor = makereactor()
539 reactor = makereactor()
518 list(sendcommandframes(reactor, 1, b'mycommand', {}))
540 instream = framing.stream()
541 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
519
542
520 result = reactor.onbytesresponseready(1, first + second)
543 outstream = framing.stream()
544 result = reactor.onbytesresponseready(outstream, 1, first + second)
521 self.assertaction(result, 'sendframes')
545 self.assertaction(result, 'sendframes')
522 self.assertframesequal(result[1]['framegen'], [
546 self.assertframesequal(result[1]['framegen'], [
523 b'1 bytes-response continuation %s' % first,
547 b'1 bytes-response continuation %s' % first,
524 b'1 bytes-response eos %s' % second,
548 b'1 bytes-response eos %s' % second,
525 ])
549 ])
526
550
527 def testapplicationerror(self):
551 def testapplicationerror(self):
528 reactor = makereactor()
552 reactor = makereactor()
529 list(sendcommandframes(reactor, 1, b'mycommand', {}))
553 instream = framing.stream()
554 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
530
555
531 result = reactor.onapplicationerror(1, b'some message')
556 outstream = framing.stream()
557 result = reactor.onapplicationerror(outstream, 1, b'some message')
532 self.assertaction(result, 'sendframes')
558 self.assertaction(result, 'sendframes')
533 self.assertframesequal(result[1]['framegen'], [
559 self.assertframesequal(result[1]['framegen'], [
534 b'1 error-response application some message',
560 b'1 error-response application some message',
535 ])
561 ])
536
562
537 def test1commanddeferresponse(self):
563 def test1commanddeferresponse(self):
538 """Responses when in deferred output mode are delayed until EOF."""
564 """Responses when in deferred output mode are delayed until EOF."""
539 reactor = makereactor(deferoutput=True)
565 reactor = makereactor(deferoutput=True)
540 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
566 instream = framing.stream()
567 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
568 {}))
541 self.assertEqual(len(results), 1)
569 self.assertEqual(len(results), 1)
542 self.assertaction(results[0], 'runcommand')
570 self.assertaction(results[0], 'runcommand')
543
571
544 result = reactor.onbytesresponseready(1, b'response')
572 outstream = framing.stream()
573 result = reactor.onbytesresponseready(outstream, 1, b'response')
545 self.assertaction(result, 'noop')
574 self.assertaction(result, 'noop')
546 result = reactor.oninputeof()
575 result = reactor.oninputeof()
547 self.assertaction(result, 'sendframes')
576 self.assertaction(result, 'sendframes')
548 self.assertframesequal(result[1]['framegen'], [
577 self.assertframesequal(result[1]['framegen'], [
549 b'1 bytes-response eos response',
578 b'1 bytes-response eos response',
550 ])
579 ])
551
580
552 def testmultiplecommanddeferresponse(self):
581 def testmultiplecommanddeferresponse(self):
553 reactor = makereactor(deferoutput=True)
582 reactor = makereactor(deferoutput=True)
554 list(sendcommandframes(reactor, 1, b'command1', {}))
583 instream = framing.stream()
555 list(sendcommandframes(reactor, 3, b'command2', {}))
584 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
585 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
556
586
557 result = reactor.onbytesresponseready(1, b'response1')
587 outstream = framing.stream()
588 result = reactor.onbytesresponseready(outstream, 1, b'response1')
558 self.assertaction(result, 'noop')
589 self.assertaction(result, 'noop')
559 result = reactor.onbytesresponseready(3, b'response2')
590 result = reactor.onbytesresponseready(outstream, 3, b'response2')
560 self.assertaction(result, 'noop')
591 self.assertaction(result, 'noop')
561 result = reactor.oninputeof()
592 result = reactor.oninputeof()
562 self.assertaction(result, 'sendframes')
593 self.assertaction(result, 'sendframes')
563 self.assertframesequal(result[1]['framegen'], [
594 self.assertframesequal(result[1]['framegen'], [
564 b'1 bytes-response eos response1',
595 b'1 bytes-response eos response1',
565 b'3 bytes-response eos response2'
596 b'3 bytes-response eos response2'
566 ])
597 ])
567
598
568 def testrequestidtracking(self):
599 def testrequestidtracking(self):
569 reactor = makereactor(deferoutput=True)
600 reactor = makereactor(deferoutput=True)
570 list(sendcommandframes(reactor, 1, b'command1', {}))
601 instream = framing.stream()
571 list(sendcommandframes(reactor, 3, b'command2', {}))
602 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
572 list(sendcommandframes(reactor, 5, b'command3', {}))
603 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
604 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
573
605
574 # Register results for commands out of order.
606 # Register results for commands out of order.
575 reactor.onbytesresponseready(3, b'response3')
607 outstream = framing.stream()
576 reactor.onbytesresponseready(1, b'response1')
608 reactor.onbytesresponseready(outstream, 3, b'response3')
577 reactor.onbytesresponseready(5, b'response5')
609 reactor.onbytesresponseready(outstream, 1, b'response1')
610 reactor.onbytesresponseready(outstream, 5, b'response5')
578
611
579 result = reactor.oninputeof()
612 result = reactor.oninputeof()
580 self.assertaction(result, 'sendframes')
613 self.assertaction(result, 'sendframes')
581 self.assertframesequal(result[1]['framegen'], [
614 self.assertframesequal(result[1]['framegen'], [
582 b'3 bytes-response eos response3',
615 b'3 bytes-response eos response3',
583 b'1 bytes-response eos response1',
616 b'1 bytes-response eos response1',
584 b'5 bytes-response eos response5',
617 b'5 bytes-response eos response5',
585 ])
618 ])
586
619
587 def testduplicaterequestonactivecommand(self):
620 def testduplicaterequestonactivecommand(self):
588 """Receiving a request ID that matches a request that isn't finished."""
621 """Receiving a request ID that matches a request that isn't finished."""
589 reactor = makereactor()
622 reactor = makereactor()
590 list(sendcommandframes(reactor, 1, b'command1', {}))
623 stream = framing.stream()
591 results = list(sendcommandframes(reactor, 1, b'command1', {}))
624 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
625 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
592
626
593 self.assertaction(results[0], 'error')
627 self.assertaction(results[0], 'error')
594 self.assertEqual(results[0][1], {
628 self.assertEqual(results[0][1], {
595 'message': b'request with ID 1 is already active',
629 'message': b'request with ID 1 is already active',
596 })
630 })
597
631
598 def testduplicaterequestonactivecommandnosend(self):
632 def testduplicaterequestonactivecommandnosend(self):
599 """Same as above but we've registered a response but haven't sent it."""
633 """Same as above but we've registered a response but haven't sent it."""
600 reactor = makereactor()
634 reactor = makereactor()
601 list(sendcommandframes(reactor, 1, b'command1', {}))
635 instream = framing.stream()
602 reactor.onbytesresponseready(1, b'response')
636 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
637 outstream = framing.stream()
638 reactor.onbytesresponseready(outstream, 1, b'response')
603
639
604 # We've registered the response but haven't sent it. From the
640 # We've registered the response but haven't sent it. From the
605 # perspective of the reactor, the command is still active.
641 # perspective of the reactor, the command is still active.
606
642
607 results = list(sendcommandframes(reactor, 1, b'command1', {}))
643 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
608 self.assertaction(results[0], 'error')
644 self.assertaction(results[0], 'error')
609 self.assertEqual(results[0][1], {
645 self.assertEqual(results[0][1], {
610 'message': b'request with ID 1 is already active',
646 'message': b'request with ID 1 is already active',
611 })
647 })
612
648
613 def testduplicaterequestargumentframe(self):
649 def testduplicaterequestargumentframe(self):
614 """Variant on above except we sent an argument frame instead of name."""
650 """Variant on above except we sent an argument frame instead of name."""
615 reactor = makereactor()
651 reactor = makereactor()
616 list(sendcommandframes(reactor, 1, b'command', {}))
652 stream = framing.stream()
653 list(sendcommandframes(reactor, stream, 1, b'command', {}))
617 results = list(sendframes(reactor, [
654 results = list(sendframes(reactor, [
618 ffs(b'3 command-name have-args command'),
655 ffs(b'3 command-name have-args command'),
619 ffs(b'1 command-argument 0 ignored'),
656 ffs(b'1 command-argument 0 ignored'),
620 ]))
657 ]))
621 self.assertaction(results[0], 'wantframe')
658 self.assertaction(results[0], 'wantframe')
622 self.assertaction(results[1], 'error')
659 self.assertaction(results[1], 'error')
623 self.assertEqual(results[1][1], {
660 self.assertEqual(results[1][1], {
624 'message': 'received frame for request that is still active: 1',
661 'message': 'received frame for request that is still active: 1',
625 })
662 })
626
663
627 def testduplicaterequestaftersend(self):
664 def testduplicaterequestaftersend(self):
628 """We can use a duplicate request ID after we've sent the response."""
665 """We can use a duplicate request ID after we've sent the response."""
629 reactor = makereactor()
666 reactor = makereactor()
630 list(sendcommandframes(reactor, 1, b'command1', {}))
667 instream = framing.stream()
631 res = reactor.onbytesresponseready(1, b'response')
668 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
669 outstream = framing.stream()
670 res = reactor.onbytesresponseready(outstream, 1, b'response')
632 list(res[1]['framegen'])
671 list(res[1]['framegen'])
633
672
634 results = list(sendcommandframes(reactor, 1, b'command1', {}))
673 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
635 self.assertaction(results[0], 'runcommand')
674 self.assertaction(results[0], 'runcommand')
636
675
637 if __name__ == '__main__':
676 if __name__ == '__main__':
638 import silenttestrunner
677 import silenttestrunner
639 silenttestrunner.main(__name__)
678 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now