##// END OF EJS Templates
wireproto: define attr-based classes for representing frames...
Gregory Szorc -
r37079:884a0c16 default
parent child Browse files
Show More
@@ -1,658 +1,674
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import struct
14 import struct
15
15
16 from .i18n import _
16 from .i18n import _
17 from .thirdparty import (
18 attr,
19 )
17 from . import (
20 from . import (
18 error,
21 error,
19 util,
22 util,
20 )
23 )
21
24
22 FRAME_HEADER_SIZE = 6
25 FRAME_HEADER_SIZE = 6
23 DEFAULT_MAX_FRAME_SIZE = 32768
26 DEFAULT_MAX_FRAME_SIZE = 32768
24
27
25 FRAME_TYPE_COMMAND_NAME = 0x01
28 FRAME_TYPE_COMMAND_NAME = 0x01
26 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
29 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
27 FRAME_TYPE_COMMAND_DATA = 0x03
30 FRAME_TYPE_COMMAND_DATA = 0x03
28 FRAME_TYPE_BYTES_RESPONSE = 0x04
31 FRAME_TYPE_BYTES_RESPONSE = 0x04
29 FRAME_TYPE_ERROR_RESPONSE = 0x05
32 FRAME_TYPE_ERROR_RESPONSE = 0x05
30 FRAME_TYPE_TEXT_OUTPUT = 0x06
33 FRAME_TYPE_TEXT_OUTPUT = 0x06
31
34
32 FRAME_TYPES = {
35 FRAME_TYPES = {
33 b'command-name': FRAME_TYPE_COMMAND_NAME,
36 b'command-name': FRAME_TYPE_COMMAND_NAME,
34 b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
37 b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
35 b'command-data': FRAME_TYPE_COMMAND_DATA,
38 b'command-data': FRAME_TYPE_COMMAND_DATA,
36 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
39 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
37 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
40 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
38 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
41 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
39 }
42 }
40
43
41 FLAG_COMMAND_NAME_EOS = 0x01
44 FLAG_COMMAND_NAME_EOS = 0x01
42 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
45 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
43 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
46 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
44
47
45 FLAGS_COMMAND = {
48 FLAGS_COMMAND = {
46 b'eos': FLAG_COMMAND_NAME_EOS,
49 b'eos': FLAG_COMMAND_NAME_EOS,
47 b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
50 b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
48 b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
51 b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
49 }
52 }
50
53
51 FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
54 FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
52 FLAG_COMMAND_ARGUMENT_EOA = 0x02
55 FLAG_COMMAND_ARGUMENT_EOA = 0x02
53
56
54 FLAGS_COMMAND_ARGUMENT = {
57 FLAGS_COMMAND_ARGUMENT = {
55 b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
58 b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
56 b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
59 b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
57 }
60 }
58
61
59 FLAG_COMMAND_DATA_CONTINUATION = 0x01
62 FLAG_COMMAND_DATA_CONTINUATION = 0x01
60 FLAG_COMMAND_DATA_EOS = 0x02
63 FLAG_COMMAND_DATA_EOS = 0x02
61
64
62 FLAGS_COMMAND_DATA = {
65 FLAGS_COMMAND_DATA = {
63 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
66 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
64 b'eos': FLAG_COMMAND_DATA_EOS,
67 b'eos': FLAG_COMMAND_DATA_EOS,
65 }
68 }
66
69
67 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
70 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
68 FLAG_BYTES_RESPONSE_EOS = 0x02
71 FLAG_BYTES_RESPONSE_EOS = 0x02
69
72
70 FLAGS_BYTES_RESPONSE = {
73 FLAGS_BYTES_RESPONSE = {
71 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
74 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
72 b'eos': FLAG_BYTES_RESPONSE_EOS,
75 b'eos': FLAG_BYTES_RESPONSE_EOS,
73 }
76 }
74
77
75 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
78 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
76 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
79 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
77
80
78 FLAGS_ERROR_RESPONSE = {
81 FLAGS_ERROR_RESPONSE = {
79 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
82 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
80 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
83 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
81 }
84 }
82
85
83 # Maps frame types to their available flags.
86 # Maps frame types to their available flags.
84 FRAME_TYPE_FLAGS = {
87 FRAME_TYPE_FLAGS = {
85 FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
88 FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
86 FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
89 FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
87 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
90 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
88 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
91 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
89 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
92 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
90 FRAME_TYPE_TEXT_OUTPUT: {},
93 FRAME_TYPE_TEXT_OUTPUT: {},
91 }
94 }
92
95
93 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
96 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
94
97
98 @attr.s(slots=True)
99 class frameheader(object):
100 """Represents the data in a frame header."""
101
102 length = attr.ib()
103 requestid = attr.ib()
104 typeid = attr.ib()
105 flags = attr.ib()
106
107 @attr.s(slots=True)
108 class frame(object):
109 """Represents a parsed frame."""
110
111 requestid = attr.ib()
112 typeid = attr.ib()
113 flags = attr.ib()
114 payload = attr.ib()
115
95 def makeframe(requestid, frametype, frameflags, payload):
116 def makeframe(requestid, frametype, frameflags, payload):
96 """Assemble a frame into a byte array."""
117 """Assemble a frame into a byte array."""
97 # TODO assert size of payload.
118 # TODO assert size of payload.
98 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
119 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
99
120
100 # 24 bits length
121 # 24 bits length
101 # 16 bits request id
122 # 16 bits request id
102 # 4 bits type
123 # 4 bits type
103 # 4 bits flags
124 # 4 bits flags
104
125
105 l = struct.pack(r'<I', len(payload))
126 l = struct.pack(r'<I', len(payload))
106 frame[0:3] = l[0:3]
127 frame[0:3] = l[0:3]
107 struct.pack_into(r'<H', frame, 3, requestid)
128 struct.pack_into(r'<H', frame, 3, requestid)
108 frame[5] = (frametype << 4) | frameflags
129 frame[5] = (frametype << 4) | frameflags
109 frame[6:] = payload
130 frame[6:] = payload
110
131
111 return frame
132 return frame
112
133
113 def makeframefromhumanstring(s):
134 def makeframefromhumanstring(s):
114 """Create a frame from a human readable string
135 """Create a frame from a human readable string
115
136
116 Strings have the form:
137 Strings have the form:
117
138
118 <request-id> <type> <flags> <payload>
139 <request-id> <type> <flags> <payload>
119
140
120 This can be used by user-facing applications and tests for creating
141 This can be used by user-facing applications and tests for creating
121 frames easily without having to type out a bunch of constants.
142 frames easily without having to type out a bunch of constants.
122
143
123 Request ID is an integer.
144 Request ID is an integer.
124
145
125 Frame type and flags can be specified by integer or named constant.
146 Frame type and flags can be specified by integer or named constant.
126
147
127 Flags can be delimited by `|` to bitwise OR them together.
148 Flags can be delimited by `|` to bitwise OR them together.
128 """
149 """
129 requestid, frametype, frameflags, payload = s.split(b' ', 3)
150 requestid, frametype, frameflags, payload = s.split(b' ', 3)
130
151
131 requestid = int(requestid)
152 requestid = int(requestid)
132
153
133 if frametype in FRAME_TYPES:
154 if frametype in FRAME_TYPES:
134 frametype = FRAME_TYPES[frametype]
155 frametype = FRAME_TYPES[frametype]
135 else:
156 else:
136 frametype = int(frametype)
157 frametype = int(frametype)
137
158
138 finalflags = 0
159 finalflags = 0
139 validflags = FRAME_TYPE_FLAGS[frametype]
160 validflags = FRAME_TYPE_FLAGS[frametype]
140 for flag in frameflags.split(b'|'):
161 for flag in frameflags.split(b'|'):
141 if flag in validflags:
162 if flag in validflags:
142 finalflags |= validflags[flag]
163 finalflags |= validflags[flag]
143 else:
164 else:
144 finalflags |= int(flag)
165 finalflags |= int(flag)
145
166
146 payload = util.unescapestr(payload)
167 payload = util.unescapestr(payload)
147
168
148 return makeframe(requestid, frametype, finalflags, payload)
169 return makeframe(requestid, frametype, finalflags, payload)
149
170
150 def parseheader(data):
171 def parseheader(data):
151 """Parse a unified framing protocol frame header from a buffer.
172 """Parse a unified framing protocol frame header from a buffer.
152
173
153 The header is expected to be in the buffer at offset 0 and the
174 The header is expected to be in the buffer at offset 0 and the
154 buffer is expected to be large enough to hold a full header.
175 buffer is expected to be large enough to hold a full header.
155 """
176 """
156 # 24 bits payload length (little endian)
177 # 24 bits payload length (little endian)
157 # 4 bits frame type
178 # 4 bits frame type
158 # 4 bits frame flags
179 # 4 bits frame flags
159 # ... payload
180 # ... payload
160 framelength = data[0] + 256 * data[1] + 16384 * data[2]
181 framelength = data[0] + 256 * data[1] + 16384 * data[2]
161 requestid = struct.unpack_from(r'<H', data, 3)[0]
182 requestid = struct.unpack_from(r'<H', data, 3)[0]
162 typeflags = data[5]
183 typeflags = data[5]
163
184
164 frametype = (typeflags & 0xf0) >> 4
185 frametype = (typeflags & 0xf0) >> 4
165 frameflags = typeflags & 0x0f
186 frameflags = typeflags & 0x0f
166
187
167 return requestid, frametype, frameflags, framelength
188 return frameheader(framelength, requestid, frametype, frameflags)
168
189
169 def readframe(fh):
190 def readframe(fh):
170 """Read a unified framing protocol frame from a file object.
191 """Read a unified framing protocol frame from a file object.
171
192
172 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
193 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
173 None if no frame is available. May raise if a malformed frame is
194 None if no frame is available. May raise if a malformed frame is
174 seen.
195 seen.
175 """
196 """
176 header = bytearray(FRAME_HEADER_SIZE)
197 header = bytearray(FRAME_HEADER_SIZE)
177
198
178 readcount = fh.readinto(header)
199 readcount = fh.readinto(header)
179
200
180 if readcount == 0:
201 if readcount == 0:
181 return None
202 return None
182
203
183 if readcount != FRAME_HEADER_SIZE:
204 if readcount != FRAME_HEADER_SIZE:
184 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
205 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
185 (readcount, header))
206 (readcount, header))
186
207
187 requestid, frametype, frameflags, framelength = parseheader(header)
208 h = parseheader(header)
188
209
189 payload = fh.read(framelength)
210 payload = fh.read(h.length)
190 if len(payload) != framelength:
211 if len(payload) != h.length:
191 raise error.Abort(_('frame length error: expected %d; got %d') %
212 raise error.Abort(_('frame length error: expected %d; got %d') %
192 (framelength, len(payload)))
213 (h.length, len(payload)))
193
214
194 return requestid, frametype, frameflags, payload
215 return frame(h.requestid, h.typeid, h.flags, payload)
195
216
196 def createcommandframes(requestid, cmd, args, datafh=None):
217 def createcommandframes(requestid, cmd, args, datafh=None):
197 """Create frames necessary to transmit a request to run a command.
218 """Create frames necessary to transmit a request to run a command.
198
219
199 This is a generator of bytearrays. Each item represents a frame
220 This is a generator of bytearrays. Each item represents a frame
200 ready to be sent over the wire to a peer.
221 ready to be sent over the wire to a peer.
201 """
222 """
202 flags = 0
223 flags = 0
203 if args:
224 if args:
204 flags |= FLAG_COMMAND_NAME_HAVE_ARGS
225 flags |= FLAG_COMMAND_NAME_HAVE_ARGS
205 if datafh:
226 if datafh:
206 flags |= FLAG_COMMAND_NAME_HAVE_DATA
227 flags |= FLAG_COMMAND_NAME_HAVE_DATA
207
228
208 if not flags:
229 if not flags:
209 flags |= FLAG_COMMAND_NAME_EOS
230 flags |= FLAG_COMMAND_NAME_EOS
210
231
211 yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd)
232 yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd)
212
233
213 for i, k in enumerate(sorted(args)):
234 for i, k in enumerate(sorted(args)):
214 v = args[k]
235 v = args[k]
215 last = i == len(args) - 1
236 last = i == len(args) - 1
216
237
217 # TODO handle splitting of argument values across frames.
238 # TODO handle splitting of argument values across frames.
218 payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
239 payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
219 offset = 0
240 offset = 0
220 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
241 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
221 offset += ARGUMENT_FRAME_HEADER.size
242 offset += ARGUMENT_FRAME_HEADER.size
222 payload[offset:offset + len(k)] = k
243 payload[offset:offset + len(k)] = k
223 offset += len(k)
244 offset += len(k)
224 payload[offset:offset + len(v)] = v
245 payload[offset:offset + len(v)] = v
225
246
226 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
247 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
227 yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
248 yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
228
249
229 if datafh:
250 if datafh:
230 while True:
251 while True:
231 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
252 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
232
253
233 done = False
254 done = False
234 if len(data) == DEFAULT_MAX_FRAME_SIZE:
255 if len(data) == DEFAULT_MAX_FRAME_SIZE:
235 flags = FLAG_COMMAND_DATA_CONTINUATION
256 flags = FLAG_COMMAND_DATA_CONTINUATION
236 else:
257 else:
237 flags = FLAG_COMMAND_DATA_EOS
258 flags = FLAG_COMMAND_DATA_EOS
238 assert datafh.read(1) == b''
259 assert datafh.read(1) == b''
239 done = True
260 done = True
240
261
241 yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data)
262 yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data)
242
263
243 if done:
264 if done:
244 break
265 break
245
266
246 def createbytesresponseframesfrombytes(requestid, data,
267 def createbytesresponseframesfrombytes(requestid, data,
247 maxframesize=DEFAULT_MAX_FRAME_SIZE):
268 maxframesize=DEFAULT_MAX_FRAME_SIZE):
248 """Create a raw frame to send a bytes response from static bytes input.
269 """Create a raw frame to send a bytes response from static bytes input.
249
270
250 Returns a generator of bytearrays.
271 Returns a generator of bytearrays.
251 """
272 """
252
273
253 # Simple case of a single frame.
274 # Simple case of a single frame.
254 if len(data) <= maxframesize:
275 if len(data) <= maxframesize:
255 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE,
276 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE,
256 FLAG_BYTES_RESPONSE_EOS, data)
277 FLAG_BYTES_RESPONSE_EOS, data)
257 return
278 return
258
279
259 offset = 0
280 offset = 0
260 while True:
281 while True:
261 chunk = data[offset:offset + maxframesize]
282 chunk = data[offset:offset + maxframesize]
262 offset += len(chunk)
283 offset += len(chunk)
263 done = offset == len(data)
284 done = offset == len(data)
264
285
265 if done:
286 if done:
266 flags = FLAG_BYTES_RESPONSE_EOS
287 flags = FLAG_BYTES_RESPONSE_EOS
267 else:
288 else:
268 flags = FLAG_BYTES_RESPONSE_CONTINUATION
289 flags = FLAG_BYTES_RESPONSE_CONTINUATION
269
290
270 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
291 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
271
292
272 if done:
293 if done:
273 break
294 break
274
295
275 def createerrorframe(requestid, msg, protocol=False, application=False):
296 def createerrorframe(requestid, msg, protocol=False, application=False):
276 # TODO properly handle frame size limits.
297 # TODO properly handle frame size limits.
277 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
298 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
278
299
279 flags = 0
300 flags = 0
280 if protocol:
301 if protocol:
281 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
302 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
282 if application:
303 if application:
283 flags |= FLAG_ERROR_RESPONSE_APPLICATION
304 flags |= FLAG_ERROR_RESPONSE_APPLICATION
284
305
285 yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg)
306 yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg)
286
307
287 def createtextoutputframe(requestid, atoms):
308 def createtextoutputframe(requestid, atoms):
288 """Create a text output frame to render text to people.
309 """Create a text output frame to render text to people.
289
310
290 ``atoms`` is a 3-tuple of (formatting string, args, labels).
311 ``atoms`` is a 3-tuple of (formatting string, args, labels).
291
312
292 The formatting string contains ``%s`` tokens to be replaced by the
313 The formatting string contains ``%s`` tokens to be replaced by the
293 corresponding indexed entry in ``args``. ``labels`` is an iterable of
314 corresponding indexed entry in ``args``. ``labels`` is an iterable of
294 formatters to be applied at rendering time. In terms of the ``ui``
315 formatters to be applied at rendering time. In terms of the ``ui``
295 class, each atom corresponds to a ``ui.write()``.
316 class, each atom corresponds to a ``ui.write()``.
296 """
317 """
297 bytesleft = DEFAULT_MAX_FRAME_SIZE
318 bytesleft = DEFAULT_MAX_FRAME_SIZE
298 atomchunks = []
319 atomchunks = []
299
320
300 for (formatting, args, labels) in atoms:
321 for (formatting, args, labels) in atoms:
301 if len(args) > 255:
322 if len(args) > 255:
302 raise ValueError('cannot use more than 255 formatting arguments')
323 raise ValueError('cannot use more than 255 formatting arguments')
303 if len(labels) > 255:
324 if len(labels) > 255:
304 raise ValueError('cannot use more than 255 labels')
325 raise ValueError('cannot use more than 255 labels')
305
326
306 # TODO look for localstr, other types here?
327 # TODO look for localstr, other types here?
307
328
308 if not isinstance(formatting, bytes):
329 if not isinstance(formatting, bytes):
309 raise ValueError('must use bytes formatting strings')
330 raise ValueError('must use bytes formatting strings')
310 for arg in args:
331 for arg in args:
311 if not isinstance(arg, bytes):
332 if not isinstance(arg, bytes):
312 raise ValueError('must use bytes for arguments')
333 raise ValueError('must use bytes for arguments')
313 for label in labels:
334 for label in labels:
314 if not isinstance(label, bytes):
335 if not isinstance(label, bytes):
315 raise ValueError('must use bytes for labels')
336 raise ValueError('must use bytes for labels')
316
337
317 # Formatting string must be UTF-8.
338 # Formatting string must be UTF-8.
318 formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
339 formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
319
340
320 # Arguments must be UTF-8.
341 # Arguments must be UTF-8.
321 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
342 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
322
343
323 # Labels must be ASCII.
344 # Labels must be ASCII.
324 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
345 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
325 for l in labels]
346 for l in labels]
326
347
327 if len(formatting) > 65535:
348 if len(formatting) > 65535:
328 raise ValueError('formatting string cannot be longer than 64k')
349 raise ValueError('formatting string cannot be longer than 64k')
329
350
330 if any(len(a) > 65535 for a in args):
351 if any(len(a) > 65535 for a in args):
331 raise ValueError('argument string cannot be longer than 64k')
352 raise ValueError('argument string cannot be longer than 64k')
332
353
333 if any(len(l) > 255 for l in labels):
354 if any(len(l) > 255 for l in labels):
334 raise ValueError('label string cannot be longer than 255 bytes')
355 raise ValueError('label string cannot be longer than 255 bytes')
335
356
336 chunks = [
357 chunks = [
337 struct.pack(r'<H', len(formatting)),
358 struct.pack(r'<H', len(formatting)),
338 struct.pack(r'<BB', len(labels), len(args)),
359 struct.pack(r'<BB', len(labels), len(args)),
339 struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
360 struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
340 struct.pack(r'<' + r'H' * len(args), *map(len, args)),
361 struct.pack(r'<' + r'H' * len(args), *map(len, args)),
341 ]
362 ]
342 chunks.append(formatting)
363 chunks.append(formatting)
343 chunks.extend(labels)
364 chunks.extend(labels)
344 chunks.extend(args)
365 chunks.extend(args)
345
366
346 atom = b''.join(chunks)
367 atom = b''.join(chunks)
347 atomchunks.append(atom)
368 atomchunks.append(atom)
348 bytesleft -= len(atom)
369 bytesleft -= len(atom)
349
370
350 if bytesleft < 0:
371 if bytesleft < 0:
351 raise ValueError('cannot encode data in a single frame')
372 raise ValueError('cannot encode data in a single frame')
352
373
353 yield makeframe(requestid, FRAME_TYPE_TEXT_OUTPUT, 0, b''.join(atomchunks))
374 yield makeframe(requestid, FRAME_TYPE_TEXT_OUTPUT, 0, b''.join(atomchunks))
354
375
355 class serverreactor(object):
376 class serverreactor(object):
356 """Holds state of a server handling frame-based protocol requests.
377 """Holds state of a server handling frame-based protocol requests.
357
378
358 This class is the "brain" of the unified frame-based protocol server
379 This class is the "brain" of the unified frame-based protocol server
359 component. While the protocol is stateless from the perspective of
380 component. While the protocol is stateless from the perspective of
360 requests/commands, something needs to track which frames have been
381 requests/commands, something needs to track which frames have been
361 received, what frames to expect, etc. This class is that thing.
382 received, what frames to expect, etc. This class is that thing.
362
383
363 Instances are modeled as a state machine of sorts. Instances are also
384 Instances are modeled as a state machine of sorts. Instances are also
364 reactionary to external events. The point of this class is to encapsulate
385 reactionary to external events. The point of this class is to encapsulate
365 the state of the connection and the exchange of frames, not to perform
386 the state of the connection and the exchange of frames, not to perform
366 work. Instead, callers tell this class when something occurs, like a
387 work. Instead, callers tell this class when something occurs, like a
367 frame arriving. If that activity is worthy of a follow-up action (say
388 frame arriving. If that activity is worthy of a follow-up action (say
368 *run a command*), the return value of that handler will say so.
389 *run a command*), the return value of that handler will say so.
369
390
370 I/O and CPU intensive operations are purposefully delegated outside of
391 I/O and CPU intensive operations are purposefully delegated outside of
371 this class.
392 this class.
372
393
373 Consumers are expected to tell instances when events occur. They do so by
394 Consumers are expected to tell instances when events occur. They do so by
374 calling the various ``on*`` methods. These methods return a 2-tuple
395 calling the various ``on*`` methods. These methods return a 2-tuple
375 describing any follow-up action(s) to take. The first element is the
396 describing any follow-up action(s) to take. The first element is the
376 name of an action to perform. The second is a data structure (usually
397 name of an action to perform. The second is a data structure (usually
377 a dict) specific to that action that contains more information. e.g.
398 a dict) specific to that action that contains more information. e.g.
378 if the server wants to send frames back to the client, the data structure
399 if the server wants to send frames back to the client, the data structure
379 will contain a reference to those frames.
400 will contain a reference to those frames.
380
401
381 Valid actions that consumers can be instructed to take are:
402 Valid actions that consumers can be instructed to take are:
382
403
383 sendframes
404 sendframes
384 Indicates that frames should be sent to the client. The ``framegen``
405 Indicates that frames should be sent to the client. The ``framegen``
385 key contains a generator of frames that should be sent. The server
406 key contains a generator of frames that should be sent. The server
386 assumes that all frames are sent to the client.
407 assumes that all frames are sent to the client.
387
408
388 error
409 error
389 Indicates that an error occurred. Consumer should probably abort.
410 Indicates that an error occurred. Consumer should probably abort.
390
411
391 runcommand
412 runcommand
392 Indicates that the consumer should run a wire protocol command. Details
413 Indicates that the consumer should run a wire protocol command. Details
393 of the command to run are given in the data structure.
414 of the command to run are given in the data structure.
394
415
395 wantframe
416 wantframe
396 Indicates that nothing of interest happened and the server is waiting on
417 Indicates that nothing of interest happened and the server is waiting on
397 more frames from the client before anything interesting can be done.
418 more frames from the client before anything interesting can be done.
398
419
399 noop
420 noop
400 Indicates no additional action is required.
421 Indicates no additional action is required.
401
422
402 Known Issues
423 Known Issues
403 ------------
424 ------------
404
425
405 There are no limits to the number of partially received commands or their
426 There are no limits to the number of partially received commands or their
406 size. A malicious client could stream command request data and exhaust the
427 size. A malicious client could stream command request data and exhaust the
407 server's memory.
428 server's memory.
408
429
409 Partially received commands are not acted upon when end of input is
430 Partially received commands are not acted upon when end of input is
410 reached. Should the server error if it receives a partial request?
431 reached. Should the server error if it receives a partial request?
411 Should the client send a message to abort a partially transmitted request
432 Should the client send a message to abort a partially transmitted request
412 to facilitate graceful shutdown?
433 to facilitate graceful shutdown?
413
434
414 Active requests that haven't been responded to aren't tracked. This means
435 Active requests that haven't been responded to aren't tracked. This means
415 that if we receive a command and instruct its dispatch, another command
436 that if we receive a command and instruct its dispatch, another command
416 with its request ID can come in over the wire and there will be a race
437 with its request ID can come in over the wire and there will be a race
417 between who responds to what.
438 between who responds to what.
418 """
439 """
419
440
420 def __init__(self, deferoutput=False):
441 def __init__(self, deferoutput=False):
421 """Construct a new server reactor.
442 """Construct a new server reactor.
422
443
423 ``deferoutput`` can be used to indicate that no output frames should be
444 ``deferoutput`` can be used to indicate that no output frames should be
424 instructed to be sent until input has been exhausted. In this mode,
445 instructed to be sent until input has been exhausted. In this mode,
425 events that would normally generate output frames (such as a command
446 events that would normally generate output frames (such as a command
426 response being ready) will instead defer instructing the consumer to
447 response being ready) will instead defer instructing the consumer to
427 send those frames. This is useful for half-duplex transports where the
448 send those frames. This is useful for half-duplex transports where the
428 sender cannot receive until all data has been transmitted.
449 sender cannot receive until all data has been transmitted.
429 """
450 """
430 self._deferoutput = deferoutput
451 self._deferoutput = deferoutput
431 self._state = 'idle'
452 self._state = 'idle'
432 self._bufferedframegens = []
453 self._bufferedframegens = []
433 # request id -> dict of commands that are actively being received.
454 # request id -> dict of commands that are actively being received.
434 self._receivingcommands = {}
455 self._receivingcommands = {}
435
456
436 def onframerecv(self, requestid, frametype, frameflags, payload):
457 def onframerecv(self, frame):
437 """Process a frame that has been received off the wire.
458 """Process a frame that has been received off the wire.
438
459
439 Returns a dict with an ``action`` key that details what action,
460 Returns a dict with an ``action`` key that details what action,
440 if any, the consumer should take next.
461 if any, the consumer should take next.
441 """
462 """
442 handlers = {
463 handlers = {
443 'idle': self._onframeidle,
464 'idle': self._onframeidle,
444 'command-receiving': self._onframecommandreceiving,
465 'command-receiving': self._onframecommandreceiving,
445 'errored': self._onframeerrored,
466 'errored': self._onframeerrored,
446 }
467 }
447
468
448 meth = handlers.get(self._state)
469 meth = handlers.get(self._state)
449 if not meth:
470 if not meth:
450 raise error.ProgrammingError('unhandled state: %s' % self._state)
471 raise error.ProgrammingError('unhandled state: %s' % self._state)
451
472
452 return meth(requestid, frametype, frameflags, payload)
473 return meth(frame)
453
474
454 def onbytesresponseready(self, requestid, data):
475 def onbytesresponseready(self, requestid, data):
455 """Signal that a bytes response is ready to be sent to the client.
476 """Signal that a bytes response is ready to be sent to the client.
456
477
457 The raw bytes response is passed as an argument.
478 The raw bytes response is passed as an argument.
458 """
479 """
459 framegen = createbytesresponseframesfrombytes(requestid, data)
480 framegen = createbytesresponseframesfrombytes(requestid, data)
460
481
461 if self._deferoutput:
482 if self._deferoutput:
462 self._bufferedframegens.append(framegen)
483 self._bufferedframegens.append(framegen)
463 return 'noop', {}
484 return 'noop', {}
464 else:
485 else:
465 return 'sendframes', {
486 return 'sendframes', {
466 'framegen': framegen,
487 'framegen': framegen,
467 }
488 }
468
489
469 def oninputeof(self):
490 def oninputeof(self):
470 """Signals that end of input has been received.
491 """Signals that end of input has been received.
471
492
472 No more frames will be received. All pending activity should be
493 No more frames will be received. All pending activity should be
473 completed.
494 completed.
474 """
495 """
475 # TODO should we do anything about in-flight commands?
496 # TODO should we do anything about in-flight commands?
476
497
477 if not self._deferoutput or not self._bufferedframegens:
498 if not self._deferoutput or not self._bufferedframegens:
478 return 'noop', {}
499 return 'noop', {}
479
500
480 # If we buffered all our responses, emit those.
501 # If we buffered all our responses, emit those.
481 def makegen():
502 def makegen():
482 for gen in self._bufferedframegens:
503 for gen in self._bufferedframegens:
483 for frame in gen:
504 for frame in gen:
484 yield frame
505 yield frame
485
506
486 return 'sendframes', {
507 return 'sendframes', {
487 'framegen': makegen(),
508 'framegen': makegen(),
488 }
509 }
489
510
490 def onapplicationerror(self, requestid, msg):
511 def onapplicationerror(self, requestid, msg):
491 return 'sendframes', {
512 return 'sendframes', {
492 'framegen': createerrorframe(requestid, msg, application=True),
513 'framegen': createerrorframe(requestid, msg, application=True),
493 }
514 }
494
515
495 def _makeerrorresult(self, msg):
516 def _makeerrorresult(self, msg):
496 return 'error', {
517 return 'error', {
497 'message': msg,
518 'message': msg,
498 }
519 }
499
520
500 def _makeruncommandresult(self, requestid):
521 def _makeruncommandresult(self, requestid):
501 entry = self._receivingcommands[requestid]
522 entry = self._receivingcommands[requestid]
502 del self._receivingcommands[requestid]
523 del self._receivingcommands[requestid]
503
524
504 if self._receivingcommands:
525 if self._receivingcommands:
505 self._state = 'command-receiving'
526 self._state = 'command-receiving'
506 else:
527 else:
507 self._state = 'idle'
528 self._state = 'idle'
508
529
509 return 'runcommand', {
530 return 'runcommand', {
510 'requestid': requestid,
531 'requestid': requestid,
511 'command': entry['command'],
532 'command': entry['command'],
512 'args': entry['args'],
533 'args': entry['args'],
513 'data': entry['data'].getvalue() if entry['data'] else None,
534 'data': entry['data'].getvalue() if entry['data'] else None,
514 }
535 }
515
536
516 def _makewantframeresult(self):
537 def _makewantframeresult(self):
517 return 'wantframe', {
538 return 'wantframe', {
518 'state': self._state,
539 'state': self._state,
519 }
540 }
520
541
521 def _onframeidle(self, requestid, frametype, frameflags, payload):
542 def _onframeidle(self, frame):
522 # The only frame type that should be received in this state is a
543 # The only frame type that should be received in this state is a
523 # command request.
544 # command request.
524 if frametype != FRAME_TYPE_COMMAND_NAME:
545 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
525 self._state = 'errored'
546 self._state = 'errored'
526 return self._makeerrorresult(
547 return self._makeerrorresult(
527 _('expected command frame; got %d') % frametype)
548 _('expected command frame; got %d') % frame.typeid)
528
549
529 if requestid in self._receivingcommands:
550 if frame.requestid in self._receivingcommands:
530 self._state = 'errored'
551 self._state = 'errored'
531 return self._makeerrorresult(
552 return self._makeerrorresult(
532 _('request with ID %d already received') % requestid)
553 _('request with ID %d already received') % frame.requestid)
533
554
534 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
555 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
535 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
556 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
536
557
537 self._receivingcommands[requestid] = {
558 self._receivingcommands[frame.requestid] = {
538 'command': payload,
559 'command': frame.payload,
539 'args': {},
560 'args': {},
540 'data': None,
561 'data': None,
541 'expectingargs': expectingargs,
562 'expectingargs': expectingargs,
542 'expectingdata': expectingdata,
563 'expectingdata': expectingdata,
543 }
564 }
544
565
545 if frameflags & FLAG_COMMAND_NAME_EOS:
566 if frame.flags & FLAG_COMMAND_NAME_EOS:
546 return self._makeruncommandresult(requestid)
567 return self._makeruncommandresult(frame.requestid)
547
568
548 if expectingargs or expectingdata:
569 if expectingargs or expectingdata:
549 self._state = 'command-receiving'
570 self._state = 'command-receiving'
550 return self._makewantframeresult()
571 return self._makewantframeresult()
551 else:
572 else:
552 self._state = 'errored'
573 self._state = 'errored'
553 return self._makeerrorresult(_('missing frame flags on '
574 return self._makeerrorresult(_('missing frame flags on '
554 'command frame'))
575 'command frame'))
555
576
556 def _onframecommandreceiving(self, requestid, frametype, frameflags,
577 def _onframecommandreceiving(self, frame):
557 payload):
558 # It could be a new command request. Process it as such.
578 # It could be a new command request. Process it as such.
559 if frametype == FRAME_TYPE_COMMAND_NAME:
579 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
560 return self._onframeidle(requestid, frametype, frameflags, payload)
580 return self._onframeidle(frame)
561
581
562 # All other frames should be related to a command that is currently
582 # All other frames should be related to a command that is currently
563 # receiving.
583 # receiving.
564 if requestid not in self._receivingcommands:
584 if frame.requestid not in self._receivingcommands:
565 self._state = 'errored'
585 self._state = 'errored'
566 return self._makeerrorresult(
586 return self._makeerrorresult(
567 _('received frame for request that is not receiving: %d') %
587 _('received frame for request that is not receiving: %d') %
568 requestid)
588 frame.requestid)
569
589
570 entry = self._receivingcommands[requestid]
590 entry = self._receivingcommands[frame.requestid]
571
591
572 if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
592 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
573 if not entry['expectingargs']:
593 if not entry['expectingargs']:
574 self._state = 'errored'
594 self._state = 'errored'
575 return self._makeerrorresult(_(
595 return self._makeerrorresult(_(
576 'received command argument frame for request that is not '
596 'received command argument frame for request that is not '
577 'expecting arguments: %d') % requestid)
597 'expecting arguments: %d') % frame.requestid)
578
598
579 return self._handlecommandargsframe(requestid, entry, frametype,
599 return self._handlecommandargsframe(frame, entry)
580 frameflags, payload)
581
600
582 elif frametype == FRAME_TYPE_COMMAND_DATA:
601 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
583 if not entry['expectingdata']:
602 if not entry['expectingdata']:
584 self._state = 'errored'
603 self._state = 'errored'
585 return self._makeerrorresult(_(
604 return self._makeerrorresult(_(
586 'received command data frame for request that is not '
605 'received command data frame for request that is not '
587 'expecting data: %d') % requestid)
606 'expecting data: %d') % frame.requestid)
588
607
589 if entry['data'] is None:
608 if entry['data'] is None:
590 entry['data'] = util.bytesio()
609 entry['data'] = util.bytesio()
591
610
592 return self._handlecommanddataframe(requestid, entry, frametype,
611 return self._handlecommanddataframe(frame, entry)
593 frameflags, payload)
594
612
595 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
613 def _handlecommandargsframe(self, frame, entry):
596 payload):
597 # The frame and state of command should have already been validated.
614 # The frame and state of command should have already been validated.
598 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
615 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
599
616
600 offset = 0
617 offset = 0
601 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
618 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
602 offset += ARGUMENT_FRAME_HEADER.size
619 offset += ARGUMENT_FRAME_HEADER.size
603
620
604 # The argument name MUST fit inside the frame.
621 # The argument name MUST fit inside the frame.
605 argname = bytes(payload[offset:offset + namesize])
622 argname = bytes(frame.payload[offset:offset + namesize])
606 offset += namesize
623 offset += namesize
607
624
608 if len(argname) != namesize:
625 if len(argname) != namesize:
609 self._state = 'errored'
626 self._state = 'errored'
610 return self._makeerrorresult(_('malformed argument frame: '
627 return self._makeerrorresult(_('malformed argument frame: '
611 'partial argument name'))
628 'partial argument name'))
612
629
613 argvalue = bytes(payload[offset:])
630 argvalue = bytes(frame.payload[offset:])
614
631
615 # Argument value spans multiple frames. Record our active state
632 # Argument value spans multiple frames. Record our active state
616 # and wait for the next frame.
633 # and wait for the next frame.
617 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
634 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
618 raise error.ProgrammingError('not yet implemented')
635 raise error.ProgrammingError('not yet implemented')
619
636
620 # Common case: the argument value is completely contained in this
637 # Common case: the argument value is completely contained in this
621 # frame.
638 # frame.
622
639
623 if len(argvalue) != valuesize:
640 if len(argvalue) != valuesize:
624 self._state = 'errored'
641 self._state = 'errored'
625 return self._makeerrorresult(_('malformed argument frame: '
642 return self._makeerrorresult(_('malformed argument frame: '
626 'partial argument value'))
643 'partial argument value'))
627
644
628 entry['args'][argname] = argvalue
645 entry['args'][argname] = argvalue
629
646
630 if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
647 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
631 if entry['expectingdata']:
648 if entry['expectingdata']:
632 # TODO signal request to run a command once we don't
649 # TODO signal request to run a command once we don't
633 # buffer data frames.
650 # buffer data frames.
634 return self._makewantframeresult()
651 return self._makewantframeresult()
635 else:
652 else:
636 return self._makeruncommandresult(requestid)
653 return self._makeruncommandresult(frame.requestid)
637 else:
654 else:
638 return self._makewantframeresult()
655 return self._makewantframeresult()
639
656
640 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
657 def _handlecommanddataframe(self, frame, entry):
641 payload):
658 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
642 assert frametype == FRAME_TYPE_COMMAND_DATA
643
659
644 # TODO support streaming data instead of buffering it.
660 # TODO support streaming data instead of buffering it.
645 entry['data'].write(payload)
661 entry['data'].write(frame.payload)
646
662
647 if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
663 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
648 return self._makewantframeresult()
664 return self._makewantframeresult()
649 elif frameflags & FLAG_COMMAND_DATA_EOS:
665 elif frame.flags & FLAG_COMMAND_DATA_EOS:
650 entry['data'].seek(0)
666 entry['data'].seek(0)
651 return self._makeruncommandresult(requestid)
667 return self._makeruncommandresult(frame.requestid)
652 else:
668 else:
653 self._state = 'errored'
669 self._state = 'errored'
654 return self._makeerrorresult(_('command data frame without '
670 return self._makeerrorresult(_('command data frame without '
655 'flags'))
671 'flags'))
656
672
657 def _onframeerrored(self, requestid, frametype, frameflags, payload):
673 def _onframeerrored(self, frame):
658 return self._makeerrorresult(_('server already errored'))
674 return self._makeerrorresult(_('server already errored'))
@@ -1,1043 +1,1042
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10 import struct
10 import struct
11 import sys
11 import sys
12 import threading
12 import threading
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 encoding,
16 encoding,
17 error,
17 error,
18 hook,
18 hook,
19 pycompat,
19 pycompat,
20 util,
20 util,
21 wireproto,
21 wireproto,
22 wireprotoframing,
22 wireprotoframing,
23 wireprototypes,
23 wireprototypes,
24 )
24 )
25
25
26 stringio = util.stringio
26 stringio = util.stringio
27
27
28 urlerr = util.urlerr
28 urlerr = util.urlerr
29 urlreq = util.urlreq
29 urlreq = util.urlreq
30
30
31 HTTP_OK = 200
31 HTTP_OK = 200
32
32
33 HGTYPE = 'application/mercurial-0.1'
33 HGTYPE = 'application/mercurial-0.1'
34 HGTYPE2 = 'application/mercurial-0.2'
34 HGTYPE2 = 'application/mercurial-0.2'
35 HGERRTYPE = 'application/hg-error'
35 HGERRTYPE = 'application/hg-error'
36 FRAMINGTYPE = b'application/mercurial-exp-framing-0002'
36 FRAMINGTYPE = b'application/mercurial-exp-framing-0002'
37
37
38 HTTPV2 = wireprototypes.HTTPV2
38 HTTPV2 = wireprototypes.HTTPV2
39 SSHV1 = wireprototypes.SSHV1
39 SSHV1 = wireprototypes.SSHV1
40 SSHV2 = wireprototypes.SSHV2
40 SSHV2 = wireprototypes.SSHV2
41
41
42 def decodevaluefromheaders(req, headerprefix):
42 def decodevaluefromheaders(req, headerprefix):
43 """Decode a long value from multiple HTTP request headers.
43 """Decode a long value from multiple HTTP request headers.
44
44
45 Returns the value as a bytes, not a str.
45 Returns the value as a bytes, not a str.
46 """
46 """
47 chunks = []
47 chunks = []
48 i = 1
48 i = 1
49 while True:
49 while True:
50 v = req.headers.get(b'%s-%d' % (headerprefix, i))
50 v = req.headers.get(b'%s-%d' % (headerprefix, i))
51 if v is None:
51 if v is None:
52 break
52 break
53 chunks.append(pycompat.bytesurl(v))
53 chunks.append(pycompat.bytesurl(v))
54 i += 1
54 i += 1
55
55
56 return ''.join(chunks)
56 return ''.join(chunks)
57
57
58 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
58 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
59 def __init__(self, req, ui, checkperm):
59 def __init__(self, req, ui, checkperm):
60 self._req = req
60 self._req = req
61 self._ui = ui
61 self._ui = ui
62 self._checkperm = checkperm
62 self._checkperm = checkperm
63
63
64 @property
64 @property
65 def name(self):
65 def name(self):
66 return 'http-v1'
66 return 'http-v1'
67
67
68 def getargs(self, args):
68 def getargs(self, args):
69 knownargs = self._args()
69 knownargs = self._args()
70 data = {}
70 data = {}
71 keys = args.split()
71 keys = args.split()
72 for k in keys:
72 for k in keys:
73 if k == '*':
73 if k == '*':
74 star = {}
74 star = {}
75 for key in knownargs.keys():
75 for key in knownargs.keys():
76 if key != 'cmd' and key not in keys:
76 if key != 'cmd' and key not in keys:
77 star[key] = knownargs[key][0]
77 star[key] = knownargs[key][0]
78 data['*'] = star
78 data['*'] = star
79 else:
79 else:
80 data[k] = knownargs[k][0]
80 data[k] = knownargs[k][0]
81 return [data[k] for k in keys]
81 return [data[k] for k in keys]
82
82
83 def _args(self):
83 def _args(self):
84 args = self._req.qsparams.asdictoflists()
84 args = self._req.qsparams.asdictoflists()
85 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
85 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
86 if postlen:
86 if postlen:
87 args.update(urlreq.parseqs(
87 args.update(urlreq.parseqs(
88 self._req.bodyfh.read(postlen), keep_blank_values=True))
88 self._req.bodyfh.read(postlen), keep_blank_values=True))
89 return args
89 return args
90
90
91 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
91 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
92 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
92 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
93 return args
93 return args
94
94
95 def forwardpayload(self, fp):
95 def forwardpayload(self, fp):
96 # Existing clients *always* send Content-Length.
96 # Existing clients *always* send Content-Length.
97 length = int(self._req.headers[b'Content-Length'])
97 length = int(self._req.headers[b'Content-Length'])
98
98
99 # If httppostargs is used, we need to read Content-Length
99 # If httppostargs is used, we need to read Content-Length
100 # minus the amount that was consumed by args.
100 # minus the amount that was consumed by args.
101 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
101 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
102 for s in util.filechunkiter(self._req.bodyfh, limit=length):
102 for s in util.filechunkiter(self._req.bodyfh, limit=length):
103 fp.write(s)
103 fp.write(s)
104
104
105 @contextlib.contextmanager
105 @contextlib.contextmanager
106 def mayberedirectstdio(self):
106 def mayberedirectstdio(self):
107 oldout = self._ui.fout
107 oldout = self._ui.fout
108 olderr = self._ui.ferr
108 olderr = self._ui.ferr
109
109
110 out = util.stringio()
110 out = util.stringio()
111
111
112 try:
112 try:
113 self._ui.fout = out
113 self._ui.fout = out
114 self._ui.ferr = out
114 self._ui.ferr = out
115 yield out
115 yield out
116 finally:
116 finally:
117 self._ui.fout = oldout
117 self._ui.fout = oldout
118 self._ui.ferr = olderr
118 self._ui.ferr = olderr
119
119
120 def client(self):
120 def client(self):
121 return 'remote:%s:%s:%s' % (
121 return 'remote:%s:%s:%s' % (
122 self._req.urlscheme,
122 self._req.urlscheme,
123 urlreq.quote(self._req.remotehost or ''),
123 urlreq.quote(self._req.remotehost or ''),
124 urlreq.quote(self._req.remoteuser or ''))
124 urlreq.quote(self._req.remoteuser or ''))
125
125
126 def addcapabilities(self, repo, caps):
126 def addcapabilities(self, repo, caps):
127 caps.append(b'batch')
127 caps.append(b'batch')
128
128
129 caps.append('httpheader=%d' %
129 caps.append('httpheader=%d' %
130 repo.ui.configint('server', 'maxhttpheaderlen'))
130 repo.ui.configint('server', 'maxhttpheaderlen'))
131 if repo.ui.configbool('experimental', 'httppostargs'):
131 if repo.ui.configbool('experimental', 'httppostargs'):
132 caps.append('httppostargs')
132 caps.append('httppostargs')
133
133
134 # FUTURE advertise 0.2rx once support is implemented
134 # FUTURE advertise 0.2rx once support is implemented
135 # FUTURE advertise minrx and mintx after consulting config option
135 # FUTURE advertise minrx and mintx after consulting config option
136 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
136 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
137
137
138 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
138 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
139 if compengines:
139 if compengines:
140 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
140 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
141 for e in compengines)
141 for e in compengines)
142 caps.append('compression=%s' % comptypes)
142 caps.append('compression=%s' % comptypes)
143
143
144 return caps
144 return caps
145
145
146 def checkperm(self, perm):
146 def checkperm(self, perm):
147 return self._checkperm(perm)
147 return self._checkperm(perm)
148
148
149 # This method exists mostly so that extensions like remotefilelog can
149 # This method exists mostly so that extensions like remotefilelog can
150 # disable a kludgey legacy method only over http. As of early 2018,
150 # disable a kludgey legacy method only over http. As of early 2018,
151 # there are no other known users, so with any luck we can discard this
151 # there are no other known users, so with any luck we can discard this
152 # hook if remotefilelog becomes a first-party extension.
152 # hook if remotefilelog becomes a first-party extension.
153 def iscmd(cmd):
153 def iscmd(cmd):
154 return cmd in wireproto.commands
154 return cmd in wireproto.commands
155
155
156 def handlewsgirequest(rctx, req, res, checkperm):
156 def handlewsgirequest(rctx, req, res, checkperm):
157 """Possibly process a wire protocol request.
157 """Possibly process a wire protocol request.
158
158
159 If the current request is a wire protocol request, the request is
159 If the current request is a wire protocol request, the request is
160 processed by this function.
160 processed by this function.
161
161
162 ``req`` is a ``parsedrequest`` instance.
162 ``req`` is a ``parsedrequest`` instance.
163 ``res`` is a ``wsgiresponse`` instance.
163 ``res`` is a ``wsgiresponse`` instance.
164
164
165 Returns a bool indicating if the request was serviced. If set, the caller
165 Returns a bool indicating if the request was serviced. If set, the caller
166 should stop processing the request, as a response has already been issued.
166 should stop processing the request, as a response has already been issued.
167 """
167 """
168 # Avoid cycle involving hg module.
168 # Avoid cycle involving hg module.
169 from .hgweb import common as hgwebcommon
169 from .hgweb import common as hgwebcommon
170
170
171 repo = rctx.repo
171 repo = rctx.repo
172
172
173 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
173 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
174 # string parameter. If it isn't present, this isn't a wire protocol
174 # string parameter. If it isn't present, this isn't a wire protocol
175 # request.
175 # request.
176 if 'cmd' not in req.qsparams:
176 if 'cmd' not in req.qsparams:
177 return False
177 return False
178
178
179 cmd = req.qsparams['cmd']
179 cmd = req.qsparams['cmd']
180
180
181 # The "cmd" request parameter is used by both the wire protocol and hgweb.
181 # The "cmd" request parameter is used by both the wire protocol and hgweb.
182 # While not all wire protocol commands are available for all transports,
182 # While not all wire protocol commands are available for all transports,
183 # if we see a "cmd" value that resembles a known wire protocol command, we
183 # if we see a "cmd" value that resembles a known wire protocol command, we
184 # route it to a protocol handler. This is better than routing possible
184 # route it to a protocol handler. This is better than routing possible
185 # wire protocol requests to hgweb because it prevents hgweb from using
185 # wire protocol requests to hgweb because it prevents hgweb from using
186 # known wire protocol commands and it is less confusing for machine
186 # known wire protocol commands and it is less confusing for machine
187 # clients.
187 # clients.
188 if not iscmd(cmd):
188 if not iscmd(cmd):
189 return False
189 return False
190
190
191 # The "cmd" query string argument is only valid on the root path of the
191 # The "cmd" query string argument is only valid on the root path of the
192 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
192 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
193 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
193 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
194 # in this case. We send an HTTP 404 for backwards compatibility reasons.
194 # in this case. We send an HTTP 404 for backwards compatibility reasons.
195 if req.dispatchpath:
195 if req.dispatchpath:
196 res.status = hgwebcommon.statusmessage(404)
196 res.status = hgwebcommon.statusmessage(404)
197 res.headers['Content-Type'] = HGTYPE
197 res.headers['Content-Type'] = HGTYPE
198 # TODO This is not a good response to issue for this request. This
198 # TODO This is not a good response to issue for this request. This
199 # is mostly for BC for now.
199 # is mostly for BC for now.
200 res.setbodybytes('0\n%s\n' % b'Not Found')
200 res.setbodybytes('0\n%s\n' % b'Not Found')
201 return True
201 return True
202
202
203 proto = httpv1protocolhandler(req, repo.ui,
203 proto = httpv1protocolhandler(req, repo.ui,
204 lambda perm: checkperm(rctx, req, perm))
204 lambda perm: checkperm(rctx, req, perm))
205
205
206 # The permissions checker should be the only thing that can raise an
206 # The permissions checker should be the only thing that can raise an
207 # ErrorResponse. It is kind of a layer violation to catch an hgweb
207 # ErrorResponse. It is kind of a layer violation to catch an hgweb
208 # exception here. So consider refactoring into a exception type that
208 # exception here. So consider refactoring into a exception type that
209 # is associated with the wire protocol.
209 # is associated with the wire protocol.
210 try:
210 try:
211 _callhttp(repo, req, res, proto, cmd)
211 _callhttp(repo, req, res, proto, cmd)
212 except hgwebcommon.ErrorResponse as e:
212 except hgwebcommon.ErrorResponse as e:
213 for k, v in e.headers:
213 for k, v in e.headers:
214 res.headers[k] = v
214 res.headers[k] = v
215 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
215 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
216 # TODO This response body assumes the failed command was
216 # TODO This response body assumes the failed command was
217 # "unbundle." That assumption is not always valid.
217 # "unbundle." That assumption is not always valid.
218 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
218 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
219
219
220 return True
220 return True
221
221
222 def handlewsgiapirequest(rctx, req, res, checkperm):
222 def handlewsgiapirequest(rctx, req, res, checkperm):
223 """Handle requests to /api/*."""
223 """Handle requests to /api/*."""
224 assert req.dispatchparts[0] == b'api'
224 assert req.dispatchparts[0] == b'api'
225
225
226 repo = rctx.repo
226 repo = rctx.repo
227
227
228 # This whole URL space is experimental for now. But we want to
228 # This whole URL space is experimental for now. But we want to
229 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
229 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
230 if not repo.ui.configbool('experimental', 'web.apiserver'):
230 if not repo.ui.configbool('experimental', 'web.apiserver'):
231 res.status = b'404 Not Found'
231 res.status = b'404 Not Found'
232 res.headers[b'Content-Type'] = b'text/plain'
232 res.headers[b'Content-Type'] = b'text/plain'
233 res.setbodybytes(_('Experimental API server endpoint not enabled'))
233 res.setbodybytes(_('Experimental API server endpoint not enabled'))
234 return
234 return
235
235
236 # The URL space is /api/<protocol>/*. The structure of URLs under varies
236 # The URL space is /api/<protocol>/*. The structure of URLs under varies
237 # by <protocol>.
237 # by <protocol>.
238
238
239 # Registered APIs are made available via config options of the name of
239 # Registered APIs are made available via config options of the name of
240 # the protocol.
240 # the protocol.
241 availableapis = set()
241 availableapis = set()
242 for k, v in API_HANDLERS.items():
242 for k, v in API_HANDLERS.items():
243 section, option = v['config']
243 section, option = v['config']
244 if repo.ui.configbool(section, option):
244 if repo.ui.configbool(section, option):
245 availableapis.add(k)
245 availableapis.add(k)
246
246
247 # Requests to /api/ list available APIs.
247 # Requests to /api/ list available APIs.
248 if req.dispatchparts == [b'api']:
248 if req.dispatchparts == [b'api']:
249 res.status = b'200 OK'
249 res.status = b'200 OK'
250 res.headers[b'Content-Type'] = b'text/plain'
250 res.headers[b'Content-Type'] = b'text/plain'
251 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
251 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
252 'one of the following:\n')]
252 'one of the following:\n')]
253 if availableapis:
253 if availableapis:
254 lines.extend(sorted(availableapis))
254 lines.extend(sorted(availableapis))
255 else:
255 else:
256 lines.append(_('(no available APIs)\n'))
256 lines.append(_('(no available APIs)\n'))
257 res.setbodybytes(b'\n'.join(lines))
257 res.setbodybytes(b'\n'.join(lines))
258 return
258 return
259
259
260 proto = req.dispatchparts[1]
260 proto = req.dispatchparts[1]
261
261
262 if proto not in API_HANDLERS:
262 if proto not in API_HANDLERS:
263 res.status = b'404 Not Found'
263 res.status = b'404 Not Found'
264 res.headers[b'Content-Type'] = b'text/plain'
264 res.headers[b'Content-Type'] = b'text/plain'
265 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
265 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
266 proto, b', '.join(sorted(availableapis))))
266 proto, b', '.join(sorted(availableapis))))
267 return
267 return
268
268
269 if proto not in availableapis:
269 if proto not in availableapis:
270 res.status = b'404 Not Found'
270 res.status = b'404 Not Found'
271 res.headers[b'Content-Type'] = b'text/plain'
271 res.headers[b'Content-Type'] = b'text/plain'
272 res.setbodybytes(_('API %s not enabled\n') % proto)
272 res.setbodybytes(_('API %s not enabled\n') % proto)
273 return
273 return
274
274
275 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
275 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
276 req.dispatchparts[2:])
276 req.dispatchparts[2:])
277
277
278 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
278 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
279 from .hgweb import common as hgwebcommon
279 from .hgweb import common as hgwebcommon
280
280
281 # URL space looks like: <permissions>/<command>, where <permission> can
281 # URL space looks like: <permissions>/<command>, where <permission> can
282 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
282 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
283
283
284 # Root URL does nothing meaningful... yet.
284 # Root URL does nothing meaningful... yet.
285 if not urlparts:
285 if not urlparts:
286 res.status = b'200 OK'
286 res.status = b'200 OK'
287 res.headers[b'Content-Type'] = b'text/plain'
287 res.headers[b'Content-Type'] = b'text/plain'
288 res.setbodybytes(_('HTTP version 2 API handler'))
288 res.setbodybytes(_('HTTP version 2 API handler'))
289 return
289 return
290
290
291 if len(urlparts) == 1:
291 if len(urlparts) == 1:
292 res.status = b'404 Not Found'
292 res.status = b'404 Not Found'
293 res.headers[b'Content-Type'] = b'text/plain'
293 res.headers[b'Content-Type'] = b'text/plain'
294 res.setbodybytes(_('do not know how to process %s\n') %
294 res.setbodybytes(_('do not know how to process %s\n') %
295 req.dispatchpath)
295 req.dispatchpath)
296 return
296 return
297
297
298 permission, command = urlparts[0:2]
298 permission, command = urlparts[0:2]
299
299
300 if permission not in (b'ro', b'rw'):
300 if permission not in (b'ro', b'rw'):
301 res.status = b'404 Not Found'
301 res.status = b'404 Not Found'
302 res.headers[b'Content-Type'] = b'text/plain'
302 res.headers[b'Content-Type'] = b'text/plain'
303 res.setbodybytes(_('unknown permission: %s') % permission)
303 res.setbodybytes(_('unknown permission: %s') % permission)
304 return
304 return
305
305
306 if req.method != 'POST':
306 if req.method != 'POST':
307 res.status = b'405 Method Not Allowed'
307 res.status = b'405 Method Not Allowed'
308 res.headers[b'Allow'] = b'POST'
308 res.headers[b'Allow'] = b'POST'
309 res.setbodybytes(_('commands require POST requests'))
309 res.setbodybytes(_('commands require POST requests'))
310 return
310 return
311
311
312 # At some point we'll want to use our own API instead of recycling the
312 # At some point we'll want to use our own API instead of recycling the
313 # behavior of version 1 of the wire protocol...
313 # behavior of version 1 of the wire protocol...
314 # TODO return reasonable responses - not responses that overload the
314 # TODO return reasonable responses - not responses that overload the
315 # HTTP status line message for error reporting.
315 # HTTP status line message for error reporting.
316 try:
316 try:
317 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
317 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
318 except hgwebcommon.ErrorResponse as e:
318 except hgwebcommon.ErrorResponse as e:
319 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
319 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
320 for k, v in e.headers:
320 for k, v in e.headers:
321 res.headers[k] = v
321 res.headers[k] = v
322 res.setbodybytes('permission denied')
322 res.setbodybytes('permission denied')
323 return
323 return
324
324
325 # We have a special endpoint to reflect the request back at the client.
325 # We have a special endpoint to reflect the request back at the client.
326 if command == b'debugreflect':
326 if command == b'debugreflect':
327 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
327 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
328 return
328 return
329
329
330 # Extra commands that we handle that aren't really wire protocol
330 # Extra commands that we handle that aren't really wire protocol
331 # commands. Think extra hard before making this hackery available to
331 # commands. Think extra hard before making this hackery available to
332 # extension.
332 # extension.
333 extracommands = {'multirequest'}
333 extracommands = {'multirequest'}
334
334
335 if command not in wireproto.commands and command not in extracommands:
335 if command not in wireproto.commands and command not in extracommands:
336 res.status = b'404 Not Found'
336 res.status = b'404 Not Found'
337 res.headers[b'Content-Type'] = b'text/plain'
337 res.headers[b'Content-Type'] = b'text/plain'
338 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
338 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
339 return
339 return
340
340
341 repo = rctx.repo
341 repo = rctx.repo
342 ui = repo.ui
342 ui = repo.ui
343
343
344 proto = httpv2protocolhandler(req, ui)
344 proto = httpv2protocolhandler(req, ui)
345
345
346 if (not wireproto.commands.commandavailable(command, proto)
346 if (not wireproto.commands.commandavailable(command, proto)
347 and command not in extracommands):
347 and command not in extracommands):
348 res.status = b'404 Not Found'
348 res.status = b'404 Not Found'
349 res.headers[b'Content-Type'] = b'text/plain'
349 res.headers[b'Content-Type'] = b'text/plain'
350 res.setbodybytes(_('invalid wire protocol command: %s') % command)
350 res.setbodybytes(_('invalid wire protocol command: %s') % command)
351 return
351 return
352
352
353 if req.headers.get(b'Accept') != FRAMINGTYPE:
353 if req.headers.get(b'Accept') != FRAMINGTYPE:
354 res.status = b'406 Not Acceptable'
354 res.status = b'406 Not Acceptable'
355 res.headers[b'Content-Type'] = b'text/plain'
355 res.headers[b'Content-Type'] = b'text/plain'
356 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
356 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
357 % FRAMINGTYPE)
357 % FRAMINGTYPE)
358 return
358 return
359
359
360 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
360 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
361 res.status = b'415 Unsupported Media Type'
361 res.status = b'415 Unsupported Media Type'
362 # TODO we should send a response with appropriate media type,
362 # TODO we should send a response with appropriate media type,
363 # since client does Accept it.
363 # since client does Accept it.
364 res.headers[b'Content-Type'] = b'text/plain'
364 res.headers[b'Content-Type'] = b'text/plain'
365 res.setbodybytes(_('client MUST send Content-Type header with '
365 res.setbodybytes(_('client MUST send Content-Type header with '
366 'value: %s\n') % FRAMINGTYPE)
366 'value: %s\n') % FRAMINGTYPE)
367 return
367 return
368
368
369 _processhttpv2request(ui, repo, req, res, permission, command, proto)
369 _processhttpv2request(ui, repo, req, res, permission, command, proto)
370
370
371 def _processhttpv2reflectrequest(ui, repo, req, res):
371 def _processhttpv2reflectrequest(ui, repo, req, res):
372 """Reads unified frame protocol request and dumps out state to client.
372 """Reads unified frame protocol request and dumps out state to client.
373
373
374 This special endpoint can be used to help debug the wire protocol.
374 This special endpoint can be used to help debug the wire protocol.
375
375
376 Instead of routing the request through the normal dispatch mechanism,
376 Instead of routing the request through the normal dispatch mechanism,
377 we instead read all frames, decode them, and feed them into our state
377 we instead read all frames, decode them, and feed them into our state
378 tracker. We then dump the log of all that activity back out to the
378 tracker. We then dump the log of all that activity back out to the
379 client.
379 client.
380 """
380 """
381 import json
381 import json
382
382
383 # Reflection APIs have a history of being abused, accidentally disclosing
383 # Reflection APIs have a history of being abused, accidentally disclosing
384 # sensitive data, etc. So we have a config knob.
384 # sensitive data, etc. So we have a config knob.
385 if not ui.configbool('experimental', 'web.api.debugreflect'):
385 if not ui.configbool('experimental', 'web.api.debugreflect'):
386 res.status = b'404 Not Found'
386 res.status = b'404 Not Found'
387 res.headers[b'Content-Type'] = b'text/plain'
387 res.headers[b'Content-Type'] = b'text/plain'
388 res.setbodybytes(_('debugreflect service not available'))
388 res.setbodybytes(_('debugreflect service not available'))
389 return
389 return
390
390
391 # We assume we have a unified framing protocol request body.
391 # We assume we have a unified framing protocol request body.
392
392
393 reactor = wireprotoframing.serverreactor()
393 reactor = wireprotoframing.serverreactor()
394 states = []
394 states = []
395
395
396 while True:
396 while True:
397 frame = wireprotoframing.readframe(req.bodyfh)
397 frame = wireprotoframing.readframe(req.bodyfh)
398
398
399 if not frame:
399 if not frame:
400 states.append(b'received: <no frame>')
400 states.append(b'received: <no frame>')
401 break
401 break
402
402
403 requestid, frametype, frameflags, payload = frame
403 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
404 states.append(b'received: %d %d %d %s' % (frametype, frameflags,
404 frame.requestid,
405 requestid, payload))
405 frame.payload))
406
406
407 action, meta = reactor.onframerecv(requestid, frametype, frameflags,
407 action, meta = reactor.onframerecv(frame)
408 payload)
409 states.append(json.dumps((action, meta), sort_keys=True,
408 states.append(json.dumps((action, meta), sort_keys=True,
410 separators=(', ', ': ')))
409 separators=(', ', ': ')))
411
410
412 action, meta = reactor.oninputeof()
411 action, meta = reactor.oninputeof()
413 meta['action'] = action
412 meta['action'] = action
414 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
413 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
415
414
416 res.status = b'200 OK'
415 res.status = b'200 OK'
417 res.headers[b'Content-Type'] = b'text/plain'
416 res.headers[b'Content-Type'] = b'text/plain'
418 res.setbodybytes(b'\n'.join(states))
417 res.setbodybytes(b'\n'.join(states))
419
418
420 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
419 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
421 """Post-validation handler for HTTPv2 requests.
420 """Post-validation handler for HTTPv2 requests.
422
421
423 Called when the HTTP request contains unified frame-based protocol
422 Called when the HTTP request contains unified frame-based protocol
424 frames for evaluation.
423 frames for evaluation.
425 """
424 """
426 # TODO Some HTTP clients are full duplex and can receive data before
425 # TODO Some HTTP clients are full duplex and can receive data before
427 # the entire request is transmitted. Figure out a way to indicate support
426 # the entire request is transmitted. Figure out a way to indicate support
428 # for that so we can opt into full duplex mode.
427 # for that so we can opt into full duplex mode.
429 reactor = wireprotoframing.serverreactor(deferoutput=True)
428 reactor = wireprotoframing.serverreactor(deferoutput=True)
430 seencommand = False
429 seencommand = False
431
430
432 while True:
431 while True:
433 frame = wireprotoframing.readframe(req.bodyfh)
432 frame = wireprotoframing.readframe(req.bodyfh)
434 if not frame:
433 if not frame:
435 break
434 break
436
435
437 action, meta = reactor.onframerecv(*frame)
436 action, meta = reactor.onframerecv(frame)
438
437
439 if action == 'wantframe':
438 if action == 'wantframe':
440 # Need more data before we can do anything.
439 # Need more data before we can do anything.
441 continue
440 continue
442 elif action == 'runcommand':
441 elif action == 'runcommand':
443 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
442 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
444 reqcommand, reactor, meta,
443 reqcommand, reactor, meta,
445 issubsequent=seencommand)
444 issubsequent=seencommand)
446
445
447 if sentoutput:
446 if sentoutput:
448 return
447 return
449
448
450 seencommand = True
449 seencommand = True
451
450
452 elif action == 'error':
451 elif action == 'error':
453 # TODO define proper error mechanism.
452 # TODO define proper error mechanism.
454 res.status = b'200 OK'
453 res.status = b'200 OK'
455 res.headers[b'Content-Type'] = b'text/plain'
454 res.headers[b'Content-Type'] = b'text/plain'
456 res.setbodybytes(meta['message'] + b'\n')
455 res.setbodybytes(meta['message'] + b'\n')
457 return
456 return
458 else:
457 else:
459 raise error.ProgrammingError(
458 raise error.ProgrammingError(
460 'unhandled action from frame processor: %s' % action)
459 'unhandled action from frame processor: %s' % action)
461
460
462 action, meta = reactor.oninputeof()
461 action, meta = reactor.oninputeof()
463 if action == 'sendframes':
462 if action == 'sendframes':
464 # We assume we haven't started sending the response yet. If we're
463 # We assume we haven't started sending the response yet. If we're
465 # wrong, the response type will raise an exception.
464 # wrong, the response type will raise an exception.
466 res.status = b'200 OK'
465 res.status = b'200 OK'
467 res.headers[b'Content-Type'] = FRAMINGTYPE
466 res.headers[b'Content-Type'] = FRAMINGTYPE
468 res.setbodygen(meta['framegen'])
467 res.setbodygen(meta['framegen'])
469 elif action == 'noop':
468 elif action == 'noop':
470 pass
469 pass
471 else:
470 else:
472 raise error.ProgrammingError('unhandled action from frame processor: %s'
471 raise error.ProgrammingError('unhandled action from frame processor: %s'
473 % action)
472 % action)
474
473
475 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
474 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
476 command, issubsequent):
475 command, issubsequent):
477 """Dispatch a wire protocol command made from HTTPv2 requests.
476 """Dispatch a wire protocol command made from HTTPv2 requests.
478
477
479 The authenticated permission (``authedperm``) along with the original
478 The authenticated permission (``authedperm``) along with the original
480 command from the URL (``reqcommand``) are passed in.
479 command from the URL (``reqcommand``) are passed in.
481 """
480 """
482 # We already validated that the session has permissions to perform the
481 # We already validated that the session has permissions to perform the
483 # actions in ``authedperm``. In the unified frame protocol, the canonical
482 # actions in ``authedperm``. In the unified frame protocol, the canonical
484 # command to run is expressed in a frame. However, the URL also requested
483 # command to run is expressed in a frame. However, the URL also requested
485 # to run a specific command. We need to be careful that the command we
484 # to run a specific command. We need to be careful that the command we
486 # run doesn't have permissions requirements greater than what was granted
485 # run doesn't have permissions requirements greater than what was granted
487 # by ``authedperm``.
486 # by ``authedperm``.
488 #
487 #
489 # Our rule for this is we only allow one command per HTTP request and
488 # Our rule for this is we only allow one command per HTTP request and
490 # that command must match the command in the URL. However, we make
489 # that command must match the command in the URL. However, we make
491 # an exception for the ``multirequest`` URL. This URL is allowed to
490 # an exception for the ``multirequest`` URL. This URL is allowed to
492 # execute multiple commands. We double check permissions of each command
491 # execute multiple commands. We double check permissions of each command
493 # as it is invoked to ensure there is no privilege escalation.
492 # as it is invoked to ensure there is no privilege escalation.
494 # TODO consider allowing multiple commands to regular command URLs
493 # TODO consider allowing multiple commands to regular command URLs
495 # iff each command is the same.
494 # iff each command is the same.
496
495
497 proto = httpv2protocolhandler(req, ui, args=command['args'])
496 proto = httpv2protocolhandler(req, ui, args=command['args'])
498
497
499 if reqcommand == b'multirequest':
498 if reqcommand == b'multirequest':
500 if not wireproto.commands.commandavailable(command['command'], proto):
499 if not wireproto.commands.commandavailable(command['command'], proto):
501 # TODO proper error mechanism
500 # TODO proper error mechanism
502 res.status = b'200 OK'
501 res.status = b'200 OK'
503 res.headers[b'Content-Type'] = b'text/plain'
502 res.headers[b'Content-Type'] = b'text/plain'
504 res.setbodybytes(_('wire protocol command not available: %s') %
503 res.setbodybytes(_('wire protocol command not available: %s') %
505 command['command'])
504 command['command'])
506 return True
505 return True
507
506
508 assert authedperm in (b'ro', b'rw')
507 assert authedperm in (b'ro', b'rw')
509 wirecommand = wireproto.commands[command['command']]
508 wirecommand = wireproto.commands[command['command']]
510 assert wirecommand.permission in ('push', 'pull')
509 assert wirecommand.permission in ('push', 'pull')
511
510
512 if authedperm == b'ro' and wirecommand.permission != 'pull':
511 if authedperm == b'ro' and wirecommand.permission != 'pull':
513 # TODO proper error mechanism
512 # TODO proper error mechanism
514 res.status = b'403 Forbidden'
513 res.status = b'403 Forbidden'
515 res.headers[b'Content-Type'] = b'text/plain'
514 res.headers[b'Content-Type'] = b'text/plain'
516 res.setbodybytes(_('insufficient permissions to execute '
515 res.setbodybytes(_('insufficient permissions to execute '
517 'command: %s') % command['command'])
516 'command: %s') % command['command'])
518 return True
517 return True
519
518
520 # TODO should we also call checkperm() here? Maybe not if we're going
519 # TODO should we also call checkperm() here? Maybe not if we're going
521 # to overhaul that API. The granted scope from the URL check should
520 # to overhaul that API. The granted scope from the URL check should
522 # be good enough.
521 # be good enough.
523
522
524 else:
523 else:
525 # Don't allow multiple commands outside of ``multirequest`` URL.
524 # Don't allow multiple commands outside of ``multirequest`` URL.
526 if issubsequent:
525 if issubsequent:
527 # TODO proper error mechanism
526 # TODO proper error mechanism
528 res.status = b'200 OK'
527 res.status = b'200 OK'
529 res.headers[b'Content-Type'] = b'text/plain'
528 res.headers[b'Content-Type'] = b'text/plain'
530 res.setbodybytes(_('multiple commands cannot be issued to this '
529 res.setbodybytes(_('multiple commands cannot be issued to this '
531 'URL'))
530 'URL'))
532 return True
531 return True
533
532
534 if reqcommand != command['command']:
533 if reqcommand != command['command']:
535 # TODO define proper error mechanism
534 # TODO define proper error mechanism
536 res.status = b'200 OK'
535 res.status = b'200 OK'
537 res.headers[b'Content-Type'] = b'text/plain'
536 res.headers[b'Content-Type'] = b'text/plain'
538 res.setbodybytes(_('command in frame must match command in URL'))
537 res.setbodybytes(_('command in frame must match command in URL'))
539 return True
538 return True
540
539
541 rsp = wireproto.dispatch(repo, proto, command['command'])
540 rsp = wireproto.dispatch(repo, proto, command['command'])
542
541
543 res.status = b'200 OK'
542 res.status = b'200 OK'
544 res.headers[b'Content-Type'] = FRAMINGTYPE
543 res.headers[b'Content-Type'] = FRAMINGTYPE
545
544
546 if isinstance(rsp, wireprototypes.bytesresponse):
545 if isinstance(rsp, wireprototypes.bytesresponse):
547 action, meta = reactor.onbytesresponseready(command['requestid'],
546 action, meta = reactor.onbytesresponseready(command['requestid'],
548 rsp.data)
547 rsp.data)
549 else:
548 else:
550 action, meta = reactor.onapplicationerror(
549 action, meta = reactor.onapplicationerror(
551 _('unhandled response type from wire proto command'))
550 _('unhandled response type from wire proto command'))
552
551
553 if action == 'sendframes':
552 if action == 'sendframes':
554 res.setbodygen(meta['framegen'])
553 res.setbodygen(meta['framegen'])
555 return True
554 return True
556 elif action == 'noop':
555 elif action == 'noop':
557 pass
556 pass
558 else:
557 else:
559 raise error.ProgrammingError('unhandled event from reactor: %s' %
558 raise error.ProgrammingError('unhandled event from reactor: %s' %
560 action)
559 action)
561
560
562 # Maps API name to metadata so custom API can be registered.
561 # Maps API name to metadata so custom API can be registered.
563 API_HANDLERS = {
562 API_HANDLERS = {
564 HTTPV2: {
563 HTTPV2: {
565 'config': ('experimental', 'web.api.http-v2'),
564 'config': ('experimental', 'web.api.http-v2'),
566 'handler': _handlehttpv2request,
565 'handler': _handlehttpv2request,
567 },
566 },
568 }
567 }
569
568
570 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
569 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
571 def __init__(self, req, ui, args=None):
570 def __init__(self, req, ui, args=None):
572 self._req = req
571 self._req = req
573 self._ui = ui
572 self._ui = ui
574 self._args = args
573 self._args = args
575
574
576 @property
575 @property
577 def name(self):
576 def name(self):
578 return HTTPV2
577 return HTTPV2
579
578
580 def getargs(self, args):
579 def getargs(self, args):
581 data = {}
580 data = {}
582 for k in args.split():
581 for k in args.split():
583 if k == '*':
582 if k == '*':
584 raise NotImplementedError('do not support * args')
583 raise NotImplementedError('do not support * args')
585 else:
584 else:
586 data[k] = self._args[k]
585 data[k] = self._args[k]
587
586
588 return [data[k] for k in args.split()]
587 return [data[k] for k in args.split()]
589
588
590 def forwardpayload(self, fp):
589 def forwardpayload(self, fp):
591 raise NotImplementedError
590 raise NotImplementedError
592
591
593 @contextlib.contextmanager
592 @contextlib.contextmanager
594 def mayberedirectstdio(self):
593 def mayberedirectstdio(self):
595 raise NotImplementedError
594 raise NotImplementedError
596
595
597 def client(self):
596 def client(self):
598 raise NotImplementedError
597 raise NotImplementedError
599
598
600 def addcapabilities(self, repo, caps):
599 def addcapabilities(self, repo, caps):
601 return caps
600 return caps
602
601
603 def checkperm(self, perm):
602 def checkperm(self, perm):
604 raise NotImplementedError
603 raise NotImplementedError
605
604
606 def _httpresponsetype(ui, req, prefer_uncompressed):
605 def _httpresponsetype(ui, req, prefer_uncompressed):
607 """Determine the appropriate response type and compression settings.
606 """Determine the appropriate response type and compression settings.
608
607
609 Returns a tuple of (mediatype, compengine, engineopts).
608 Returns a tuple of (mediatype, compengine, engineopts).
610 """
609 """
611 # Determine the response media type and compression engine based
610 # Determine the response media type and compression engine based
612 # on the request parameters.
611 # on the request parameters.
613 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
612 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
614
613
615 if '0.2' in protocaps:
614 if '0.2' in protocaps:
616 # All clients are expected to support uncompressed data.
615 # All clients are expected to support uncompressed data.
617 if prefer_uncompressed:
616 if prefer_uncompressed:
618 return HGTYPE2, util._noopengine(), {}
617 return HGTYPE2, util._noopengine(), {}
619
618
620 # Default as defined by wire protocol spec.
619 # Default as defined by wire protocol spec.
621 compformats = ['zlib', 'none']
620 compformats = ['zlib', 'none']
622 for cap in protocaps:
621 for cap in protocaps:
623 if cap.startswith('comp='):
622 if cap.startswith('comp='):
624 compformats = cap[5:].split(',')
623 compformats = cap[5:].split(',')
625 break
624 break
626
625
627 # Now find an agreed upon compression format.
626 # Now find an agreed upon compression format.
628 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
627 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
629 if engine.wireprotosupport().name in compformats:
628 if engine.wireprotosupport().name in compformats:
630 opts = {}
629 opts = {}
631 level = ui.configint('server', '%slevel' % engine.name())
630 level = ui.configint('server', '%slevel' % engine.name())
632 if level is not None:
631 if level is not None:
633 opts['level'] = level
632 opts['level'] = level
634
633
635 return HGTYPE2, engine, opts
634 return HGTYPE2, engine, opts
636
635
637 # No mutually supported compression format. Fall back to the
636 # No mutually supported compression format. Fall back to the
638 # legacy protocol.
637 # legacy protocol.
639
638
640 # Don't allow untrusted settings because disabling compression or
639 # Don't allow untrusted settings because disabling compression or
641 # setting a very high compression level could lead to flooding
640 # setting a very high compression level could lead to flooding
642 # the server's network or CPU.
641 # the server's network or CPU.
643 opts = {'level': ui.configint('server', 'zliblevel')}
642 opts = {'level': ui.configint('server', 'zliblevel')}
644 return HGTYPE, util.compengines['zlib'], opts
643 return HGTYPE, util.compengines['zlib'], opts
645
644
646 def _callhttp(repo, req, res, proto, cmd):
645 def _callhttp(repo, req, res, proto, cmd):
647 # Avoid cycle involving hg module.
646 # Avoid cycle involving hg module.
648 from .hgweb import common as hgwebcommon
647 from .hgweb import common as hgwebcommon
649
648
650 def genversion2(gen, engine, engineopts):
649 def genversion2(gen, engine, engineopts):
651 # application/mercurial-0.2 always sends a payload header
650 # application/mercurial-0.2 always sends a payload header
652 # identifying the compression engine.
651 # identifying the compression engine.
653 name = engine.wireprotosupport().name
652 name = engine.wireprotosupport().name
654 assert 0 < len(name) < 256
653 assert 0 < len(name) < 256
655 yield struct.pack('B', len(name))
654 yield struct.pack('B', len(name))
656 yield name
655 yield name
657
656
658 for chunk in gen:
657 for chunk in gen:
659 yield chunk
658 yield chunk
660
659
661 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
660 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
662 if code == HTTP_OK:
661 if code == HTTP_OK:
663 res.status = '200 Script output follows'
662 res.status = '200 Script output follows'
664 else:
663 else:
665 res.status = hgwebcommon.statusmessage(code)
664 res.status = hgwebcommon.statusmessage(code)
666
665
667 res.headers['Content-Type'] = contenttype
666 res.headers['Content-Type'] = contenttype
668
667
669 if bodybytes is not None:
668 if bodybytes is not None:
670 res.setbodybytes(bodybytes)
669 res.setbodybytes(bodybytes)
671 if bodygen is not None:
670 if bodygen is not None:
672 res.setbodygen(bodygen)
671 res.setbodygen(bodygen)
673
672
674 if not wireproto.commands.commandavailable(cmd, proto):
673 if not wireproto.commands.commandavailable(cmd, proto):
675 setresponse(HTTP_OK, HGERRTYPE,
674 setresponse(HTTP_OK, HGERRTYPE,
676 _('requested wire protocol command is not available over '
675 _('requested wire protocol command is not available over '
677 'HTTP'))
676 'HTTP'))
678 return
677 return
679
678
680 proto.checkperm(wireproto.commands[cmd].permission)
679 proto.checkperm(wireproto.commands[cmd].permission)
681
680
682 rsp = wireproto.dispatch(repo, proto, cmd)
681 rsp = wireproto.dispatch(repo, proto, cmd)
683
682
684 if isinstance(rsp, bytes):
683 if isinstance(rsp, bytes):
685 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
684 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
686 elif isinstance(rsp, wireprototypes.bytesresponse):
685 elif isinstance(rsp, wireprototypes.bytesresponse):
687 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
686 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
688 elif isinstance(rsp, wireprototypes.streamreslegacy):
687 elif isinstance(rsp, wireprototypes.streamreslegacy):
689 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
688 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
690 elif isinstance(rsp, wireprototypes.streamres):
689 elif isinstance(rsp, wireprototypes.streamres):
691 gen = rsp.gen
690 gen = rsp.gen
692
691
693 # This code for compression should not be streamres specific. It
692 # This code for compression should not be streamres specific. It
694 # is here because we only compress streamres at the moment.
693 # is here because we only compress streamres at the moment.
695 mediatype, engine, engineopts = _httpresponsetype(
694 mediatype, engine, engineopts = _httpresponsetype(
696 repo.ui, req, rsp.prefer_uncompressed)
695 repo.ui, req, rsp.prefer_uncompressed)
697 gen = engine.compressstream(gen, engineopts)
696 gen = engine.compressstream(gen, engineopts)
698
697
699 if mediatype == HGTYPE2:
698 if mediatype == HGTYPE2:
700 gen = genversion2(gen, engine, engineopts)
699 gen = genversion2(gen, engine, engineopts)
701
700
702 setresponse(HTTP_OK, mediatype, bodygen=gen)
701 setresponse(HTTP_OK, mediatype, bodygen=gen)
703 elif isinstance(rsp, wireprototypes.pushres):
702 elif isinstance(rsp, wireprototypes.pushres):
704 rsp = '%d\n%s' % (rsp.res, rsp.output)
703 rsp = '%d\n%s' % (rsp.res, rsp.output)
705 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
704 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
706 elif isinstance(rsp, wireprototypes.pusherr):
705 elif isinstance(rsp, wireprototypes.pusherr):
707 rsp = '0\n%s\n' % rsp.res
706 rsp = '0\n%s\n' % rsp.res
708 res.drain = True
707 res.drain = True
709 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
708 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
710 elif isinstance(rsp, wireprototypes.ooberror):
709 elif isinstance(rsp, wireprototypes.ooberror):
711 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
710 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
712 else:
711 else:
713 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
712 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
714
713
715 def _sshv1respondbytes(fout, value):
714 def _sshv1respondbytes(fout, value):
716 """Send a bytes response for protocol version 1."""
715 """Send a bytes response for protocol version 1."""
717 fout.write('%d\n' % len(value))
716 fout.write('%d\n' % len(value))
718 fout.write(value)
717 fout.write(value)
719 fout.flush()
718 fout.flush()
720
719
721 def _sshv1respondstream(fout, source):
720 def _sshv1respondstream(fout, source):
722 write = fout.write
721 write = fout.write
723 for chunk in source.gen:
722 for chunk in source.gen:
724 write(chunk)
723 write(chunk)
725 fout.flush()
724 fout.flush()
726
725
727 def _sshv1respondooberror(fout, ferr, rsp):
726 def _sshv1respondooberror(fout, ferr, rsp):
728 ferr.write(b'%s\n-\n' % rsp)
727 ferr.write(b'%s\n-\n' % rsp)
729 ferr.flush()
728 ferr.flush()
730 fout.write(b'\n')
729 fout.write(b'\n')
731 fout.flush()
730 fout.flush()
732
731
733 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
732 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
734 """Handler for requests services via version 1 of SSH protocol."""
733 """Handler for requests services via version 1 of SSH protocol."""
735 def __init__(self, ui, fin, fout):
734 def __init__(self, ui, fin, fout):
736 self._ui = ui
735 self._ui = ui
737 self._fin = fin
736 self._fin = fin
738 self._fout = fout
737 self._fout = fout
739
738
740 @property
739 @property
741 def name(self):
740 def name(self):
742 return wireprototypes.SSHV1
741 return wireprototypes.SSHV1
743
742
744 def getargs(self, args):
743 def getargs(self, args):
745 data = {}
744 data = {}
746 keys = args.split()
745 keys = args.split()
747 for n in xrange(len(keys)):
746 for n in xrange(len(keys)):
748 argline = self._fin.readline()[:-1]
747 argline = self._fin.readline()[:-1]
749 arg, l = argline.split()
748 arg, l = argline.split()
750 if arg not in keys:
749 if arg not in keys:
751 raise error.Abort(_("unexpected parameter %r") % arg)
750 raise error.Abort(_("unexpected parameter %r") % arg)
752 if arg == '*':
751 if arg == '*':
753 star = {}
752 star = {}
754 for k in xrange(int(l)):
753 for k in xrange(int(l)):
755 argline = self._fin.readline()[:-1]
754 argline = self._fin.readline()[:-1]
756 arg, l = argline.split()
755 arg, l = argline.split()
757 val = self._fin.read(int(l))
756 val = self._fin.read(int(l))
758 star[arg] = val
757 star[arg] = val
759 data['*'] = star
758 data['*'] = star
760 else:
759 else:
761 val = self._fin.read(int(l))
760 val = self._fin.read(int(l))
762 data[arg] = val
761 data[arg] = val
763 return [data[k] for k in keys]
762 return [data[k] for k in keys]
764
763
765 def forwardpayload(self, fpout):
764 def forwardpayload(self, fpout):
766 # We initially send an empty response. This tells the client it is
765 # We initially send an empty response. This tells the client it is
767 # OK to start sending data. If a client sees any other response, it
766 # OK to start sending data. If a client sees any other response, it
768 # interprets it as an error.
767 # interprets it as an error.
769 _sshv1respondbytes(self._fout, b'')
768 _sshv1respondbytes(self._fout, b'')
770
769
771 # The file is in the form:
770 # The file is in the form:
772 #
771 #
773 # <chunk size>\n<chunk>
772 # <chunk size>\n<chunk>
774 # ...
773 # ...
775 # 0\n
774 # 0\n
776 count = int(self._fin.readline())
775 count = int(self._fin.readline())
777 while count:
776 while count:
778 fpout.write(self._fin.read(count))
777 fpout.write(self._fin.read(count))
779 count = int(self._fin.readline())
778 count = int(self._fin.readline())
780
779
781 @contextlib.contextmanager
780 @contextlib.contextmanager
782 def mayberedirectstdio(self):
781 def mayberedirectstdio(self):
783 yield None
782 yield None
784
783
785 def client(self):
784 def client(self):
786 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
785 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
787 return 'remote:ssh:' + client
786 return 'remote:ssh:' + client
788
787
789 def addcapabilities(self, repo, caps):
788 def addcapabilities(self, repo, caps):
790 caps.append(b'batch')
789 caps.append(b'batch')
791 return caps
790 return caps
792
791
793 def checkperm(self, perm):
792 def checkperm(self, perm):
794 pass
793 pass
795
794
796 class sshv2protocolhandler(sshv1protocolhandler):
795 class sshv2protocolhandler(sshv1protocolhandler):
797 """Protocol handler for version 2 of the SSH protocol."""
796 """Protocol handler for version 2 of the SSH protocol."""
798
797
799 @property
798 @property
800 def name(self):
799 def name(self):
801 return wireprototypes.SSHV2
800 return wireprototypes.SSHV2
802
801
803 def _runsshserver(ui, repo, fin, fout, ev):
802 def _runsshserver(ui, repo, fin, fout, ev):
804 # This function operates like a state machine of sorts. The following
803 # This function operates like a state machine of sorts. The following
805 # states are defined:
804 # states are defined:
806 #
805 #
807 # protov1-serving
806 # protov1-serving
808 # Server is in protocol version 1 serving mode. Commands arrive on
807 # Server is in protocol version 1 serving mode. Commands arrive on
809 # new lines. These commands are processed in this state, one command
808 # new lines. These commands are processed in this state, one command
810 # after the other.
809 # after the other.
811 #
810 #
812 # protov2-serving
811 # protov2-serving
813 # Server is in protocol version 2 serving mode.
812 # Server is in protocol version 2 serving mode.
814 #
813 #
815 # upgrade-initial
814 # upgrade-initial
816 # The server is going to process an upgrade request.
815 # The server is going to process an upgrade request.
817 #
816 #
818 # upgrade-v2-filter-legacy-handshake
817 # upgrade-v2-filter-legacy-handshake
819 # The protocol is being upgraded to version 2. The server is expecting
818 # The protocol is being upgraded to version 2. The server is expecting
820 # the legacy handshake from version 1.
819 # the legacy handshake from version 1.
821 #
820 #
822 # upgrade-v2-finish
821 # upgrade-v2-finish
823 # The upgrade to version 2 of the protocol is imminent.
822 # The upgrade to version 2 of the protocol is imminent.
824 #
823 #
825 # shutdown
824 # shutdown
826 # The server is shutting down, possibly in reaction to a client event.
825 # The server is shutting down, possibly in reaction to a client event.
827 #
826 #
828 # And here are their transitions:
827 # And here are their transitions:
829 #
828 #
830 # protov1-serving -> shutdown
829 # protov1-serving -> shutdown
831 # When server receives an empty request or encounters another
830 # When server receives an empty request or encounters another
832 # error.
831 # error.
833 #
832 #
834 # protov1-serving -> upgrade-initial
833 # protov1-serving -> upgrade-initial
835 # An upgrade request line was seen.
834 # An upgrade request line was seen.
836 #
835 #
837 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
836 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
838 # Upgrade to version 2 in progress. Server is expecting to
837 # Upgrade to version 2 in progress. Server is expecting to
839 # process a legacy handshake.
838 # process a legacy handshake.
840 #
839 #
841 # upgrade-v2-filter-legacy-handshake -> shutdown
840 # upgrade-v2-filter-legacy-handshake -> shutdown
842 # Client did not fulfill upgrade handshake requirements.
841 # Client did not fulfill upgrade handshake requirements.
843 #
842 #
844 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
843 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
845 # Client fulfilled version 2 upgrade requirements. Finishing that
844 # Client fulfilled version 2 upgrade requirements. Finishing that
846 # upgrade.
845 # upgrade.
847 #
846 #
848 # upgrade-v2-finish -> protov2-serving
847 # upgrade-v2-finish -> protov2-serving
849 # Protocol upgrade to version 2 complete. Server can now speak protocol
848 # Protocol upgrade to version 2 complete. Server can now speak protocol
850 # version 2.
849 # version 2.
851 #
850 #
852 # protov2-serving -> protov1-serving
851 # protov2-serving -> protov1-serving
853 # Ths happens by default since protocol version 2 is the same as
852 # Ths happens by default since protocol version 2 is the same as
854 # version 1 except for the handshake.
853 # version 1 except for the handshake.
855
854
856 state = 'protov1-serving'
855 state = 'protov1-serving'
857 proto = sshv1protocolhandler(ui, fin, fout)
856 proto = sshv1protocolhandler(ui, fin, fout)
858 protoswitched = False
857 protoswitched = False
859
858
860 while not ev.is_set():
859 while not ev.is_set():
861 if state == 'protov1-serving':
860 if state == 'protov1-serving':
862 # Commands are issued on new lines.
861 # Commands are issued on new lines.
863 request = fin.readline()[:-1]
862 request = fin.readline()[:-1]
864
863
865 # Empty lines signal to terminate the connection.
864 # Empty lines signal to terminate the connection.
866 if not request:
865 if not request:
867 state = 'shutdown'
866 state = 'shutdown'
868 continue
867 continue
869
868
870 # It looks like a protocol upgrade request. Transition state to
869 # It looks like a protocol upgrade request. Transition state to
871 # handle it.
870 # handle it.
872 if request.startswith(b'upgrade '):
871 if request.startswith(b'upgrade '):
873 if protoswitched:
872 if protoswitched:
874 _sshv1respondooberror(fout, ui.ferr,
873 _sshv1respondooberror(fout, ui.ferr,
875 b'cannot upgrade protocols multiple '
874 b'cannot upgrade protocols multiple '
876 b'times')
875 b'times')
877 state = 'shutdown'
876 state = 'shutdown'
878 continue
877 continue
879
878
880 state = 'upgrade-initial'
879 state = 'upgrade-initial'
881 continue
880 continue
882
881
883 available = wireproto.commands.commandavailable(request, proto)
882 available = wireproto.commands.commandavailable(request, proto)
884
883
885 # This command isn't available. Send an empty response and go
884 # This command isn't available. Send an empty response and go
886 # back to waiting for a new command.
885 # back to waiting for a new command.
887 if not available:
886 if not available:
888 _sshv1respondbytes(fout, b'')
887 _sshv1respondbytes(fout, b'')
889 continue
888 continue
890
889
891 rsp = wireproto.dispatch(repo, proto, request)
890 rsp = wireproto.dispatch(repo, proto, request)
892
891
893 if isinstance(rsp, bytes):
892 if isinstance(rsp, bytes):
894 _sshv1respondbytes(fout, rsp)
893 _sshv1respondbytes(fout, rsp)
895 elif isinstance(rsp, wireprototypes.bytesresponse):
894 elif isinstance(rsp, wireprototypes.bytesresponse):
896 _sshv1respondbytes(fout, rsp.data)
895 _sshv1respondbytes(fout, rsp.data)
897 elif isinstance(rsp, wireprototypes.streamres):
896 elif isinstance(rsp, wireprototypes.streamres):
898 _sshv1respondstream(fout, rsp)
897 _sshv1respondstream(fout, rsp)
899 elif isinstance(rsp, wireprototypes.streamreslegacy):
898 elif isinstance(rsp, wireprototypes.streamreslegacy):
900 _sshv1respondstream(fout, rsp)
899 _sshv1respondstream(fout, rsp)
901 elif isinstance(rsp, wireprototypes.pushres):
900 elif isinstance(rsp, wireprototypes.pushres):
902 _sshv1respondbytes(fout, b'')
901 _sshv1respondbytes(fout, b'')
903 _sshv1respondbytes(fout, b'%d' % rsp.res)
902 _sshv1respondbytes(fout, b'%d' % rsp.res)
904 elif isinstance(rsp, wireprototypes.pusherr):
903 elif isinstance(rsp, wireprototypes.pusherr):
905 _sshv1respondbytes(fout, rsp.res)
904 _sshv1respondbytes(fout, rsp.res)
906 elif isinstance(rsp, wireprototypes.ooberror):
905 elif isinstance(rsp, wireprototypes.ooberror):
907 _sshv1respondooberror(fout, ui.ferr, rsp.message)
906 _sshv1respondooberror(fout, ui.ferr, rsp.message)
908 else:
907 else:
909 raise error.ProgrammingError('unhandled response type from '
908 raise error.ProgrammingError('unhandled response type from '
910 'wire protocol command: %s' % rsp)
909 'wire protocol command: %s' % rsp)
911
910
912 # For now, protocol version 2 serving just goes back to version 1.
911 # For now, protocol version 2 serving just goes back to version 1.
913 elif state == 'protov2-serving':
912 elif state == 'protov2-serving':
914 state = 'protov1-serving'
913 state = 'protov1-serving'
915 continue
914 continue
916
915
917 elif state == 'upgrade-initial':
916 elif state == 'upgrade-initial':
918 # We should never transition into this state if we've switched
917 # We should never transition into this state if we've switched
919 # protocols.
918 # protocols.
920 assert not protoswitched
919 assert not protoswitched
921 assert proto.name == wireprototypes.SSHV1
920 assert proto.name == wireprototypes.SSHV1
922
921
923 # Expected: upgrade <token> <capabilities>
922 # Expected: upgrade <token> <capabilities>
924 # If we get something else, the request is malformed. It could be
923 # If we get something else, the request is malformed. It could be
925 # from a future client that has altered the upgrade line content.
924 # from a future client that has altered the upgrade line content.
926 # We treat this as an unknown command.
925 # We treat this as an unknown command.
927 try:
926 try:
928 token, caps = request.split(b' ')[1:]
927 token, caps = request.split(b' ')[1:]
929 except ValueError:
928 except ValueError:
930 _sshv1respondbytes(fout, b'')
929 _sshv1respondbytes(fout, b'')
931 state = 'protov1-serving'
930 state = 'protov1-serving'
932 continue
931 continue
933
932
934 # Send empty response if we don't support upgrading protocols.
933 # Send empty response if we don't support upgrading protocols.
935 if not ui.configbool('experimental', 'sshserver.support-v2'):
934 if not ui.configbool('experimental', 'sshserver.support-v2'):
936 _sshv1respondbytes(fout, b'')
935 _sshv1respondbytes(fout, b'')
937 state = 'protov1-serving'
936 state = 'protov1-serving'
938 continue
937 continue
939
938
940 try:
939 try:
941 caps = urlreq.parseqs(caps)
940 caps = urlreq.parseqs(caps)
942 except ValueError:
941 except ValueError:
943 _sshv1respondbytes(fout, b'')
942 _sshv1respondbytes(fout, b'')
944 state = 'protov1-serving'
943 state = 'protov1-serving'
945 continue
944 continue
946
945
947 # We don't see an upgrade request to protocol version 2. Ignore
946 # We don't see an upgrade request to protocol version 2. Ignore
948 # the upgrade request.
947 # the upgrade request.
949 wantedprotos = caps.get(b'proto', [b''])[0]
948 wantedprotos = caps.get(b'proto', [b''])[0]
950 if SSHV2 not in wantedprotos:
949 if SSHV2 not in wantedprotos:
951 _sshv1respondbytes(fout, b'')
950 _sshv1respondbytes(fout, b'')
952 state = 'protov1-serving'
951 state = 'protov1-serving'
953 continue
952 continue
954
953
955 # It looks like we can honor this upgrade request to protocol 2.
954 # It looks like we can honor this upgrade request to protocol 2.
956 # Filter the rest of the handshake protocol request lines.
955 # Filter the rest of the handshake protocol request lines.
957 state = 'upgrade-v2-filter-legacy-handshake'
956 state = 'upgrade-v2-filter-legacy-handshake'
958 continue
957 continue
959
958
960 elif state == 'upgrade-v2-filter-legacy-handshake':
959 elif state == 'upgrade-v2-filter-legacy-handshake':
961 # Client should have sent legacy handshake after an ``upgrade``
960 # Client should have sent legacy handshake after an ``upgrade``
962 # request. Expected lines:
961 # request. Expected lines:
963 #
962 #
964 # hello
963 # hello
965 # between
964 # between
966 # pairs 81
965 # pairs 81
967 # 0000...-0000...
966 # 0000...-0000...
968
967
969 ok = True
968 ok = True
970 for line in (b'hello', b'between', b'pairs 81'):
969 for line in (b'hello', b'between', b'pairs 81'):
971 request = fin.readline()[:-1]
970 request = fin.readline()[:-1]
972
971
973 if request != line:
972 if request != line:
974 _sshv1respondooberror(fout, ui.ferr,
973 _sshv1respondooberror(fout, ui.ferr,
975 b'malformed handshake protocol: '
974 b'malformed handshake protocol: '
976 b'missing %s' % line)
975 b'missing %s' % line)
977 ok = False
976 ok = False
978 state = 'shutdown'
977 state = 'shutdown'
979 break
978 break
980
979
981 if not ok:
980 if not ok:
982 continue
981 continue
983
982
984 request = fin.read(81)
983 request = fin.read(81)
985 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
984 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
986 _sshv1respondooberror(fout, ui.ferr,
985 _sshv1respondooberror(fout, ui.ferr,
987 b'malformed handshake protocol: '
986 b'malformed handshake protocol: '
988 b'missing between argument value')
987 b'missing between argument value')
989 state = 'shutdown'
988 state = 'shutdown'
990 continue
989 continue
991
990
992 state = 'upgrade-v2-finish'
991 state = 'upgrade-v2-finish'
993 continue
992 continue
994
993
995 elif state == 'upgrade-v2-finish':
994 elif state == 'upgrade-v2-finish':
996 # Send the upgrade response.
995 # Send the upgrade response.
997 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
996 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
998 servercaps = wireproto.capabilities(repo, proto)
997 servercaps = wireproto.capabilities(repo, proto)
999 rsp = b'capabilities: %s' % servercaps.data
998 rsp = b'capabilities: %s' % servercaps.data
1000 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
999 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1001 fout.flush()
1000 fout.flush()
1002
1001
1003 proto = sshv2protocolhandler(ui, fin, fout)
1002 proto = sshv2protocolhandler(ui, fin, fout)
1004 protoswitched = True
1003 protoswitched = True
1005
1004
1006 state = 'protov2-serving'
1005 state = 'protov2-serving'
1007 continue
1006 continue
1008
1007
1009 elif state == 'shutdown':
1008 elif state == 'shutdown':
1010 break
1009 break
1011
1010
1012 else:
1011 else:
1013 raise error.ProgrammingError('unhandled ssh server state: %s' %
1012 raise error.ProgrammingError('unhandled ssh server state: %s' %
1014 state)
1013 state)
1015
1014
1016 class sshserver(object):
1015 class sshserver(object):
1017 def __init__(self, ui, repo, logfh=None):
1016 def __init__(self, ui, repo, logfh=None):
1018 self._ui = ui
1017 self._ui = ui
1019 self._repo = repo
1018 self._repo = repo
1020 self._fin = ui.fin
1019 self._fin = ui.fin
1021 self._fout = ui.fout
1020 self._fout = ui.fout
1022
1021
1023 # Log write I/O to stdout and stderr if configured.
1022 # Log write I/O to stdout and stderr if configured.
1024 if logfh:
1023 if logfh:
1025 self._fout = util.makeloggingfileobject(
1024 self._fout = util.makeloggingfileobject(
1026 logfh, self._fout, 'o', logdata=True)
1025 logfh, self._fout, 'o', logdata=True)
1027 ui.ferr = util.makeloggingfileobject(
1026 ui.ferr = util.makeloggingfileobject(
1028 logfh, ui.ferr, 'e', logdata=True)
1027 logfh, ui.ferr, 'e', logdata=True)
1029
1028
1030 hook.redirect(True)
1029 hook.redirect(True)
1031 ui.fout = repo.ui.fout = ui.ferr
1030 ui.fout = repo.ui.fout = ui.ferr
1032
1031
1033 # Prevent insertion/deletion of CRs
1032 # Prevent insertion/deletion of CRs
1034 util.setbinary(self._fin)
1033 util.setbinary(self._fin)
1035 util.setbinary(self._fout)
1034 util.setbinary(self._fout)
1036
1035
1037 def serve_forever(self):
1036 def serve_forever(self):
1038 self.serveuntil(threading.Event())
1037 self.serveuntil(threading.Event())
1039 sys.exit(0)
1038 sys.exit(0)
1040
1039
1041 def serveuntil(self, ev):
1040 def serveuntil(self, ev):
1042 """Serve until a threading.Event is set."""
1041 """Serve until a threading.Event is set."""
1043 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
1042 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,573 +1,576
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 import unittest
3 import unittest
4
4
5 from mercurial import (
5 from mercurial import (
6 util,
6 util,
7 wireprotoframing as framing,
7 wireprotoframing as framing,
8 )
8 )
9
9
10 ffs = framing.makeframefromhumanstring
10 ffs = framing.makeframefromhumanstring
11
11
12 def makereactor(deferoutput=False):
12 def makereactor(deferoutput=False):
13 return framing.serverreactor(deferoutput=deferoutput)
13 return framing.serverreactor(deferoutput=deferoutput)
14
14
15 def sendframes(reactor, gen):
15 def sendframes(reactor, gen):
16 """Send a generator of frame bytearray to a reactor.
16 """Send a generator of frame bytearray to a reactor.
17
17
18 Emits a generator of results from ``onframerecv()`` calls.
18 Emits a generator of results from ``onframerecv()`` calls.
19 """
19 """
20 for frame in gen:
20 for frame in gen:
21 rid, frametype, frameflags, framelength = framing.parseheader(frame)
21 header = framing.parseheader(frame)
22 payload = frame[framing.FRAME_HEADER_SIZE:]
22 payload = frame[framing.FRAME_HEADER_SIZE:]
23 assert len(payload) == framelength
23 assert len(payload) == header.length
24
24
25 yield reactor.onframerecv(rid, frametype, frameflags, payload)
25 yield reactor.onframerecv(framing.frame(header.requestid,
26 header.typeid,
27 header.flags,
28 payload))
26
29
27 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
30 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
28 """Generate frames to run a command and send them to a reactor."""
31 """Generate frames to run a command and send them to a reactor."""
29 return sendframes(reactor,
32 return sendframes(reactor,
30 framing.createcommandframes(rid, cmd, args, datafh))
33 framing.createcommandframes(rid, cmd, args, datafh))
31
34
32 class FrameTests(unittest.TestCase):
35 class FrameTests(unittest.TestCase):
33 def testdataexactframesize(self):
36 def testdataexactframesize(self):
34 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
37 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
35
38
36 frames = list(framing.createcommandframes(1, b'command', {}, data))
39 frames = list(framing.createcommandframes(1, b'command', {}, data))
37 self.assertEqual(frames, [
40 self.assertEqual(frames, [
38 ffs(b'1 command-name have-data command'),
41 ffs(b'1 command-name have-data command'),
39 ffs(b'1 command-data continuation %s' % data.getvalue()),
42 ffs(b'1 command-data continuation %s' % data.getvalue()),
40 ffs(b'1 command-data eos ')
43 ffs(b'1 command-data eos ')
41 ])
44 ])
42
45
43 def testdatamultipleframes(self):
46 def testdatamultipleframes(self):
44 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
47 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
45 frames = list(framing.createcommandframes(1, b'command', {}, data))
48 frames = list(framing.createcommandframes(1, b'command', {}, data))
46 self.assertEqual(frames, [
49 self.assertEqual(frames, [
47 ffs(b'1 command-name have-data command'),
50 ffs(b'1 command-name have-data command'),
48 ffs(b'1 command-data continuation %s' % (
51 ffs(b'1 command-data continuation %s' % (
49 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
52 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
50 ffs(b'1 command-data eos x'),
53 ffs(b'1 command-data eos x'),
51 ])
54 ])
52
55
53 def testargsanddata(self):
56 def testargsanddata(self):
54 data = util.bytesio(b'x' * 100)
57 data = util.bytesio(b'x' * 100)
55
58
56 frames = list(framing.createcommandframes(1, b'command', {
59 frames = list(framing.createcommandframes(1, b'command', {
57 b'key1': b'key1value',
60 b'key1': b'key1value',
58 b'key2': b'key2value',
61 b'key2': b'key2value',
59 b'key3': b'key3value',
62 b'key3': b'key3value',
60 }, data))
63 }, data))
61
64
62 self.assertEqual(frames, [
65 self.assertEqual(frames, [
63 ffs(b'1 command-name have-args|have-data command'),
66 ffs(b'1 command-name have-args|have-data command'),
64 ffs(br'1 command-argument 0 \x04\x00\x09\x00key1key1value'),
67 ffs(br'1 command-argument 0 \x04\x00\x09\x00key1key1value'),
65 ffs(br'1 command-argument 0 \x04\x00\x09\x00key2key2value'),
68 ffs(br'1 command-argument 0 \x04\x00\x09\x00key2key2value'),
66 ffs(br'1 command-argument eoa \x04\x00\x09\x00key3key3value'),
69 ffs(br'1 command-argument eoa \x04\x00\x09\x00key3key3value'),
67 ffs(b'1 command-data eos %s' % data.getvalue()),
70 ffs(b'1 command-data eos %s' % data.getvalue()),
68 ])
71 ])
69
72
70 def testtextoutputexcessiveargs(self):
73 def testtextoutputexcessiveargs(self):
71 """At most 255 formatting arguments are allowed."""
74 """At most 255 formatting arguments are allowed."""
72 with self.assertRaisesRegexp(ValueError,
75 with self.assertRaisesRegexp(ValueError,
73 'cannot use more than 255 formatting'):
76 'cannot use more than 255 formatting'):
74 args = [b'x' for i in range(256)]
77 args = [b'x' for i in range(256)]
75 list(framing.createtextoutputframe(1, [(b'bleh', args, [])]))
78 list(framing.createtextoutputframe(1, [(b'bleh', args, [])]))
76
79
77 def testtextoutputexcessivelabels(self):
80 def testtextoutputexcessivelabels(self):
78 """At most 255 labels are allowed."""
81 """At most 255 labels are allowed."""
79 with self.assertRaisesRegexp(ValueError,
82 with self.assertRaisesRegexp(ValueError,
80 'cannot use more than 255 labels'):
83 'cannot use more than 255 labels'):
81 labels = [b'l' for i in range(256)]
84 labels = [b'l' for i in range(256)]
82 list(framing.createtextoutputframe(1, [(b'bleh', [], labels)]))
85 list(framing.createtextoutputframe(1, [(b'bleh', [], labels)]))
83
86
84 def testtextoutputformattingstringtype(self):
87 def testtextoutputformattingstringtype(self):
85 """Formatting string must be bytes."""
88 """Formatting string must be bytes."""
86 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
89 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
87 list(framing.createtextoutputframe(1, [
90 list(framing.createtextoutputframe(1, [
88 (b'foo'.decode('ascii'), [], [])]))
91 (b'foo'.decode('ascii'), [], [])]))
89
92
90 def testtextoutputargumentbytes(self):
93 def testtextoutputargumentbytes(self):
91 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
94 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
92 list(framing.createtextoutputframe(1, [
95 list(framing.createtextoutputframe(1, [
93 (b'foo', [b'foo'.decode('ascii')], [])]))
96 (b'foo', [b'foo'.decode('ascii')], [])]))
94
97
95 def testtextoutputlabelbytes(self):
98 def testtextoutputlabelbytes(self):
96 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
99 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
97 list(framing.createtextoutputframe(1, [
100 list(framing.createtextoutputframe(1, [
98 (b'foo', [], [b'foo'.decode('ascii')])]))
101 (b'foo', [], [b'foo'.decode('ascii')])]))
99
102
100 def testtextoutputtoolongformatstring(self):
103 def testtextoutputtoolongformatstring(self):
101 with self.assertRaisesRegexp(ValueError,
104 with self.assertRaisesRegexp(ValueError,
102 'formatting string cannot be longer than'):
105 'formatting string cannot be longer than'):
103 list(framing.createtextoutputframe(1, [
106 list(framing.createtextoutputframe(1, [
104 (b'x' * 65536, [], [])]))
107 (b'x' * 65536, [], [])]))
105
108
106 def testtextoutputtoolongargumentstring(self):
109 def testtextoutputtoolongargumentstring(self):
107 with self.assertRaisesRegexp(ValueError,
110 with self.assertRaisesRegexp(ValueError,
108 'argument string cannot be longer than'):
111 'argument string cannot be longer than'):
109 list(framing.createtextoutputframe(1, [
112 list(framing.createtextoutputframe(1, [
110 (b'bleh', [b'x' * 65536], [])]))
113 (b'bleh', [b'x' * 65536], [])]))
111
114
112 def testtextoutputtoolonglabelstring(self):
115 def testtextoutputtoolonglabelstring(self):
113 with self.assertRaisesRegexp(ValueError,
116 with self.assertRaisesRegexp(ValueError,
114 'label string cannot be longer than'):
117 'label string cannot be longer than'):
115 list(framing.createtextoutputframe(1, [
118 list(framing.createtextoutputframe(1, [
116 (b'bleh', [], [b'x' * 65536])]))
119 (b'bleh', [], [b'x' * 65536])]))
117
120
118 def testtextoutput1simpleatom(self):
121 def testtextoutput1simpleatom(self):
119 val = list(framing.createtextoutputframe(1, [
122 val = list(framing.createtextoutputframe(1, [
120 (b'foo', [], [])]))
123 (b'foo', [], [])]))
121
124
122 self.assertEqual(val, [
125 self.assertEqual(val, [
123 ffs(br'1 text-output 0 \x03\x00\x00\x00foo'),
126 ffs(br'1 text-output 0 \x03\x00\x00\x00foo'),
124 ])
127 ])
125
128
126 def testtextoutput2simpleatoms(self):
129 def testtextoutput2simpleatoms(self):
127 val = list(framing.createtextoutputframe(1, [
130 val = list(framing.createtextoutputframe(1, [
128 (b'foo', [], []),
131 (b'foo', [], []),
129 (b'bar', [], []),
132 (b'bar', [], []),
130 ]))
133 ]))
131
134
132 self.assertEqual(val, [
135 self.assertEqual(val, [
133 ffs(br'1 text-output 0 \x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
136 ffs(br'1 text-output 0 \x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
134 ])
137 ])
135
138
136 def testtextoutput1arg(self):
139 def testtextoutput1arg(self):
137 val = list(framing.createtextoutputframe(1, [
140 val = list(framing.createtextoutputframe(1, [
138 (b'foo %s', [b'val1'], []),
141 (b'foo %s', [b'val1'], []),
139 ]))
142 ]))
140
143
141 self.assertEqual(val, [
144 self.assertEqual(val, [
142 ffs(br'1 text-output 0 \x06\x00\x00\x01\x04\x00foo %sval1'),
145 ffs(br'1 text-output 0 \x06\x00\x00\x01\x04\x00foo %sval1'),
143 ])
146 ])
144
147
145 def testtextoutput2arg(self):
148 def testtextoutput2arg(self):
146 val = list(framing.createtextoutputframe(1, [
149 val = list(framing.createtextoutputframe(1, [
147 (b'foo %s %s', [b'val', b'value'], []),
150 (b'foo %s %s', [b'val', b'value'], []),
148 ]))
151 ]))
149
152
150 self.assertEqual(val, [
153 self.assertEqual(val, [
151 ffs(br'1 text-output 0 \x09\x00\x00\x02\x03\x00\x05\x00'
154 ffs(br'1 text-output 0 \x09\x00\x00\x02\x03\x00\x05\x00'
152 br'foo %s %svalvalue'),
155 br'foo %s %svalvalue'),
153 ])
156 ])
154
157
155 def testtextoutput1label(self):
158 def testtextoutput1label(self):
156 val = list(framing.createtextoutputframe(1, [
159 val = list(framing.createtextoutputframe(1, [
157 (b'foo', [], [b'label']),
160 (b'foo', [], [b'label']),
158 ]))
161 ]))
159
162
160 self.assertEqual(val, [
163 self.assertEqual(val, [
161 ffs(br'1 text-output 0 \x03\x00\x01\x00\x05foolabel'),
164 ffs(br'1 text-output 0 \x03\x00\x01\x00\x05foolabel'),
162 ])
165 ])
163
166
164 def testargandlabel(self):
167 def testargandlabel(self):
165 val = list(framing.createtextoutputframe(1, [
168 val = list(framing.createtextoutputframe(1, [
166 (b'foo %s', [b'arg'], [b'label']),
169 (b'foo %s', [b'arg'], [b'label']),
167 ]))
170 ]))
168
171
169 self.assertEqual(val, [
172 self.assertEqual(val, [
170 ffs(br'1 text-output 0 \x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
173 ffs(br'1 text-output 0 \x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
171 ])
174 ])
172
175
173 class ServerReactorTests(unittest.TestCase):
176 class ServerReactorTests(unittest.TestCase):
174 def _sendsingleframe(self, reactor, s):
177 def _sendsingleframe(self, reactor, s):
175 results = list(sendframes(reactor, [ffs(s)]))
178 results = list(sendframes(reactor, [ffs(s)]))
176 self.assertEqual(len(results), 1)
179 self.assertEqual(len(results), 1)
177
180
178 return results[0]
181 return results[0]
179
182
180 def assertaction(self, res, expected):
183 def assertaction(self, res, expected):
181 self.assertIsInstance(res, tuple)
184 self.assertIsInstance(res, tuple)
182 self.assertEqual(len(res), 2)
185 self.assertEqual(len(res), 2)
183 self.assertIsInstance(res[1], dict)
186 self.assertIsInstance(res[1], dict)
184 self.assertEqual(res[0], expected)
187 self.assertEqual(res[0], expected)
185
188
186 def assertframesequal(self, frames, framestrings):
189 def assertframesequal(self, frames, framestrings):
187 expected = [ffs(s) for s in framestrings]
190 expected = [ffs(s) for s in framestrings]
188 self.assertEqual(list(frames), expected)
191 self.assertEqual(list(frames), expected)
189
192
190 def test1framecommand(self):
193 def test1framecommand(self):
191 """Receiving a command in a single frame yields request to run it."""
194 """Receiving a command in a single frame yields request to run it."""
192 reactor = makereactor()
195 reactor = makereactor()
193 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
196 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
194 self.assertEqual(len(results), 1)
197 self.assertEqual(len(results), 1)
195 self.assertaction(results[0], 'runcommand')
198 self.assertaction(results[0], 'runcommand')
196 self.assertEqual(results[0][1], {
199 self.assertEqual(results[0][1], {
197 'requestid': 1,
200 'requestid': 1,
198 'command': b'mycommand',
201 'command': b'mycommand',
199 'args': {},
202 'args': {},
200 'data': None,
203 'data': None,
201 })
204 })
202
205
203 result = reactor.oninputeof()
206 result = reactor.oninputeof()
204 self.assertaction(result, 'noop')
207 self.assertaction(result, 'noop')
205
208
206 def test1argument(self):
209 def test1argument(self):
207 reactor = makereactor()
210 reactor = makereactor()
208 results = list(sendcommandframes(reactor, 41, b'mycommand',
211 results = list(sendcommandframes(reactor, 41, b'mycommand',
209 {b'foo': b'bar'}))
212 {b'foo': b'bar'}))
210 self.assertEqual(len(results), 2)
213 self.assertEqual(len(results), 2)
211 self.assertaction(results[0], 'wantframe')
214 self.assertaction(results[0], 'wantframe')
212 self.assertaction(results[1], 'runcommand')
215 self.assertaction(results[1], 'runcommand')
213 self.assertEqual(results[1][1], {
216 self.assertEqual(results[1][1], {
214 'requestid': 41,
217 'requestid': 41,
215 'command': b'mycommand',
218 'command': b'mycommand',
216 'args': {b'foo': b'bar'},
219 'args': {b'foo': b'bar'},
217 'data': None,
220 'data': None,
218 })
221 })
219
222
220 def testmultiarguments(self):
223 def testmultiarguments(self):
221 reactor = makereactor()
224 reactor = makereactor()
222 results = list(sendcommandframes(reactor, 1, b'mycommand',
225 results = list(sendcommandframes(reactor, 1, b'mycommand',
223 {b'foo': b'bar', b'biz': b'baz'}))
226 {b'foo': b'bar', b'biz': b'baz'}))
224 self.assertEqual(len(results), 3)
227 self.assertEqual(len(results), 3)
225 self.assertaction(results[0], 'wantframe')
228 self.assertaction(results[0], 'wantframe')
226 self.assertaction(results[1], 'wantframe')
229 self.assertaction(results[1], 'wantframe')
227 self.assertaction(results[2], 'runcommand')
230 self.assertaction(results[2], 'runcommand')
228 self.assertEqual(results[2][1], {
231 self.assertEqual(results[2][1], {
229 'requestid': 1,
232 'requestid': 1,
230 'command': b'mycommand',
233 'command': b'mycommand',
231 'args': {b'foo': b'bar', b'biz': b'baz'},
234 'args': {b'foo': b'bar', b'biz': b'baz'},
232 'data': None,
235 'data': None,
233 })
236 })
234
237
235 def testsimplecommanddata(self):
238 def testsimplecommanddata(self):
236 reactor = makereactor()
239 reactor = makereactor()
237 results = list(sendcommandframes(reactor, 1, b'mycommand', {},
240 results = list(sendcommandframes(reactor, 1, b'mycommand', {},
238 util.bytesio(b'data!')))
241 util.bytesio(b'data!')))
239 self.assertEqual(len(results), 2)
242 self.assertEqual(len(results), 2)
240 self.assertaction(results[0], 'wantframe')
243 self.assertaction(results[0], 'wantframe')
241 self.assertaction(results[1], 'runcommand')
244 self.assertaction(results[1], 'runcommand')
242 self.assertEqual(results[1][1], {
245 self.assertEqual(results[1][1], {
243 'requestid': 1,
246 'requestid': 1,
244 'command': b'mycommand',
247 'command': b'mycommand',
245 'args': {},
248 'args': {},
246 'data': b'data!',
249 'data': b'data!',
247 })
250 })
248
251
249 def testmultipledataframes(self):
252 def testmultipledataframes(self):
250 frames = [
253 frames = [
251 ffs(b'1 command-name have-data mycommand'),
254 ffs(b'1 command-name have-data mycommand'),
252 ffs(b'1 command-data continuation data1'),
255 ffs(b'1 command-data continuation data1'),
253 ffs(b'1 command-data continuation data2'),
256 ffs(b'1 command-data continuation data2'),
254 ffs(b'1 command-data eos data3'),
257 ffs(b'1 command-data eos data3'),
255 ]
258 ]
256
259
257 reactor = makereactor()
260 reactor = makereactor()
258 results = list(sendframes(reactor, frames))
261 results = list(sendframes(reactor, frames))
259 self.assertEqual(len(results), 4)
262 self.assertEqual(len(results), 4)
260 for i in range(3):
263 for i in range(3):
261 self.assertaction(results[i], 'wantframe')
264 self.assertaction(results[i], 'wantframe')
262 self.assertaction(results[3], 'runcommand')
265 self.assertaction(results[3], 'runcommand')
263 self.assertEqual(results[3][1], {
266 self.assertEqual(results[3][1], {
264 'requestid': 1,
267 'requestid': 1,
265 'command': b'mycommand',
268 'command': b'mycommand',
266 'args': {},
269 'args': {},
267 'data': b'data1data2data3',
270 'data': b'data1data2data3',
268 })
271 })
269
272
270 def testargumentanddata(self):
273 def testargumentanddata(self):
271 frames = [
274 frames = [
272 ffs(b'1 command-name have-args|have-data command'),
275 ffs(b'1 command-name have-args|have-data command'),
273 ffs(br'1 command-argument 0 \x03\x00\x03\x00keyval'),
276 ffs(br'1 command-argument 0 \x03\x00\x03\x00keyval'),
274 ffs(br'1 command-argument eoa \x03\x00\x03\x00foobar'),
277 ffs(br'1 command-argument eoa \x03\x00\x03\x00foobar'),
275 ffs(b'1 command-data continuation value1'),
278 ffs(b'1 command-data continuation value1'),
276 ffs(b'1 command-data eos value2'),
279 ffs(b'1 command-data eos value2'),
277 ]
280 ]
278
281
279 reactor = makereactor()
282 reactor = makereactor()
280 results = list(sendframes(reactor, frames))
283 results = list(sendframes(reactor, frames))
281
284
282 self.assertaction(results[-1], 'runcommand')
285 self.assertaction(results[-1], 'runcommand')
283 self.assertEqual(results[-1][1], {
286 self.assertEqual(results[-1][1], {
284 'requestid': 1,
287 'requestid': 1,
285 'command': b'command',
288 'command': b'command',
286 'args': {
289 'args': {
287 b'key': b'val',
290 b'key': b'val',
288 b'foo': b'bar',
291 b'foo': b'bar',
289 },
292 },
290 'data': b'value1value2',
293 'data': b'value1value2',
291 })
294 })
292
295
293 def testunexpectedcommandargument(self):
296 def testunexpectedcommandargument(self):
294 """Command argument frame when not running a command is an error."""
297 """Command argument frame when not running a command is an error."""
295 result = self._sendsingleframe(makereactor(),
298 result = self._sendsingleframe(makereactor(),
296 b'1 command-argument 0 ignored')
299 b'1 command-argument 0 ignored')
297 self.assertaction(result, 'error')
300 self.assertaction(result, 'error')
298 self.assertEqual(result[1], {
301 self.assertEqual(result[1], {
299 'message': b'expected command frame; got 2',
302 'message': b'expected command frame; got 2',
300 })
303 })
301
304
302 def testunexpectedcommandargumentreceiving(self):
305 def testunexpectedcommandargumentreceiving(self):
303 """Same as above but the command is receiving."""
306 """Same as above but the command is receiving."""
304 results = list(sendframes(makereactor(), [
307 results = list(sendframes(makereactor(), [
305 ffs(b'1 command-name have-data command'),
308 ffs(b'1 command-name have-data command'),
306 ffs(b'1 command-argument eoa ignored'),
309 ffs(b'1 command-argument eoa ignored'),
307 ]))
310 ]))
308
311
309 self.assertaction(results[1], 'error')
312 self.assertaction(results[1], 'error')
310 self.assertEqual(results[1][1], {
313 self.assertEqual(results[1][1], {
311 'message': b'received command argument frame for request that is '
314 'message': b'received command argument frame for request that is '
312 b'not expecting arguments: 1',
315 b'not expecting arguments: 1',
313 })
316 })
314
317
315 def testunexpectedcommanddata(self):
318 def testunexpectedcommanddata(self):
316 """Command argument frame when not running a command is an error."""
319 """Command argument frame when not running a command is an error."""
317 result = self._sendsingleframe(makereactor(),
320 result = self._sendsingleframe(makereactor(),
318 b'1 command-data 0 ignored')
321 b'1 command-data 0 ignored')
319 self.assertaction(result, 'error')
322 self.assertaction(result, 'error')
320 self.assertEqual(result[1], {
323 self.assertEqual(result[1], {
321 'message': b'expected command frame; got 3',
324 'message': b'expected command frame; got 3',
322 })
325 })
323
326
324 def testunexpectedcommanddatareceiving(self):
327 def testunexpectedcommanddatareceiving(self):
325 """Same as above except the command is receiving."""
328 """Same as above except the command is receiving."""
326 results = list(sendframes(makereactor(), [
329 results = list(sendframes(makereactor(), [
327 ffs(b'1 command-name have-args command'),
330 ffs(b'1 command-name have-args command'),
328 ffs(b'1 command-data eos ignored'),
331 ffs(b'1 command-data eos ignored'),
329 ]))
332 ]))
330
333
331 self.assertaction(results[1], 'error')
334 self.assertaction(results[1], 'error')
332 self.assertEqual(results[1][1], {
335 self.assertEqual(results[1][1], {
333 'message': b'received command data frame for request that is not '
336 'message': b'received command data frame for request that is not '
334 b'expecting data: 1',
337 b'expecting data: 1',
335 })
338 })
336
339
337 def testmissingcommandframeflags(self):
340 def testmissingcommandframeflags(self):
338 """Command name frame must have flags set."""
341 """Command name frame must have flags set."""
339 result = self._sendsingleframe(makereactor(),
342 result = self._sendsingleframe(makereactor(),
340 b'1 command-name 0 command')
343 b'1 command-name 0 command')
341 self.assertaction(result, 'error')
344 self.assertaction(result, 'error')
342 self.assertEqual(result[1], {
345 self.assertEqual(result[1], {
343 'message': b'missing frame flags on command frame',
346 'message': b'missing frame flags on command frame',
344 })
347 })
345
348
346 def testconflictingrequestid(self):
349 def testconflictingrequestid(self):
347 """Multiple fully serviced commands with same request ID is allowed."""
350 """Multiple fully serviced commands with same request ID is allowed."""
348 results = list(sendframes(makereactor(), [
351 results = list(sendframes(makereactor(), [
349 ffs(b'1 command-name eos command'),
352 ffs(b'1 command-name eos command'),
350 ffs(b'1 command-name eos command'),
353 ffs(b'1 command-name eos command'),
351 ffs(b'1 command-name eos command'),
354 ffs(b'1 command-name eos command'),
352 ]))
355 ]))
353 for i in range(3):
356 for i in range(3):
354 self.assertaction(results[i], 'runcommand')
357 self.assertaction(results[i], 'runcommand')
355 self.assertEqual(results[i][1], {
358 self.assertEqual(results[i][1], {
356 'requestid': 1,
359 'requestid': 1,
357 'command': b'command',
360 'command': b'command',
358 'args': {},
361 'args': {},
359 'data': None,
362 'data': None,
360 })
363 })
361
364
362 def testconflictingrequestid(self):
365 def testconflictingrequestid(self):
363 """Request ID for new command matching in-flight command is illegal."""
366 """Request ID for new command matching in-flight command is illegal."""
364 results = list(sendframes(makereactor(), [
367 results = list(sendframes(makereactor(), [
365 ffs(b'1 command-name have-args command'),
368 ffs(b'1 command-name have-args command'),
366 ffs(b'1 command-name eos command'),
369 ffs(b'1 command-name eos command'),
367 ]))
370 ]))
368
371
369 self.assertaction(results[0], 'wantframe')
372 self.assertaction(results[0], 'wantframe')
370 self.assertaction(results[1], 'error')
373 self.assertaction(results[1], 'error')
371 self.assertEqual(results[1][1], {
374 self.assertEqual(results[1][1], {
372 'message': b'request with ID 1 already received',
375 'message': b'request with ID 1 already received',
373 })
376 })
374
377
375 def testinterleavedcommands(self):
378 def testinterleavedcommands(self):
376 results = list(sendframes(makereactor(), [
379 results = list(sendframes(makereactor(), [
377 ffs(b'1 command-name have-args command1'),
380 ffs(b'1 command-name have-args command1'),
378 ffs(b'3 command-name have-args command3'),
381 ffs(b'3 command-name have-args command3'),
379 ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
382 ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
380 ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
383 ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
381 ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
384 ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
382 ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
385 ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
383 ]))
386 ]))
384
387
385 self.assertEqual([t[0] for t in results], [
388 self.assertEqual([t[0] for t in results], [
386 'wantframe',
389 'wantframe',
387 'wantframe',
390 'wantframe',
388 'wantframe',
391 'wantframe',
389 'wantframe',
392 'wantframe',
390 'runcommand',
393 'runcommand',
391 'runcommand',
394 'runcommand',
392 ])
395 ])
393
396
394 self.assertEqual(results[4][1], {
397 self.assertEqual(results[4][1], {
395 'requestid': 3,
398 'requestid': 3,
396 'command': 'command3',
399 'command': 'command3',
397 'args': {b'biz': b'baz', b'key': b'val'},
400 'args': {b'biz': b'baz', b'key': b'val'},
398 'data': None,
401 'data': None,
399 })
402 })
400 self.assertEqual(results[5][1], {
403 self.assertEqual(results[5][1], {
401 'requestid': 1,
404 'requestid': 1,
402 'command': 'command1',
405 'command': 'command1',
403 'args': {b'foo': b'bar', b'key1': b'val'},
406 'args': {b'foo': b'bar', b'key1': b'val'},
404 'data': None,
407 'data': None,
405 })
408 })
406
409
407 def testmissingargumentframe(self):
410 def testmissingargumentframe(self):
408 # This test attempts to test behavior when reactor has an incomplete
411 # This test attempts to test behavior when reactor has an incomplete
409 # command request waiting on argument data. But it doesn't handle that
412 # command request waiting on argument data. But it doesn't handle that
410 # scenario yet. So this test does nothing of value.
413 # scenario yet. So this test does nothing of value.
411 frames = [
414 frames = [
412 ffs(b'1 command-name have-args command'),
415 ffs(b'1 command-name have-args command'),
413 ]
416 ]
414
417
415 results = list(sendframes(makereactor(), frames))
418 results = list(sendframes(makereactor(), frames))
416 self.assertaction(results[0], 'wantframe')
419 self.assertaction(results[0], 'wantframe')
417
420
418 def testincompleteargumentname(self):
421 def testincompleteargumentname(self):
419 """Argument frame with incomplete name."""
422 """Argument frame with incomplete name."""
420 frames = [
423 frames = [
421 ffs(b'1 command-name have-args command1'),
424 ffs(b'1 command-name have-args command1'),
422 ffs(br'1 command-argument eoa \x04\x00\xde\xadfoo'),
425 ffs(br'1 command-argument eoa \x04\x00\xde\xadfoo'),
423 ]
426 ]
424
427
425 results = list(sendframes(makereactor(), frames))
428 results = list(sendframes(makereactor(), frames))
426 self.assertEqual(len(results), 2)
429 self.assertEqual(len(results), 2)
427 self.assertaction(results[0], 'wantframe')
430 self.assertaction(results[0], 'wantframe')
428 self.assertaction(results[1], 'error')
431 self.assertaction(results[1], 'error')
429 self.assertEqual(results[1][1], {
432 self.assertEqual(results[1][1], {
430 'message': b'malformed argument frame: partial argument name',
433 'message': b'malformed argument frame: partial argument name',
431 })
434 })
432
435
433 def testincompleteargumentvalue(self):
436 def testincompleteargumentvalue(self):
434 """Argument frame with incomplete value."""
437 """Argument frame with incomplete value."""
435 frames = [
438 frames = [
436 ffs(b'1 command-name have-args command'),
439 ffs(b'1 command-name have-args command'),
437 ffs(br'1 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
440 ffs(br'1 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
438 ]
441 ]
439
442
440 results = list(sendframes(makereactor(), frames))
443 results = list(sendframes(makereactor(), frames))
441 self.assertEqual(len(results), 2)
444 self.assertEqual(len(results), 2)
442 self.assertaction(results[0], 'wantframe')
445 self.assertaction(results[0], 'wantframe')
443 self.assertaction(results[1], 'error')
446 self.assertaction(results[1], 'error')
444 self.assertEqual(results[1][1], {
447 self.assertEqual(results[1][1], {
445 'message': b'malformed argument frame: partial argument value',
448 'message': b'malformed argument frame: partial argument value',
446 })
449 })
447
450
448 def testmissingcommanddataframe(self):
451 def testmissingcommanddataframe(self):
449 # The reactor doesn't currently handle partially received commands.
452 # The reactor doesn't currently handle partially received commands.
450 # So this test is failing to do anything with request 1.
453 # So this test is failing to do anything with request 1.
451 frames = [
454 frames = [
452 ffs(b'1 command-name have-data command1'),
455 ffs(b'1 command-name have-data command1'),
453 ffs(b'3 command-name eos command2'),
456 ffs(b'3 command-name eos command2'),
454 ]
457 ]
455 results = list(sendframes(makereactor(), frames))
458 results = list(sendframes(makereactor(), frames))
456 self.assertEqual(len(results), 2)
459 self.assertEqual(len(results), 2)
457 self.assertaction(results[0], 'wantframe')
460 self.assertaction(results[0], 'wantframe')
458 self.assertaction(results[1], 'runcommand')
461 self.assertaction(results[1], 'runcommand')
459
462
460 def testmissingcommanddataframeflags(self):
463 def testmissingcommanddataframeflags(self):
461 frames = [
464 frames = [
462 ffs(b'1 command-name have-data command1'),
465 ffs(b'1 command-name have-data command1'),
463 ffs(b'1 command-data 0 data'),
466 ffs(b'1 command-data 0 data'),
464 ]
467 ]
465 results = list(sendframes(makereactor(), frames))
468 results = list(sendframes(makereactor(), frames))
466 self.assertEqual(len(results), 2)
469 self.assertEqual(len(results), 2)
467 self.assertaction(results[0], 'wantframe')
470 self.assertaction(results[0], 'wantframe')
468 self.assertaction(results[1], 'error')
471 self.assertaction(results[1], 'error')
469 self.assertEqual(results[1][1], {
472 self.assertEqual(results[1][1], {
470 'message': b'command data frame without flags',
473 'message': b'command data frame without flags',
471 })
474 })
472
475
473 def testframefornonreceivingrequest(self):
476 def testframefornonreceivingrequest(self):
474 """Receiving a frame for a command that is not receiving is illegal."""
477 """Receiving a frame for a command that is not receiving is illegal."""
475 results = list(sendframes(makereactor(), [
478 results = list(sendframes(makereactor(), [
476 ffs(b'1 command-name eos command1'),
479 ffs(b'1 command-name eos command1'),
477 ffs(b'3 command-name have-data command3'),
480 ffs(b'3 command-name have-data command3'),
478 ffs(b'1 command-argument eoa ignored'),
481 ffs(b'1 command-argument eoa ignored'),
479 ]))
482 ]))
480 self.assertaction(results[2], 'error')
483 self.assertaction(results[2], 'error')
481 self.assertEqual(results[2][1], {
484 self.assertEqual(results[2][1], {
482 'message': b'received frame for request that is not receiving: 1',
485 'message': b'received frame for request that is not receiving: 1',
483 })
486 })
484
487
485 def testsimpleresponse(self):
488 def testsimpleresponse(self):
486 """Bytes response to command sends result frames."""
489 """Bytes response to command sends result frames."""
487 reactor = makereactor()
490 reactor = makereactor()
488 list(sendcommandframes(reactor, 1, b'mycommand', {}))
491 list(sendcommandframes(reactor, 1, b'mycommand', {}))
489
492
490 result = reactor.onbytesresponseready(1, b'response')
493 result = reactor.onbytesresponseready(1, b'response')
491 self.assertaction(result, 'sendframes')
494 self.assertaction(result, 'sendframes')
492 self.assertframesequal(result[1]['framegen'], [
495 self.assertframesequal(result[1]['framegen'], [
493 b'1 bytes-response eos response',
496 b'1 bytes-response eos response',
494 ])
497 ])
495
498
496 def testmultiframeresponse(self):
499 def testmultiframeresponse(self):
497 """Bytes response spanning multiple frames is handled."""
500 """Bytes response spanning multiple frames is handled."""
498 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
501 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
499 second = b'y' * 100
502 second = b'y' * 100
500
503
501 reactor = makereactor()
504 reactor = makereactor()
502 list(sendcommandframes(reactor, 1, b'mycommand', {}))
505 list(sendcommandframes(reactor, 1, b'mycommand', {}))
503
506
504 result = reactor.onbytesresponseready(1, first + second)
507 result = reactor.onbytesresponseready(1, first + second)
505 self.assertaction(result, 'sendframes')
508 self.assertaction(result, 'sendframes')
506 self.assertframesequal(result[1]['framegen'], [
509 self.assertframesequal(result[1]['framegen'], [
507 b'1 bytes-response continuation %s' % first,
510 b'1 bytes-response continuation %s' % first,
508 b'1 bytes-response eos %s' % second,
511 b'1 bytes-response eos %s' % second,
509 ])
512 ])
510
513
511 def testapplicationerror(self):
514 def testapplicationerror(self):
512 reactor = makereactor()
515 reactor = makereactor()
513 list(sendcommandframes(reactor, 1, b'mycommand', {}))
516 list(sendcommandframes(reactor, 1, b'mycommand', {}))
514
517
515 result = reactor.onapplicationerror(1, b'some message')
518 result = reactor.onapplicationerror(1, b'some message')
516 self.assertaction(result, 'sendframes')
519 self.assertaction(result, 'sendframes')
517 self.assertframesequal(result[1]['framegen'], [
520 self.assertframesequal(result[1]['framegen'], [
518 b'1 error-response application some message',
521 b'1 error-response application some message',
519 ])
522 ])
520
523
521 def test1commanddeferresponse(self):
524 def test1commanddeferresponse(self):
522 """Responses when in deferred output mode are delayed until EOF."""
525 """Responses when in deferred output mode are delayed until EOF."""
523 reactor = makereactor(deferoutput=True)
526 reactor = makereactor(deferoutput=True)
524 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
527 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
525 self.assertEqual(len(results), 1)
528 self.assertEqual(len(results), 1)
526 self.assertaction(results[0], 'runcommand')
529 self.assertaction(results[0], 'runcommand')
527
530
528 result = reactor.onbytesresponseready(1, b'response')
531 result = reactor.onbytesresponseready(1, b'response')
529 self.assertaction(result, 'noop')
532 self.assertaction(result, 'noop')
530 result = reactor.oninputeof()
533 result = reactor.oninputeof()
531 self.assertaction(result, 'sendframes')
534 self.assertaction(result, 'sendframes')
532 self.assertframesequal(result[1]['framegen'], [
535 self.assertframesequal(result[1]['framegen'], [
533 b'1 bytes-response eos response',
536 b'1 bytes-response eos response',
534 ])
537 ])
535
538
536 def testmultiplecommanddeferresponse(self):
539 def testmultiplecommanddeferresponse(self):
537 reactor = makereactor(deferoutput=True)
540 reactor = makereactor(deferoutput=True)
538 list(sendcommandframes(reactor, 1, b'command1', {}))
541 list(sendcommandframes(reactor, 1, b'command1', {}))
539 list(sendcommandframes(reactor, 3, b'command2', {}))
542 list(sendcommandframes(reactor, 3, b'command2', {}))
540
543
541 result = reactor.onbytesresponseready(1, b'response1')
544 result = reactor.onbytesresponseready(1, b'response1')
542 self.assertaction(result, 'noop')
545 self.assertaction(result, 'noop')
543 result = reactor.onbytesresponseready(3, b'response2')
546 result = reactor.onbytesresponseready(3, b'response2')
544 self.assertaction(result, 'noop')
547 self.assertaction(result, 'noop')
545 result = reactor.oninputeof()
548 result = reactor.oninputeof()
546 self.assertaction(result, 'sendframes')
549 self.assertaction(result, 'sendframes')
547 self.assertframesequal(result[1]['framegen'], [
550 self.assertframesequal(result[1]['framegen'], [
548 b'1 bytes-response eos response1',
551 b'1 bytes-response eos response1',
549 b'3 bytes-response eos response2'
552 b'3 bytes-response eos response2'
550 ])
553 ])
551
554
552 def testrequestidtracking(self):
555 def testrequestidtracking(self):
553 reactor = makereactor(deferoutput=True)
556 reactor = makereactor(deferoutput=True)
554 list(sendcommandframes(reactor, 1, b'command1', {}))
557 list(sendcommandframes(reactor, 1, b'command1', {}))
555 list(sendcommandframes(reactor, 3, b'command2', {}))
558 list(sendcommandframes(reactor, 3, b'command2', {}))
556 list(sendcommandframes(reactor, 5, b'command3', {}))
559 list(sendcommandframes(reactor, 5, b'command3', {}))
557
560
558 # Register results for commands out of order.
561 # Register results for commands out of order.
559 reactor.onbytesresponseready(3, b'response3')
562 reactor.onbytesresponseready(3, b'response3')
560 reactor.onbytesresponseready(1, b'response1')
563 reactor.onbytesresponseready(1, b'response1')
561 reactor.onbytesresponseready(5, b'response5')
564 reactor.onbytesresponseready(5, b'response5')
562
565
563 result = reactor.oninputeof()
566 result = reactor.oninputeof()
564 self.assertaction(result, 'sendframes')
567 self.assertaction(result, 'sendframes')
565 self.assertframesequal(result[1]['framegen'], [
568 self.assertframesequal(result[1]['framegen'], [
566 b'3 bytes-response eos response3',
569 b'3 bytes-response eos response3',
567 b'1 bytes-response eos response1',
570 b'1 bytes-response eos response1',
568 b'5 bytes-response eos response5',
571 b'5 bytes-response eos response5',
569 ])
572 ])
570
573
571 if __name__ == '__main__':
574 if __name__ == '__main__':
572 import silenttestrunner
575 import silenttestrunner
573 silenttestrunner.main(__name__)
576 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now