##// END OF EJS Templates
wireproto: define attr-based classes for representing frames...
Gregory Szorc -
r37079:884a0c16 default
parent child Browse files
Show More
@@ -1,658 +1,674
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import struct
15 15
16 16 from .i18n import _
17 from .thirdparty import (
18 attr,
19 )
17 20 from . import (
18 21 error,
19 22 util,
20 23 )
21 24
22 25 FRAME_HEADER_SIZE = 6
23 26 DEFAULT_MAX_FRAME_SIZE = 32768
24 27
25 28 FRAME_TYPE_COMMAND_NAME = 0x01
26 29 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
27 30 FRAME_TYPE_COMMAND_DATA = 0x03
28 31 FRAME_TYPE_BYTES_RESPONSE = 0x04
29 32 FRAME_TYPE_ERROR_RESPONSE = 0x05
30 33 FRAME_TYPE_TEXT_OUTPUT = 0x06
31 34
32 35 FRAME_TYPES = {
33 36 b'command-name': FRAME_TYPE_COMMAND_NAME,
34 37 b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
35 38 b'command-data': FRAME_TYPE_COMMAND_DATA,
36 39 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
37 40 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
38 41 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
39 42 }
40 43
41 44 FLAG_COMMAND_NAME_EOS = 0x01
42 45 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
43 46 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
44 47
45 48 FLAGS_COMMAND = {
46 49 b'eos': FLAG_COMMAND_NAME_EOS,
47 50 b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
48 51 b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
49 52 }
50 53
51 54 FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
52 55 FLAG_COMMAND_ARGUMENT_EOA = 0x02
53 56
54 57 FLAGS_COMMAND_ARGUMENT = {
55 58 b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
56 59 b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
57 60 }
58 61
59 62 FLAG_COMMAND_DATA_CONTINUATION = 0x01
60 63 FLAG_COMMAND_DATA_EOS = 0x02
61 64
62 65 FLAGS_COMMAND_DATA = {
63 66 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
64 67 b'eos': FLAG_COMMAND_DATA_EOS,
65 68 }
66 69
67 70 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
68 71 FLAG_BYTES_RESPONSE_EOS = 0x02
69 72
70 73 FLAGS_BYTES_RESPONSE = {
71 74 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
72 75 b'eos': FLAG_BYTES_RESPONSE_EOS,
73 76 }
74 77
75 78 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
76 79 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
77 80
78 81 FLAGS_ERROR_RESPONSE = {
79 82 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
80 83 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
81 84 }
82 85
83 86 # Maps frame types to their available flags.
84 87 FRAME_TYPE_FLAGS = {
85 88 FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
86 89 FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
87 90 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
88 91 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
89 92 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
90 93 FRAME_TYPE_TEXT_OUTPUT: {},
91 94 }
92 95
93 96 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
94 97
98 @attr.s(slots=True)
99 class frameheader(object):
100 """Represents the data in a frame header."""
101
102 length = attr.ib()
103 requestid = attr.ib()
104 typeid = attr.ib()
105 flags = attr.ib()
106
107 @attr.s(slots=True)
108 class frame(object):
109 """Represents a parsed frame."""
110
111 requestid = attr.ib()
112 typeid = attr.ib()
113 flags = attr.ib()
114 payload = attr.ib()
115
95 116 def makeframe(requestid, frametype, frameflags, payload):
96 117 """Assemble a frame into a byte array."""
97 118 # TODO assert size of payload.
98 119 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
99 120
100 121 # 24 bits length
101 122 # 16 bits request id
102 123 # 4 bits type
103 124 # 4 bits flags
104 125
105 126 l = struct.pack(r'<I', len(payload))
106 127 frame[0:3] = l[0:3]
107 128 struct.pack_into(r'<H', frame, 3, requestid)
108 129 frame[5] = (frametype << 4) | frameflags
109 130 frame[6:] = payload
110 131
111 132 return frame
112 133
113 134 def makeframefromhumanstring(s):
114 135 """Create a frame from a human readable string
115 136
116 137 Strings have the form:
117 138
118 139 <request-id> <type> <flags> <payload>
119 140
120 141 This can be used by user-facing applications and tests for creating
121 142 frames easily without having to type out a bunch of constants.
122 143
123 144 Request ID is an integer.
124 145
125 146 Frame type and flags can be specified by integer or named constant.
126 147
127 148 Flags can be delimited by `|` to bitwise OR them together.
128 149 """
129 150 requestid, frametype, frameflags, payload = s.split(b' ', 3)
130 151
131 152 requestid = int(requestid)
132 153
133 154 if frametype in FRAME_TYPES:
134 155 frametype = FRAME_TYPES[frametype]
135 156 else:
136 157 frametype = int(frametype)
137 158
138 159 finalflags = 0
139 160 validflags = FRAME_TYPE_FLAGS[frametype]
140 161 for flag in frameflags.split(b'|'):
141 162 if flag in validflags:
142 163 finalflags |= validflags[flag]
143 164 else:
144 165 finalflags |= int(flag)
145 166
146 167 payload = util.unescapestr(payload)
147 168
148 169 return makeframe(requestid, frametype, finalflags, payload)
149 170
150 171 def parseheader(data):
151 172 """Parse a unified framing protocol frame header from a buffer.
152 173
153 174 The header is expected to be in the buffer at offset 0 and the
154 175 buffer is expected to be large enough to hold a full header.
155 176 """
156 177 # 24 bits payload length (little endian)
157 178 # 4 bits frame type
158 179 # 4 bits frame flags
159 180 # ... payload
160 181 framelength = data[0] + 256 * data[1] + 16384 * data[2]
161 182 requestid = struct.unpack_from(r'<H', data, 3)[0]
162 183 typeflags = data[5]
163 184
164 185 frametype = (typeflags & 0xf0) >> 4
165 186 frameflags = typeflags & 0x0f
166 187
167 return requestid, frametype, frameflags, framelength
188 return frameheader(framelength, requestid, frametype, frameflags)
168 189
169 190 def readframe(fh):
170 191 """Read a unified framing protocol frame from a file object.
171 192
172 193 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
173 194 None if no frame is available. May raise if a malformed frame is
174 195 seen.
175 196 """
176 197 header = bytearray(FRAME_HEADER_SIZE)
177 198
178 199 readcount = fh.readinto(header)
179 200
180 201 if readcount == 0:
181 202 return None
182 203
183 204 if readcount != FRAME_HEADER_SIZE:
184 205 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
185 206 (readcount, header))
186 207
187 requestid, frametype, frameflags, framelength = parseheader(header)
208 h = parseheader(header)
188 209
189 payload = fh.read(framelength)
190 if len(payload) != framelength:
210 payload = fh.read(h.length)
211 if len(payload) != h.length:
191 212 raise error.Abort(_('frame length error: expected %d; got %d') %
192 (framelength, len(payload)))
213 (h.length, len(payload)))
193 214
194 return requestid, frametype, frameflags, payload
215 return frame(h.requestid, h.typeid, h.flags, payload)
195 216
196 217 def createcommandframes(requestid, cmd, args, datafh=None):
197 218 """Create frames necessary to transmit a request to run a command.
198 219
199 220 This is a generator of bytearrays. Each item represents a frame
200 221 ready to be sent over the wire to a peer.
201 222 """
202 223 flags = 0
203 224 if args:
204 225 flags |= FLAG_COMMAND_NAME_HAVE_ARGS
205 226 if datafh:
206 227 flags |= FLAG_COMMAND_NAME_HAVE_DATA
207 228
208 229 if not flags:
209 230 flags |= FLAG_COMMAND_NAME_EOS
210 231
211 232 yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd)
212 233
213 234 for i, k in enumerate(sorted(args)):
214 235 v = args[k]
215 236 last = i == len(args) - 1
216 237
217 238 # TODO handle splitting of argument values across frames.
218 239 payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
219 240 offset = 0
220 241 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
221 242 offset += ARGUMENT_FRAME_HEADER.size
222 243 payload[offset:offset + len(k)] = k
223 244 offset += len(k)
224 245 payload[offset:offset + len(v)] = v
225 246
226 247 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
227 248 yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
228 249
229 250 if datafh:
230 251 while True:
231 252 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
232 253
233 254 done = False
234 255 if len(data) == DEFAULT_MAX_FRAME_SIZE:
235 256 flags = FLAG_COMMAND_DATA_CONTINUATION
236 257 else:
237 258 flags = FLAG_COMMAND_DATA_EOS
238 259 assert datafh.read(1) == b''
239 260 done = True
240 261
241 262 yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data)
242 263
243 264 if done:
244 265 break
245 266
246 267 def createbytesresponseframesfrombytes(requestid, data,
247 268 maxframesize=DEFAULT_MAX_FRAME_SIZE):
248 269 """Create a raw frame to send a bytes response from static bytes input.
249 270
250 271 Returns a generator of bytearrays.
251 272 """
252 273
253 274 # Simple case of a single frame.
254 275 if len(data) <= maxframesize:
255 276 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE,
256 277 FLAG_BYTES_RESPONSE_EOS, data)
257 278 return
258 279
259 280 offset = 0
260 281 while True:
261 282 chunk = data[offset:offset + maxframesize]
262 283 offset += len(chunk)
263 284 done = offset == len(data)
264 285
265 286 if done:
266 287 flags = FLAG_BYTES_RESPONSE_EOS
267 288 else:
268 289 flags = FLAG_BYTES_RESPONSE_CONTINUATION
269 290
270 291 yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
271 292
272 293 if done:
273 294 break
274 295
275 296 def createerrorframe(requestid, msg, protocol=False, application=False):
276 297 # TODO properly handle frame size limits.
277 298 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
278 299
279 300 flags = 0
280 301 if protocol:
281 302 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
282 303 if application:
283 304 flags |= FLAG_ERROR_RESPONSE_APPLICATION
284 305
285 306 yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg)
286 307
287 308 def createtextoutputframe(requestid, atoms):
288 309 """Create a text output frame to render text to people.
289 310
290 311 ``atoms`` is a 3-tuple of (formatting string, args, labels).
291 312
292 313 The formatting string contains ``%s`` tokens to be replaced by the
293 314 corresponding indexed entry in ``args``. ``labels`` is an iterable of
294 315 formatters to be applied at rendering time. In terms of the ``ui``
295 316 class, each atom corresponds to a ``ui.write()``.
296 317 """
297 318 bytesleft = DEFAULT_MAX_FRAME_SIZE
298 319 atomchunks = []
299 320
300 321 for (formatting, args, labels) in atoms:
301 322 if len(args) > 255:
302 323 raise ValueError('cannot use more than 255 formatting arguments')
303 324 if len(labels) > 255:
304 325 raise ValueError('cannot use more than 255 labels')
305 326
306 327 # TODO look for localstr, other types here?
307 328
308 329 if not isinstance(formatting, bytes):
309 330 raise ValueError('must use bytes formatting strings')
310 331 for arg in args:
311 332 if not isinstance(arg, bytes):
312 333 raise ValueError('must use bytes for arguments')
313 334 for label in labels:
314 335 if not isinstance(label, bytes):
315 336 raise ValueError('must use bytes for labels')
316 337
317 338 # Formatting string must be UTF-8.
318 339 formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
319 340
320 341 # Arguments must be UTF-8.
321 342 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
322 343
323 344 # Labels must be ASCII.
324 345 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
325 346 for l in labels]
326 347
327 348 if len(formatting) > 65535:
328 349 raise ValueError('formatting string cannot be longer than 64k')
329 350
330 351 if any(len(a) > 65535 for a in args):
331 352 raise ValueError('argument string cannot be longer than 64k')
332 353
333 354 if any(len(l) > 255 for l in labels):
334 355 raise ValueError('label string cannot be longer than 255 bytes')
335 356
336 357 chunks = [
337 358 struct.pack(r'<H', len(formatting)),
338 359 struct.pack(r'<BB', len(labels), len(args)),
339 360 struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
340 361 struct.pack(r'<' + r'H' * len(args), *map(len, args)),
341 362 ]
342 363 chunks.append(formatting)
343 364 chunks.extend(labels)
344 365 chunks.extend(args)
345 366
346 367 atom = b''.join(chunks)
347 368 atomchunks.append(atom)
348 369 bytesleft -= len(atom)
349 370
350 371 if bytesleft < 0:
351 372 raise ValueError('cannot encode data in a single frame')
352 373
353 374 yield makeframe(requestid, FRAME_TYPE_TEXT_OUTPUT, 0, b''.join(atomchunks))
354 375
355 376 class serverreactor(object):
356 377 """Holds state of a server handling frame-based protocol requests.
357 378
358 379 This class is the "brain" of the unified frame-based protocol server
359 380 component. While the protocol is stateless from the perspective of
360 381 requests/commands, something needs to track which frames have been
361 382 received, what frames to expect, etc. This class is that thing.
362 383
363 384 Instances are modeled as a state machine of sorts. Instances are also
364 385 reactionary to external events. The point of this class is to encapsulate
365 386 the state of the connection and the exchange of frames, not to perform
366 387 work. Instead, callers tell this class when something occurs, like a
367 388 frame arriving. If that activity is worthy of a follow-up action (say
368 389 *run a command*), the return value of that handler will say so.
369 390
370 391 I/O and CPU intensive operations are purposefully delegated outside of
371 392 this class.
372 393
373 394 Consumers are expected to tell instances when events occur. They do so by
374 395 calling the various ``on*`` methods. These methods return a 2-tuple
375 396 describing any follow-up action(s) to take. The first element is the
376 397 name of an action to perform. The second is a data structure (usually
377 398 a dict) specific to that action that contains more information. e.g.
378 399 if the server wants to send frames back to the client, the data structure
379 400 will contain a reference to those frames.
380 401
381 402 Valid actions that consumers can be instructed to take are:
382 403
383 404 sendframes
384 405 Indicates that frames should be sent to the client. The ``framegen``
385 406 key contains a generator of frames that should be sent. The server
386 407 assumes that all frames are sent to the client.
387 408
388 409 error
389 410 Indicates that an error occurred. Consumer should probably abort.
390 411
391 412 runcommand
392 413 Indicates that the consumer should run a wire protocol command. Details
393 414 of the command to run are given in the data structure.
394 415
395 416 wantframe
396 417 Indicates that nothing of interest happened and the server is waiting on
397 418 more frames from the client before anything interesting can be done.
398 419
399 420 noop
400 421 Indicates no additional action is required.
401 422
402 423 Known Issues
403 424 ------------
404 425
405 426 There are no limits to the number of partially received commands or their
406 427 size. A malicious client could stream command request data and exhaust the
407 428 server's memory.
408 429
409 430 Partially received commands are not acted upon when end of input is
410 431 reached. Should the server error if it receives a partial request?
411 432 Should the client send a message to abort a partially transmitted request
412 433 to facilitate graceful shutdown?
413 434
414 435 Active requests that haven't been responded to aren't tracked. This means
415 436 that if we receive a command and instruct its dispatch, another command
416 437 with its request ID can come in over the wire and there will be a race
417 438 between who responds to what.
418 439 """
419 440
420 441 def __init__(self, deferoutput=False):
421 442 """Construct a new server reactor.
422 443
423 444 ``deferoutput`` can be used to indicate that no output frames should be
424 445 instructed to be sent until input has been exhausted. In this mode,
425 446 events that would normally generate output frames (such as a command
426 447 response being ready) will instead defer instructing the consumer to
427 448 send those frames. This is useful for half-duplex transports where the
428 449 sender cannot receive until all data has been transmitted.
429 450 """
430 451 self._deferoutput = deferoutput
431 452 self._state = 'idle'
432 453 self._bufferedframegens = []
433 454 # request id -> dict of commands that are actively being received.
434 455 self._receivingcommands = {}
435 456
436 def onframerecv(self, requestid, frametype, frameflags, payload):
457 def onframerecv(self, frame):
437 458 """Process a frame that has been received off the wire.
438 459
439 460 Returns a dict with an ``action`` key that details what action,
440 461 if any, the consumer should take next.
441 462 """
442 463 handlers = {
443 464 'idle': self._onframeidle,
444 465 'command-receiving': self._onframecommandreceiving,
445 466 'errored': self._onframeerrored,
446 467 }
447 468
448 469 meth = handlers.get(self._state)
449 470 if not meth:
450 471 raise error.ProgrammingError('unhandled state: %s' % self._state)
451 472
452 return meth(requestid, frametype, frameflags, payload)
473 return meth(frame)
453 474
454 475 def onbytesresponseready(self, requestid, data):
455 476 """Signal that a bytes response is ready to be sent to the client.
456 477
457 478 The raw bytes response is passed as an argument.
458 479 """
459 480 framegen = createbytesresponseframesfrombytes(requestid, data)
460 481
461 482 if self._deferoutput:
462 483 self._bufferedframegens.append(framegen)
463 484 return 'noop', {}
464 485 else:
465 486 return 'sendframes', {
466 487 'framegen': framegen,
467 488 }
468 489
469 490 def oninputeof(self):
470 491 """Signals that end of input has been received.
471 492
472 493 No more frames will be received. All pending activity should be
473 494 completed.
474 495 """
475 496 # TODO should we do anything about in-flight commands?
476 497
477 498 if not self._deferoutput or not self._bufferedframegens:
478 499 return 'noop', {}
479 500
480 501 # If we buffered all our responses, emit those.
481 502 def makegen():
482 503 for gen in self._bufferedframegens:
483 504 for frame in gen:
484 505 yield frame
485 506
486 507 return 'sendframes', {
487 508 'framegen': makegen(),
488 509 }
489 510
490 511 def onapplicationerror(self, requestid, msg):
491 512 return 'sendframes', {
492 513 'framegen': createerrorframe(requestid, msg, application=True),
493 514 }
494 515
495 516 def _makeerrorresult(self, msg):
496 517 return 'error', {
497 518 'message': msg,
498 519 }
499 520
500 521 def _makeruncommandresult(self, requestid):
501 522 entry = self._receivingcommands[requestid]
502 523 del self._receivingcommands[requestid]
503 524
504 525 if self._receivingcommands:
505 526 self._state = 'command-receiving'
506 527 else:
507 528 self._state = 'idle'
508 529
509 530 return 'runcommand', {
510 531 'requestid': requestid,
511 532 'command': entry['command'],
512 533 'args': entry['args'],
513 534 'data': entry['data'].getvalue() if entry['data'] else None,
514 535 }
515 536
516 537 def _makewantframeresult(self):
517 538 return 'wantframe', {
518 539 'state': self._state,
519 540 }
520 541
521 def _onframeidle(self, requestid, frametype, frameflags, payload):
542 def _onframeidle(self, frame):
522 543 # The only frame type that should be received in this state is a
523 544 # command request.
524 if frametype != FRAME_TYPE_COMMAND_NAME:
545 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
525 546 self._state = 'errored'
526 547 return self._makeerrorresult(
527 _('expected command frame; got %d') % frametype)
548 _('expected command frame; got %d') % frame.typeid)
528 549
529 if requestid in self._receivingcommands:
550 if frame.requestid in self._receivingcommands:
530 551 self._state = 'errored'
531 552 return self._makeerrorresult(
532 _('request with ID %d already received') % requestid)
553 _('request with ID %d already received') % frame.requestid)
533 554
534 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
535 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
555 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
556 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
536 557
537 self._receivingcommands[requestid] = {
538 'command': payload,
558 self._receivingcommands[frame.requestid] = {
559 'command': frame.payload,
539 560 'args': {},
540 561 'data': None,
541 562 'expectingargs': expectingargs,
542 563 'expectingdata': expectingdata,
543 564 }
544 565
545 if frameflags & FLAG_COMMAND_NAME_EOS:
546 return self._makeruncommandresult(requestid)
566 if frame.flags & FLAG_COMMAND_NAME_EOS:
567 return self._makeruncommandresult(frame.requestid)
547 568
548 569 if expectingargs or expectingdata:
549 570 self._state = 'command-receiving'
550 571 return self._makewantframeresult()
551 572 else:
552 573 self._state = 'errored'
553 574 return self._makeerrorresult(_('missing frame flags on '
554 575 'command frame'))
555 576
556 def _onframecommandreceiving(self, requestid, frametype, frameflags,
557 payload):
577 def _onframecommandreceiving(self, frame):
558 578 # It could be a new command request. Process it as such.
559 if frametype == FRAME_TYPE_COMMAND_NAME:
560 return self._onframeidle(requestid, frametype, frameflags, payload)
579 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
580 return self._onframeidle(frame)
561 581
562 582 # All other frames should be related to a command that is currently
563 583 # receiving.
564 if requestid not in self._receivingcommands:
584 if frame.requestid not in self._receivingcommands:
565 585 self._state = 'errored'
566 586 return self._makeerrorresult(
567 587 _('received frame for request that is not receiving: %d') %
568 requestid)
588 frame.requestid)
569 589
570 entry = self._receivingcommands[requestid]
590 entry = self._receivingcommands[frame.requestid]
571 591
572 if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
592 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
573 593 if not entry['expectingargs']:
574 594 self._state = 'errored'
575 595 return self._makeerrorresult(_(
576 596 'received command argument frame for request that is not '
577 'expecting arguments: %d') % requestid)
597 'expecting arguments: %d') % frame.requestid)
578 598
579 return self._handlecommandargsframe(requestid, entry, frametype,
580 frameflags, payload)
599 return self._handlecommandargsframe(frame, entry)
581 600
582 elif frametype == FRAME_TYPE_COMMAND_DATA:
601 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
583 602 if not entry['expectingdata']:
584 603 self._state = 'errored'
585 604 return self._makeerrorresult(_(
586 605 'received command data frame for request that is not '
587 'expecting data: %d') % requestid)
606 'expecting data: %d') % frame.requestid)
588 607
589 608 if entry['data'] is None:
590 609 entry['data'] = util.bytesio()
591 610
592 return self._handlecommanddataframe(requestid, entry, frametype,
593 frameflags, payload)
611 return self._handlecommanddataframe(frame, entry)
594 612
595 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
596 payload):
613 def _handlecommandargsframe(self, frame, entry):
597 614 # The frame and state of command should have already been validated.
598 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
615 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
599 616
600 617 offset = 0
601 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
618 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
602 619 offset += ARGUMENT_FRAME_HEADER.size
603 620
604 621 # The argument name MUST fit inside the frame.
605 argname = bytes(payload[offset:offset + namesize])
622 argname = bytes(frame.payload[offset:offset + namesize])
606 623 offset += namesize
607 624
608 625 if len(argname) != namesize:
609 626 self._state = 'errored'
610 627 return self._makeerrorresult(_('malformed argument frame: '
611 628 'partial argument name'))
612 629
613 argvalue = bytes(payload[offset:])
630 argvalue = bytes(frame.payload[offset:])
614 631
615 632 # Argument value spans multiple frames. Record our active state
616 633 # and wait for the next frame.
617 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
634 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
618 635 raise error.ProgrammingError('not yet implemented')
619 636
620 637 # Common case: the argument value is completely contained in this
621 638 # frame.
622 639
623 640 if len(argvalue) != valuesize:
624 641 self._state = 'errored'
625 642 return self._makeerrorresult(_('malformed argument frame: '
626 643 'partial argument value'))
627 644
628 645 entry['args'][argname] = argvalue
629 646
630 if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
647 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
631 648 if entry['expectingdata']:
632 649 # TODO signal request to run a command once we don't
633 650 # buffer data frames.
634 651 return self._makewantframeresult()
635 652 else:
636 return self._makeruncommandresult(requestid)
653 return self._makeruncommandresult(frame.requestid)
637 654 else:
638 655 return self._makewantframeresult()
639 656
640 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
641 payload):
642 assert frametype == FRAME_TYPE_COMMAND_DATA
657 def _handlecommanddataframe(self, frame, entry):
658 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
643 659
644 660 # TODO support streaming data instead of buffering it.
645 entry['data'].write(payload)
661 entry['data'].write(frame.payload)
646 662
647 if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
663 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
648 664 return self._makewantframeresult()
649 elif frameflags & FLAG_COMMAND_DATA_EOS:
665 elif frame.flags & FLAG_COMMAND_DATA_EOS:
650 666 entry['data'].seek(0)
651 return self._makeruncommandresult(requestid)
667 return self._makeruncommandresult(frame.requestid)
652 668 else:
653 669 self._state = 'errored'
654 670 return self._makeerrorresult(_('command data frame without '
655 671 'flags'))
656 672
657 def _onframeerrored(self, requestid, frametype, frameflags, payload):
673 def _onframeerrored(self, frame):
658 674 return self._makeerrorresult(_('server already errored'))
@@ -1,1043 +1,1042
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import struct
11 11 import sys
12 12 import threading
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 encoding,
17 17 error,
18 18 hook,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 wireprotoframing,
23 23 wireprototypes,
24 24 )
25 25
26 26 stringio = util.stringio
27 27
28 28 urlerr = util.urlerr
29 29 urlreq = util.urlreq
30 30
31 31 HTTP_OK = 200
32 32
33 33 HGTYPE = 'application/mercurial-0.1'
34 34 HGTYPE2 = 'application/mercurial-0.2'
35 35 HGERRTYPE = 'application/hg-error'
36 36 FRAMINGTYPE = b'application/mercurial-exp-framing-0002'
37 37
38 38 HTTPV2 = wireprototypes.HTTPV2
39 39 SSHV1 = wireprototypes.SSHV1
40 40 SSHV2 = wireprototypes.SSHV2
41 41
42 42 def decodevaluefromheaders(req, headerprefix):
43 43 """Decode a long value from multiple HTTP request headers.
44 44
45 45 Returns the value as a bytes, not a str.
46 46 """
47 47 chunks = []
48 48 i = 1
49 49 while True:
50 50 v = req.headers.get(b'%s-%d' % (headerprefix, i))
51 51 if v is None:
52 52 break
53 53 chunks.append(pycompat.bytesurl(v))
54 54 i += 1
55 55
56 56 return ''.join(chunks)
57 57
58 58 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
59 59 def __init__(self, req, ui, checkperm):
60 60 self._req = req
61 61 self._ui = ui
62 62 self._checkperm = checkperm
63 63
64 64 @property
65 65 def name(self):
66 66 return 'http-v1'
67 67
68 68 def getargs(self, args):
69 69 knownargs = self._args()
70 70 data = {}
71 71 keys = args.split()
72 72 for k in keys:
73 73 if k == '*':
74 74 star = {}
75 75 for key in knownargs.keys():
76 76 if key != 'cmd' and key not in keys:
77 77 star[key] = knownargs[key][0]
78 78 data['*'] = star
79 79 else:
80 80 data[k] = knownargs[k][0]
81 81 return [data[k] for k in keys]
82 82
83 83 def _args(self):
84 84 args = self._req.qsparams.asdictoflists()
85 85 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
86 86 if postlen:
87 87 args.update(urlreq.parseqs(
88 88 self._req.bodyfh.read(postlen), keep_blank_values=True))
89 89 return args
90 90
91 91 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
92 92 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
93 93 return args
94 94
95 95 def forwardpayload(self, fp):
96 96 # Existing clients *always* send Content-Length.
97 97 length = int(self._req.headers[b'Content-Length'])
98 98
99 99 # If httppostargs is used, we need to read Content-Length
100 100 # minus the amount that was consumed by args.
101 101 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
102 102 for s in util.filechunkiter(self._req.bodyfh, limit=length):
103 103 fp.write(s)
104 104
105 105 @contextlib.contextmanager
106 106 def mayberedirectstdio(self):
107 107 oldout = self._ui.fout
108 108 olderr = self._ui.ferr
109 109
110 110 out = util.stringio()
111 111
112 112 try:
113 113 self._ui.fout = out
114 114 self._ui.ferr = out
115 115 yield out
116 116 finally:
117 117 self._ui.fout = oldout
118 118 self._ui.ferr = olderr
119 119
120 120 def client(self):
121 121 return 'remote:%s:%s:%s' % (
122 122 self._req.urlscheme,
123 123 urlreq.quote(self._req.remotehost or ''),
124 124 urlreq.quote(self._req.remoteuser or ''))
125 125
126 126 def addcapabilities(self, repo, caps):
127 127 caps.append(b'batch')
128 128
129 129 caps.append('httpheader=%d' %
130 130 repo.ui.configint('server', 'maxhttpheaderlen'))
131 131 if repo.ui.configbool('experimental', 'httppostargs'):
132 132 caps.append('httppostargs')
133 133
134 134 # FUTURE advertise 0.2rx once support is implemented
135 135 # FUTURE advertise minrx and mintx after consulting config option
136 136 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
137 137
138 138 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
139 139 if compengines:
140 140 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
141 141 for e in compengines)
142 142 caps.append('compression=%s' % comptypes)
143 143
144 144 return caps
145 145
146 146 def checkperm(self, perm):
147 147 return self._checkperm(perm)
148 148
149 149 # This method exists mostly so that extensions like remotefilelog can
150 150 # disable a kludgey legacy method only over http. As of early 2018,
151 151 # there are no other known users, so with any luck we can discard this
152 152 # hook if remotefilelog becomes a first-party extension.
153 153 def iscmd(cmd):
154 154 return cmd in wireproto.commands
155 155
156 156 def handlewsgirequest(rctx, req, res, checkperm):
157 157 """Possibly process a wire protocol request.
158 158
159 159 If the current request is a wire protocol request, the request is
160 160 processed by this function.
161 161
162 162 ``req`` is a ``parsedrequest`` instance.
163 163 ``res`` is a ``wsgiresponse`` instance.
164 164
165 165 Returns a bool indicating if the request was serviced. If set, the caller
166 166 should stop processing the request, as a response has already been issued.
167 167 """
168 168 # Avoid cycle involving hg module.
169 169 from .hgweb import common as hgwebcommon
170 170
171 171 repo = rctx.repo
172 172
173 173 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
174 174 # string parameter. If it isn't present, this isn't a wire protocol
175 175 # request.
176 176 if 'cmd' not in req.qsparams:
177 177 return False
178 178
179 179 cmd = req.qsparams['cmd']
180 180
181 181 # The "cmd" request parameter is used by both the wire protocol and hgweb.
182 182 # While not all wire protocol commands are available for all transports,
183 183 # if we see a "cmd" value that resembles a known wire protocol command, we
184 184 # route it to a protocol handler. This is better than routing possible
185 185 # wire protocol requests to hgweb because it prevents hgweb from using
186 186 # known wire protocol commands and it is less confusing for machine
187 187 # clients.
188 188 if not iscmd(cmd):
189 189 return False
190 190
191 191 # The "cmd" query string argument is only valid on the root path of the
192 192 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
193 193 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
194 194 # in this case. We send an HTTP 404 for backwards compatibility reasons.
195 195 if req.dispatchpath:
196 196 res.status = hgwebcommon.statusmessage(404)
197 197 res.headers['Content-Type'] = HGTYPE
198 198 # TODO This is not a good response to issue for this request. This
199 199 # is mostly for BC for now.
200 200 res.setbodybytes('0\n%s\n' % b'Not Found')
201 201 return True
202 202
203 203 proto = httpv1protocolhandler(req, repo.ui,
204 204 lambda perm: checkperm(rctx, req, perm))
205 205
206 206 # The permissions checker should be the only thing that can raise an
207 207 # ErrorResponse. It is kind of a layer violation to catch an hgweb
208 208 # exception here. So consider refactoring into a exception type that
209 209 # is associated with the wire protocol.
210 210 try:
211 211 _callhttp(repo, req, res, proto, cmd)
212 212 except hgwebcommon.ErrorResponse as e:
213 213 for k, v in e.headers:
214 214 res.headers[k] = v
215 215 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
216 216 # TODO This response body assumes the failed command was
217 217 # "unbundle." That assumption is not always valid.
218 218 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
219 219
220 220 return True
221 221
222 222 def handlewsgiapirequest(rctx, req, res, checkperm):
223 223 """Handle requests to /api/*."""
224 224 assert req.dispatchparts[0] == b'api'
225 225
226 226 repo = rctx.repo
227 227
228 228 # This whole URL space is experimental for now. But we want to
229 229 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
230 230 if not repo.ui.configbool('experimental', 'web.apiserver'):
231 231 res.status = b'404 Not Found'
232 232 res.headers[b'Content-Type'] = b'text/plain'
233 233 res.setbodybytes(_('Experimental API server endpoint not enabled'))
234 234 return
235 235
236 236 # The URL space is /api/<protocol>/*. The structure of URLs under varies
237 237 # by <protocol>.
238 238
239 239 # Registered APIs are made available via config options of the name of
240 240 # the protocol.
241 241 availableapis = set()
242 242 for k, v in API_HANDLERS.items():
243 243 section, option = v['config']
244 244 if repo.ui.configbool(section, option):
245 245 availableapis.add(k)
246 246
247 247 # Requests to /api/ list available APIs.
248 248 if req.dispatchparts == [b'api']:
249 249 res.status = b'200 OK'
250 250 res.headers[b'Content-Type'] = b'text/plain'
251 251 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
252 252 'one of the following:\n')]
253 253 if availableapis:
254 254 lines.extend(sorted(availableapis))
255 255 else:
256 256 lines.append(_('(no available APIs)\n'))
257 257 res.setbodybytes(b'\n'.join(lines))
258 258 return
259 259
260 260 proto = req.dispatchparts[1]
261 261
262 262 if proto not in API_HANDLERS:
263 263 res.status = b'404 Not Found'
264 264 res.headers[b'Content-Type'] = b'text/plain'
265 265 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
266 266 proto, b', '.join(sorted(availableapis))))
267 267 return
268 268
269 269 if proto not in availableapis:
270 270 res.status = b'404 Not Found'
271 271 res.headers[b'Content-Type'] = b'text/plain'
272 272 res.setbodybytes(_('API %s not enabled\n') % proto)
273 273 return
274 274
275 275 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
276 276 req.dispatchparts[2:])
277 277
278 278 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
279 279 from .hgweb import common as hgwebcommon
280 280
281 281 # URL space looks like: <permissions>/<command>, where <permission> can
282 282 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
283 283
284 284 # Root URL does nothing meaningful... yet.
285 285 if not urlparts:
286 286 res.status = b'200 OK'
287 287 res.headers[b'Content-Type'] = b'text/plain'
288 288 res.setbodybytes(_('HTTP version 2 API handler'))
289 289 return
290 290
291 291 if len(urlparts) == 1:
292 292 res.status = b'404 Not Found'
293 293 res.headers[b'Content-Type'] = b'text/plain'
294 294 res.setbodybytes(_('do not know how to process %s\n') %
295 295 req.dispatchpath)
296 296 return
297 297
298 298 permission, command = urlparts[0:2]
299 299
300 300 if permission not in (b'ro', b'rw'):
301 301 res.status = b'404 Not Found'
302 302 res.headers[b'Content-Type'] = b'text/plain'
303 303 res.setbodybytes(_('unknown permission: %s') % permission)
304 304 return
305 305
306 306 if req.method != 'POST':
307 307 res.status = b'405 Method Not Allowed'
308 308 res.headers[b'Allow'] = b'POST'
309 309 res.setbodybytes(_('commands require POST requests'))
310 310 return
311 311
312 312 # At some point we'll want to use our own API instead of recycling the
313 313 # behavior of version 1 of the wire protocol...
314 314 # TODO return reasonable responses - not responses that overload the
315 315 # HTTP status line message for error reporting.
316 316 try:
317 317 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
318 318 except hgwebcommon.ErrorResponse as e:
319 319 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
320 320 for k, v in e.headers:
321 321 res.headers[k] = v
322 322 res.setbodybytes('permission denied')
323 323 return
324 324
325 325 # We have a special endpoint to reflect the request back at the client.
326 326 if command == b'debugreflect':
327 327 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
328 328 return
329 329
330 330 # Extra commands that we handle that aren't really wire protocol
331 331 # commands. Think extra hard before making this hackery available to
332 332 # extension.
333 333 extracommands = {'multirequest'}
334 334
335 335 if command not in wireproto.commands and command not in extracommands:
336 336 res.status = b'404 Not Found'
337 337 res.headers[b'Content-Type'] = b'text/plain'
338 338 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
339 339 return
340 340
341 341 repo = rctx.repo
342 342 ui = repo.ui
343 343
344 344 proto = httpv2protocolhandler(req, ui)
345 345
346 346 if (not wireproto.commands.commandavailable(command, proto)
347 347 and command not in extracommands):
348 348 res.status = b'404 Not Found'
349 349 res.headers[b'Content-Type'] = b'text/plain'
350 350 res.setbodybytes(_('invalid wire protocol command: %s') % command)
351 351 return
352 352
353 353 if req.headers.get(b'Accept') != FRAMINGTYPE:
354 354 res.status = b'406 Not Acceptable'
355 355 res.headers[b'Content-Type'] = b'text/plain'
356 356 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
357 357 % FRAMINGTYPE)
358 358 return
359 359
360 360 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
361 361 res.status = b'415 Unsupported Media Type'
362 362 # TODO we should send a response with appropriate media type,
363 363 # since client does Accept it.
364 364 res.headers[b'Content-Type'] = b'text/plain'
365 365 res.setbodybytes(_('client MUST send Content-Type header with '
366 366 'value: %s\n') % FRAMINGTYPE)
367 367 return
368 368
369 369 _processhttpv2request(ui, repo, req, res, permission, command, proto)
370 370
371 371 def _processhttpv2reflectrequest(ui, repo, req, res):
372 372 """Reads unified frame protocol request and dumps out state to client.
373 373
374 374 This special endpoint can be used to help debug the wire protocol.
375 375
376 376 Instead of routing the request through the normal dispatch mechanism,
377 377 we instead read all frames, decode them, and feed them into our state
378 378 tracker. We then dump the log of all that activity back out to the
379 379 client.
380 380 """
381 381 import json
382 382
383 383 # Reflection APIs have a history of being abused, accidentally disclosing
384 384 # sensitive data, etc. So we have a config knob.
385 385 if not ui.configbool('experimental', 'web.api.debugreflect'):
386 386 res.status = b'404 Not Found'
387 387 res.headers[b'Content-Type'] = b'text/plain'
388 388 res.setbodybytes(_('debugreflect service not available'))
389 389 return
390 390
391 391 # We assume we have a unified framing protocol request body.
392 392
393 393 reactor = wireprotoframing.serverreactor()
394 394 states = []
395 395
396 396 while True:
397 397 frame = wireprotoframing.readframe(req.bodyfh)
398 398
399 399 if not frame:
400 400 states.append(b'received: <no frame>')
401 401 break
402 402
403 requestid, frametype, frameflags, payload = frame
404 states.append(b'received: %d %d %d %s' % (frametype, frameflags,
405 requestid, payload))
403 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
404 frame.requestid,
405 frame.payload))
406 406
407 action, meta = reactor.onframerecv(requestid, frametype, frameflags,
408 payload)
407 action, meta = reactor.onframerecv(frame)
409 408 states.append(json.dumps((action, meta), sort_keys=True,
410 409 separators=(', ', ': ')))
411 410
412 411 action, meta = reactor.oninputeof()
413 412 meta['action'] = action
414 413 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
415 414
416 415 res.status = b'200 OK'
417 416 res.headers[b'Content-Type'] = b'text/plain'
418 417 res.setbodybytes(b'\n'.join(states))
419 418
420 419 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
421 420 """Post-validation handler for HTTPv2 requests.
422 421
423 422 Called when the HTTP request contains unified frame-based protocol
424 423 frames for evaluation.
425 424 """
426 425 # TODO Some HTTP clients are full duplex and can receive data before
427 426 # the entire request is transmitted. Figure out a way to indicate support
428 427 # for that so we can opt into full duplex mode.
429 428 reactor = wireprotoframing.serverreactor(deferoutput=True)
430 429 seencommand = False
431 430
432 431 while True:
433 432 frame = wireprotoframing.readframe(req.bodyfh)
434 433 if not frame:
435 434 break
436 435
437 action, meta = reactor.onframerecv(*frame)
436 action, meta = reactor.onframerecv(frame)
438 437
439 438 if action == 'wantframe':
440 439 # Need more data before we can do anything.
441 440 continue
442 441 elif action == 'runcommand':
443 442 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
444 443 reqcommand, reactor, meta,
445 444 issubsequent=seencommand)
446 445
447 446 if sentoutput:
448 447 return
449 448
450 449 seencommand = True
451 450
452 451 elif action == 'error':
453 452 # TODO define proper error mechanism.
454 453 res.status = b'200 OK'
455 454 res.headers[b'Content-Type'] = b'text/plain'
456 455 res.setbodybytes(meta['message'] + b'\n')
457 456 return
458 457 else:
459 458 raise error.ProgrammingError(
460 459 'unhandled action from frame processor: %s' % action)
461 460
462 461 action, meta = reactor.oninputeof()
463 462 if action == 'sendframes':
464 463 # We assume we haven't started sending the response yet. If we're
465 464 # wrong, the response type will raise an exception.
466 465 res.status = b'200 OK'
467 466 res.headers[b'Content-Type'] = FRAMINGTYPE
468 467 res.setbodygen(meta['framegen'])
469 468 elif action == 'noop':
470 469 pass
471 470 else:
472 471 raise error.ProgrammingError('unhandled action from frame processor: %s'
473 472 % action)
474 473
475 474 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
476 475 command, issubsequent):
477 476 """Dispatch a wire protocol command made from HTTPv2 requests.
478 477
479 478 The authenticated permission (``authedperm``) along with the original
480 479 command from the URL (``reqcommand``) are passed in.
481 480 """
482 481 # We already validated that the session has permissions to perform the
483 482 # actions in ``authedperm``. In the unified frame protocol, the canonical
484 483 # command to run is expressed in a frame. However, the URL also requested
485 484 # to run a specific command. We need to be careful that the command we
486 485 # run doesn't have permissions requirements greater than what was granted
487 486 # by ``authedperm``.
488 487 #
489 488 # Our rule for this is we only allow one command per HTTP request and
490 489 # that command must match the command in the URL. However, we make
491 490 # an exception for the ``multirequest`` URL. This URL is allowed to
492 491 # execute multiple commands. We double check permissions of each command
493 492 # as it is invoked to ensure there is no privilege escalation.
494 493 # TODO consider allowing multiple commands to regular command URLs
495 494 # iff each command is the same.
496 495
497 496 proto = httpv2protocolhandler(req, ui, args=command['args'])
498 497
499 498 if reqcommand == b'multirequest':
500 499 if not wireproto.commands.commandavailable(command['command'], proto):
501 500 # TODO proper error mechanism
502 501 res.status = b'200 OK'
503 502 res.headers[b'Content-Type'] = b'text/plain'
504 503 res.setbodybytes(_('wire protocol command not available: %s') %
505 504 command['command'])
506 505 return True
507 506
508 507 assert authedperm in (b'ro', b'rw')
509 508 wirecommand = wireproto.commands[command['command']]
510 509 assert wirecommand.permission in ('push', 'pull')
511 510
512 511 if authedperm == b'ro' and wirecommand.permission != 'pull':
513 512 # TODO proper error mechanism
514 513 res.status = b'403 Forbidden'
515 514 res.headers[b'Content-Type'] = b'text/plain'
516 515 res.setbodybytes(_('insufficient permissions to execute '
517 516 'command: %s') % command['command'])
518 517 return True
519 518
520 519 # TODO should we also call checkperm() here? Maybe not if we're going
521 520 # to overhaul that API. The granted scope from the URL check should
522 521 # be good enough.
523 522
524 523 else:
525 524 # Don't allow multiple commands outside of ``multirequest`` URL.
526 525 if issubsequent:
527 526 # TODO proper error mechanism
528 527 res.status = b'200 OK'
529 528 res.headers[b'Content-Type'] = b'text/plain'
530 529 res.setbodybytes(_('multiple commands cannot be issued to this '
531 530 'URL'))
532 531 return True
533 532
534 533 if reqcommand != command['command']:
535 534 # TODO define proper error mechanism
536 535 res.status = b'200 OK'
537 536 res.headers[b'Content-Type'] = b'text/plain'
538 537 res.setbodybytes(_('command in frame must match command in URL'))
539 538 return True
540 539
541 540 rsp = wireproto.dispatch(repo, proto, command['command'])
542 541
543 542 res.status = b'200 OK'
544 543 res.headers[b'Content-Type'] = FRAMINGTYPE
545 544
546 545 if isinstance(rsp, wireprototypes.bytesresponse):
547 546 action, meta = reactor.onbytesresponseready(command['requestid'],
548 547 rsp.data)
549 548 else:
550 549 action, meta = reactor.onapplicationerror(
551 550 _('unhandled response type from wire proto command'))
552 551
553 552 if action == 'sendframes':
554 553 res.setbodygen(meta['framegen'])
555 554 return True
556 555 elif action == 'noop':
557 556 pass
558 557 else:
559 558 raise error.ProgrammingError('unhandled event from reactor: %s' %
560 559 action)
561 560
562 561 # Maps API name to metadata so custom API can be registered.
563 562 API_HANDLERS = {
564 563 HTTPV2: {
565 564 'config': ('experimental', 'web.api.http-v2'),
566 565 'handler': _handlehttpv2request,
567 566 },
568 567 }
569 568
570 569 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
571 570 def __init__(self, req, ui, args=None):
572 571 self._req = req
573 572 self._ui = ui
574 573 self._args = args
575 574
576 575 @property
577 576 def name(self):
578 577 return HTTPV2
579 578
580 579 def getargs(self, args):
581 580 data = {}
582 581 for k in args.split():
583 582 if k == '*':
584 583 raise NotImplementedError('do not support * args')
585 584 else:
586 585 data[k] = self._args[k]
587 586
588 587 return [data[k] for k in args.split()]
589 588
590 589 def forwardpayload(self, fp):
591 590 raise NotImplementedError
592 591
593 592 @contextlib.contextmanager
594 593 def mayberedirectstdio(self):
595 594 raise NotImplementedError
596 595
597 596 def client(self):
598 597 raise NotImplementedError
599 598
600 599 def addcapabilities(self, repo, caps):
601 600 return caps
602 601
603 602 def checkperm(self, perm):
604 603 raise NotImplementedError
605 604
606 605 def _httpresponsetype(ui, req, prefer_uncompressed):
607 606 """Determine the appropriate response type and compression settings.
608 607
609 608 Returns a tuple of (mediatype, compengine, engineopts).
610 609 """
611 610 # Determine the response media type and compression engine based
612 611 # on the request parameters.
613 612 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
614 613
615 614 if '0.2' in protocaps:
616 615 # All clients are expected to support uncompressed data.
617 616 if prefer_uncompressed:
618 617 return HGTYPE2, util._noopengine(), {}
619 618
620 619 # Default as defined by wire protocol spec.
621 620 compformats = ['zlib', 'none']
622 621 for cap in protocaps:
623 622 if cap.startswith('comp='):
624 623 compformats = cap[5:].split(',')
625 624 break
626 625
627 626 # Now find an agreed upon compression format.
628 627 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
629 628 if engine.wireprotosupport().name in compformats:
630 629 opts = {}
631 630 level = ui.configint('server', '%slevel' % engine.name())
632 631 if level is not None:
633 632 opts['level'] = level
634 633
635 634 return HGTYPE2, engine, opts
636 635
637 636 # No mutually supported compression format. Fall back to the
638 637 # legacy protocol.
639 638
640 639 # Don't allow untrusted settings because disabling compression or
641 640 # setting a very high compression level could lead to flooding
642 641 # the server's network or CPU.
643 642 opts = {'level': ui.configint('server', 'zliblevel')}
644 643 return HGTYPE, util.compengines['zlib'], opts
645 644
646 645 def _callhttp(repo, req, res, proto, cmd):
647 646 # Avoid cycle involving hg module.
648 647 from .hgweb import common as hgwebcommon
649 648
650 649 def genversion2(gen, engine, engineopts):
651 650 # application/mercurial-0.2 always sends a payload header
652 651 # identifying the compression engine.
653 652 name = engine.wireprotosupport().name
654 653 assert 0 < len(name) < 256
655 654 yield struct.pack('B', len(name))
656 655 yield name
657 656
658 657 for chunk in gen:
659 658 yield chunk
660 659
661 660 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
662 661 if code == HTTP_OK:
663 662 res.status = '200 Script output follows'
664 663 else:
665 664 res.status = hgwebcommon.statusmessage(code)
666 665
667 666 res.headers['Content-Type'] = contenttype
668 667
669 668 if bodybytes is not None:
670 669 res.setbodybytes(bodybytes)
671 670 if bodygen is not None:
672 671 res.setbodygen(bodygen)
673 672
674 673 if not wireproto.commands.commandavailable(cmd, proto):
675 674 setresponse(HTTP_OK, HGERRTYPE,
676 675 _('requested wire protocol command is not available over '
677 676 'HTTP'))
678 677 return
679 678
680 679 proto.checkperm(wireproto.commands[cmd].permission)
681 680
682 681 rsp = wireproto.dispatch(repo, proto, cmd)
683 682
684 683 if isinstance(rsp, bytes):
685 684 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
686 685 elif isinstance(rsp, wireprototypes.bytesresponse):
687 686 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
688 687 elif isinstance(rsp, wireprototypes.streamreslegacy):
689 688 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
690 689 elif isinstance(rsp, wireprototypes.streamres):
691 690 gen = rsp.gen
692 691
693 692 # This code for compression should not be streamres specific. It
694 693 # is here because we only compress streamres at the moment.
695 694 mediatype, engine, engineopts = _httpresponsetype(
696 695 repo.ui, req, rsp.prefer_uncompressed)
697 696 gen = engine.compressstream(gen, engineopts)
698 697
699 698 if mediatype == HGTYPE2:
700 699 gen = genversion2(gen, engine, engineopts)
701 700
702 701 setresponse(HTTP_OK, mediatype, bodygen=gen)
703 702 elif isinstance(rsp, wireprototypes.pushres):
704 703 rsp = '%d\n%s' % (rsp.res, rsp.output)
705 704 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
706 705 elif isinstance(rsp, wireprototypes.pusherr):
707 706 rsp = '0\n%s\n' % rsp.res
708 707 res.drain = True
709 708 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
710 709 elif isinstance(rsp, wireprototypes.ooberror):
711 710 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
712 711 else:
713 712 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
714 713
715 714 def _sshv1respondbytes(fout, value):
716 715 """Send a bytes response for protocol version 1."""
717 716 fout.write('%d\n' % len(value))
718 717 fout.write(value)
719 718 fout.flush()
720 719
721 720 def _sshv1respondstream(fout, source):
722 721 write = fout.write
723 722 for chunk in source.gen:
724 723 write(chunk)
725 724 fout.flush()
726 725
727 726 def _sshv1respondooberror(fout, ferr, rsp):
728 727 ferr.write(b'%s\n-\n' % rsp)
729 728 ferr.flush()
730 729 fout.write(b'\n')
731 730 fout.flush()
732 731
733 732 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
734 733 """Handler for requests services via version 1 of SSH protocol."""
735 734 def __init__(self, ui, fin, fout):
736 735 self._ui = ui
737 736 self._fin = fin
738 737 self._fout = fout
739 738
740 739 @property
741 740 def name(self):
742 741 return wireprototypes.SSHV1
743 742
744 743 def getargs(self, args):
745 744 data = {}
746 745 keys = args.split()
747 746 for n in xrange(len(keys)):
748 747 argline = self._fin.readline()[:-1]
749 748 arg, l = argline.split()
750 749 if arg not in keys:
751 750 raise error.Abort(_("unexpected parameter %r") % arg)
752 751 if arg == '*':
753 752 star = {}
754 753 for k in xrange(int(l)):
755 754 argline = self._fin.readline()[:-1]
756 755 arg, l = argline.split()
757 756 val = self._fin.read(int(l))
758 757 star[arg] = val
759 758 data['*'] = star
760 759 else:
761 760 val = self._fin.read(int(l))
762 761 data[arg] = val
763 762 return [data[k] for k in keys]
764 763
765 764 def forwardpayload(self, fpout):
766 765 # We initially send an empty response. This tells the client it is
767 766 # OK to start sending data. If a client sees any other response, it
768 767 # interprets it as an error.
769 768 _sshv1respondbytes(self._fout, b'')
770 769
771 770 # The file is in the form:
772 771 #
773 772 # <chunk size>\n<chunk>
774 773 # ...
775 774 # 0\n
776 775 count = int(self._fin.readline())
777 776 while count:
778 777 fpout.write(self._fin.read(count))
779 778 count = int(self._fin.readline())
780 779
781 780 @contextlib.contextmanager
782 781 def mayberedirectstdio(self):
783 782 yield None
784 783
785 784 def client(self):
786 785 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
787 786 return 'remote:ssh:' + client
788 787
789 788 def addcapabilities(self, repo, caps):
790 789 caps.append(b'batch')
791 790 return caps
792 791
793 792 def checkperm(self, perm):
794 793 pass
795 794
796 795 class sshv2protocolhandler(sshv1protocolhandler):
797 796 """Protocol handler for version 2 of the SSH protocol."""
798 797
799 798 @property
800 799 def name(self):
801 800 return wireprototypes.SSHV2
802 801
803 802 def _runsshserver(ui, repo, fin, fout, ev):
804 803 # This function operates like a state machine of sorts. The following
805 804 # states are defined:
806 805 #
807 806 # protov1-serving
808 807 # Server is in protocol version 1 serving mode. Commands arrive on
809 808 # new lines. These commands are processed in this state, one command
810 809 # after the other.
811 810 #
812 811 # protov2-serving
813 812 # Server is in protocol version 2 serving mode.
814 813 #
815 814 # upgrade-initial
816 815 # The server is going to process an upgrade request.
817 816 #
818 817 # upgrade-v2-filter-legacy-handshake
819 818 # The protocol is being upgraded to version 2. The server is expecting
820 819 # the legacy handshake from version 1.
821 820 #
822 821 # upgrade-v2-finish
823 822 # The upgrade to version 2 of the protocol is imminent.
824 823 #
825 824 # shutdown
826 825 # The server is shutting down, possibly in reaction to a client event.
827 826 #
828 827 # And here are their transitions:
829 828 #
830 829 # protov1-serving -> shutdown
831 830 # When server receives an empty request or encounters another
832 831 # error.
833 832 #
834 833 # protov1-serving -> upgrade-initial
835 834 # An upgrade request line was seen.
836 835 #
837 836 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
838 837 # Upgrade to version 2 in progress. Server is expecting to
839 838 # process a legacy handshake.
840 839 #
841 840 # upgrade-v2-filter-legacy-handshake -> shutdown
842 841 # Client did not fulfill upgrade handshake requirements.
843 842 #
844 843 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
845 844 # Client fulfilled version 2 upgrade requirements. Finishing that
846 845 # upgrade.
847 846 #
848 847 # upgrade-v2-finish -> protov2-serving
849 848 # Protocol upgrade to version 2 complete. Server can now speak protocol
850 849 # version 2.
851 850 #
852 851 # protov2-serving -> protov1-serving
853 852 # Ths happens by default since protocol version 2 is the same as
854 853 # version 1 except for the handshake.
855 854
856 855 state = 'protov1-serving'
857 856 proto = sshv1protocolhandler(ui, fin, fout)
858 857 protoswitched = False
859 858
860 859 while not ev.is_set():
861 860 if state == 'protov1-serving':
862 861 # Commands are issued on new lines.
863 862 request = fin.readline()[:-1]
864 863
865 864 # Empty lines signal to terminate the connection.
866 865 if not request:
867 866 state = 'shutdown'
868 867 continue
869 868
870 869 # It looks like a protocol upgrade request. Transition state to
871 870 # handle it.
872 871 if request.startswith(b'upgrade '):
873 872 if protoswitched:
874 873 _sshv1respondooberror(fout, ui.ferr,
875 874 b'cannot upgrade protocols multiple '
876 875 b'times')
877 876 state = 'shutdown'
878 877 continue
879 878
880 879 state = 'upgrade-initial'
881 880 continue
882 881
883 882 available = wireproto.commands.commandavailable(request, proto)
884 883
885 884 # This command isn't available. Send an empty response and go
886 885 # back to waiting for a new command.
887 886 if not available:
888 887 _sshv1respondbytes(fout, b'')
889 888 continue
890 889
891 890 rsp = wireproto.dispatch(repo, proto, request)
892 891
893 892 if isinstance(rsp, bytes):
894 893 _sshv1respondbytes(fout, rsp)
895 894 elif isinstance(rsp, wireprototypes.bytesresponse):
896 895 _sshv1respondbytes(fout, rsp.data)
897 896 elif isinstance(rsp, wireprototypes.streamres):
898 897 _sshv1respondstream(fout, rsp)
899 898 elif isinstance(rsp, wireprototypes.streamreslegacy):
900 899 _sshv1respondstream(fout, rsp)
901 900 elif isinstance(rsp, wireprototypes.pushres):
902 901 _sshv1respondbytes(fout, b'')
903 902 _sshv1respondbytes(fout, b'%d' % rsp.res)
904 903 elif isinstance(rsp, wireprototypes.pusherr):
905 904 _sshv1respondbytes(fout, rsp.res)
906 905 elif isinstance(rsp, wireprototypes.ooberror):
907 906 _sshv1respondooberror(fout, ui.ferr, rsp.message)
908 907 else:
909 908 raise error.ProgrammingError('unhandled response type from '
910 909 'wire protocol command: %s' % rsp)
911 910
912 911 # For now, protocol version 2 serving just goes back to version 1.
913 912 elif state == 'protov2-serving':
914 913 state = 'protov1-serving'
915 914 continue
916 915
917 916 elif state == 'upgrade-initial':
918 917 # We should never transition into this state if we've switched
919 918 # protocols.
920 919 assert not protoswitched
921 920 assert proto.name == wireprototypes.SSHV1
922 921
923 922 # Expected: upgrade <token> <capabilities>
924 923 # If we get something else, the request is malformed. It could be
925 924 # from a future client that has altered the upgrade line content.
926 925 # We treat this as an unknown command.
927 926 try:
928 927 token, caps = request.split(b' ')[1:]
929 928 except ValueError:
930 929 _sshv1respondbytes(fout, b'')
931 930 state = 'protov1-serving'
932 931 continue
933 932
934 933 # Send empty response if we don't support upgrading protocols.
935 934 if not ui.configbool('experimental', 'sshserver.support-v2'):
936 935 _sshv1respondbytes(fout, b'')
937 936 state = 'protov1-serving'
938 937 continue
939 938
940 939 try:
941 940 caps = urlreq.parseqs(caps)
942 941 except ValueError:
943 942 _sshv1respondbytes(fout, b'')
944 943 state = 'protov1-serving'
945 944 continue
946 945
947 946 # We don't see an upgrade request to protocol version 2. Ignore
948 947 # the upgrade request.
949 948 wantedprotos = caps.get(b'proto', [b''])[0]
950 949 if SSHV2 not in wantedprotos:
951 950 _sshv1respondbytes(fout, b'')
952 951 state = 'protov1-serving'
953 952 continue
954 953
955 954 # It looks like we can honor this upgrade request to protocol 2.
956 955 # Filter the rest of the handshake protocol request lines.
957 956 state = 'upgrade-v2-filter-legacy-handshake'
958 957 continue
959 958
960 959 elif state == 'upgrade-v2-filter-legacy-handshake':
961 960 # Client should have sent legacy handshake after an ``upgrade``
962 961 # request. Expected lines:
963 962 #
964 963 # hello
965 964 # between
966 965 # pairs 81
967 966 # 0000...-0000...
968 967
969 968 ok = True
970 969 for line in (b'hello', b'between', b'pairs 81'):
971 970 request = fin.readline()[:-1]
972 971
973 972 if request != line:
974 973 _sshv1respondooberror(fout, ui.ferr,
975 974 b'malformed handshake protocol: '
976 975 b'missing %s' % line)
977 976 ok = False
978 977 state = 'shutdown'
979 978 break
980 979
981 980 if not ok:
982 981 continue
983 982
984 983 request = fin.read(81)
985 984 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
986 985 _sshv1respondooberror(fout, ui.ferr,
987 986 b'malformed handshake protocol: '
988 987 b'missing between argument value')
989 988 state = 'shutdown'
990 989 continue
991 990
992 991 state = 'upgrade-v2-finish'
993 992 continue
994 993
995 994 elif state == 'upgrade-v2-finish':
996 995 # Send the upgrade response.
997 996 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
998 997 servercaps = wireproto.capabilities(repo, proto)
999 998 rsp = b'capabilities: %s' % servercaps.data
1000 999 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1001 1000 fout.flush()
1002 1001
1003 1002 proto = sshv2protocolhandler(ui, fin, fout)
1004 1003 protoswitched = True
1005 1004
1006 1005 state = 'protov2-serving'
1007 1006 continue
1008 1007
1009 1008 elif state == 'shutdown':
1010 1009 break
1011 1010
1012 1011 else:
1013 1012 raise error.ProgrammingError('unhandled ssh server state: %s' %
1014 1013 state)
1015 1014
1016 1015 class sshserver(object):
1017 1016 def __init__(self, ui, repo, logfh=None):
1018 1017 self._ui = ui
1019 1018 self._repo = repo
1020 1019 self._fin = ui.fin
1021 1020 self._fout = ui.fout
1022 1021
1023 1022 # Log write I/O to stdout and stderr if configured.
1024 1023 if logfh:
1025 1024 self._fout = util.makeloggingfileobject(
1026 1025 logfh, self._fout, 'o', logdata=True)
1027 1026 ui.ferr = util.makeloggingfileobject(
1028 1027 logfh, ui.ferr, 'e', logdata=True)
1029 1028
1030 1029 hook.redirect(True)
1031 1030 ui.fout = repo.ui.fout = ui.ferr
1032 1031
1033 1032 # Prevent insertion/deletion of CRs
1034 1033 util.setbinary(self._fin)
1035 1034 util.setbinary(self._fout)
1036 1035
1037 1036 def serve_forever(self):
1038 1037 self.serveuntil(threading.Event())
1039 1038 sys.exit(0)
1040 1039
1041 1040 def serveuntil(self, ev):
1042 1041 """Serve until a threading.Event is set."""
1043 1042 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,573 +1,576
1 1 from __future__ import absolute_import, print_function
2 2
3 3 import unittest
4 4
5 5 from mercurial import (
6 6 util,
7 7 wireprotoframing as framing,
8 8 )
9 9
10 10 ffs = framing.makeframefromhumanstring
11 11
12 12 def makereactor(deferoutput=False):
13 13 return framing.serverreactor(deferoutput=deferoutput)
14 14
15 15 def sendframes(reactor, gen):
16 16 """Send a generator of frame bytearray to a reactor.
17 17
18 18 Emits a generator of results from ``onframerecv()`` calls.
19 19 """
20 20 for frame in gen:
21 rid, frametype, frameflags, framelength = framing.parseheader(frame)
21 header = framing.parseheader(frame)
22 22 payload = frame[framing.FRAME_HEADER_SIZE:]
23 assert len(payload) == framelength
23 assert len(payload) == header.length
24 24
25 yield reactor.onframerecv(rid, frametype, frameflags, payload)
25 yield reactor.onframerecv(framing.frame(header.requestid,
26 header.typeid,
27 header.flags,
28 payload))
26 29
27 30 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
28 31 """Generate frames to run a command and send them to a reactor."""
29 32 return sendframes(reactor,
30 33 framing.createcommandframes(rid, cmd, args, datafh))
31 34
32 35 class FrameTests(unittest.TestCase):
33 36 def testdataexactframesize(self):
34 37 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
35 38
36 39 frames = list(framing.createcommandframes(1, b'command', {}, data))
37 40 self.assertEqual(frames, [
38 41 ffs(b'1 command-name have-data command'),
39 42 ffs(b'1 command-data continuation %s' % data.getvalue()),
40 43 ffs(b'1 command-data eos ')
41 44 ])
42 45
43 46 def testdatamultipleframes(self):
44 47 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
45 48 frames = list(framing.createcommandframes(1, b'command', {}, data))
46 49 self.assertEqual(frames, [
47 50 ffs(b'1 command-name have-data command'),
48 51 ffs(b'1 command-data continuation %s' % (
49 52 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
50 53 ffs(b'1 command-data eos x'),
51 54 ])
52 55
53 56 def testargsanddata(self):
54 57 data = util.bytesio(b'x' * 100)
55 58
56 59 frames = list(framing.createcommandframes(1, b'command', {
57 60 b'key1': b'key1value',
58 61 b'key2': b'key2value',
59 62 b'key3': b'key3value',
60 63 }, data))
61 64
62 65 self.assertEqual(frames, [
63 66 ffs(b'1 command-name have-args|have-data command'),
64 67 ffs(br'1 command-argument 0 \x04\x00\x09\x00key1key1value'),
65 68 ffs(br'1 command-argument 0 \x04\x00\x09\x00key2key2value'),
66 69 ffs(br'1 command-argument eoa \x04\x00\x09\x00key3key3value'),
67 70 ffs(b'1 command-data eos %s' % data.getvalue()),
68 71 ])
69 72
70 73 def testtextoutputexcessiveargs(self):
71 74 """At most 255 formatting arguments are allowed."""
72 75 with self.assertRaisesRegexp(ValueError,
73 76 'cannot use more than 255 formatting'):
74 77 args = [b'x' for i in range(256)]
75 78 list(framing.createtextoutputframe(1, [(b'bleh', args, [])]))
76 79
77 80 def testtextoutputexcessivelabels(self):
78 81 """At most 255 labels are allowed."""
79 82 with self.assertRaisesRegexp(ValueError,
80 83 'cannot use more than 255 labels'):
81 84 labels = [b'l' for i in range(256)]
82 85 list(framing.createtextoutputframe(1, [(b'bleh', [], labels)]))
83 86
84 87 def testtextoutputformattingstringtype(self):
85 88 """Formatting string must be bytes."""
86 89 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
87 90 list(framing.createtextoutputframe(1, [
88 91 (b'foo'.decode('ascii'), [], [])]))
89 92
90 93 def testtextoutputargumentbytes(self):
91 94 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
92 95 list(framing.createtextoutputframe(1, [
93 96 (b'foo', [b'foo'.decode('ascii')], [])]))
94 97
95 98 def testtextoutputlabelbytes(self):
96 99 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
97 100 list(framing.createtextoutputframe(1, [
98 101 (b'foo', [], [b'foo'.decode('ascii')])]))
99 102
100 103 def testtextoutputtoolongformatstring(self):
101 104 with self.assertRaisesRegexp(ValueError,
102 105 'formatting string cannot be longer than'):
103 106 list(framing.createtextoutputframe(1, [
104 107 (b'x' * 65536, [], [])]))
105 108
106 109 def testtextoutputtoolongargumentstring(self):
107 110 with self.assertRaisesRegexp(ValueError,
108 111 'argument string cannot be longer than'):
109 112 list(framing.createtextoutputframe(1, [
110 113 (b'bleh', [b'x' * 65536], [])]))
111 114
112 115 def testtextoutputtoolonglabelstring(self):
113 116 with self.assertRaisesRegexp(ValueError,
114 117 'label string cannot be longer than'):
115 118 list(framing.createtextoutputframe(1, [
116 119 (b'bleh', [], [b'x' * 65536])]))
117 120
118 121 def testtextoutput1simpleatom(self):
119 122 val = list(framing.createtextoutputframe(1, [
120 123 (b'foo', [], [])]))
121 124
122 125 self.assertEqual(val, [
123 126 ffs(br'1 text-output 0 \x03\x00\x00\x00foo'),
124 127 ])
125 128
126 129 def testtextoutput2simpleatoms(self):
127 130 val = list(framing.createtextoutputframe(1, [
128 131 (b'foo', [], []),
129 132 (b'bar', [], []),
130 133 ]))
131 134
132 135 self.assertEqual(val, [
133 136 ffs(br'1 text-output 0 \x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
134 137 ])
135 138
136 139 def testtextoutput1arg(self):
137 140 val = list(framing.createtextoutputframe(1, [
138 141 (b'foo %s', [b'val1'], []),
139 142 ]))
140 143
141 144 self.assertEqual(val, [
142 145 ffs(br'1 text-output 0 \x06\x00\x00\x01\x04\x00foo %sval1'),
143 146 ])
144 147
145 148 def testtextoutput2arg(self):
146 149 val = list(framing.createtextoutputframe(1, [
147 150 (b'foo %s %s', [b'val', b'value'], []),
148 151 ]))
149 152
150 153 self.assertEqual(val, [
151 154 ffs(br'1 text-output 0 \x09\x00\x00\x02\x03\x00\x05\x00'
152 155 br'foo %s %svalvalue'),
153 156 ])
154 157
155 158 def testtextoutput1label(self):
156 159 val = list(framing.createtextoutputframe(1, [
157 160 (b'foo', [], [b'label']),
158 161 ]))
159 162
160 163 self.assertEqual(val, [
161 164 ffs(br'1 text-output 0 \x03\x00\x01\x00\x05foolabel'),
162 165 ])
163 166
164 167 def testargandlabel(self):
165 168 val = list(framing.createtextoutputframe(1, [
166 169 (b'foo %s', [b'arg'], [b'label']),
167 170 ]))
168 171
169 172 self.assertEqual(val, [
170 173 ffs(br'1 text-output 0 \x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
171 174 ])
172 175
173 176 class ServerReactorTests(unittest.TestCase):
174 177 def _sendsingleframe(self, reactor, s):
175 178 results = list(sendframes(reactor, [ffs(s)]))
176 179 self.assertEqual(len(results), 1)
177 180
178 181 return results[0]
179 182
180 183 def assertaction(self, res, expected):
181 184 self.assertIsInstance(res, tuple)
182 185 self.assertEqual(len(res), 2)
183 186 self.assertIsInstance(res[1], dict)
184 187 self.assertEqual(res[0], expected)
185 188
186 189 def assertframesequal(self, frames, framestrings):
187 190 expected = [ffs(s) for s in framestrings]
188 191 self.assertEqual(list(frames), expected)
189 192
190 193 def test1framecommand(self):
191 194 """Receiving a command in a single frame yields request to run it."""
192 195 reactor = makereactor()
193 196 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
194 197 self.assertEqual(len(results), 1)
195 198 self.assertaction(results[0], 'runcommand')
196 199 self.assertEqual(results[0][1], {
197 200 'requestid': 1,
198 201 'command': b'mycommand',
199 202 'args': {},
200 203 'data': None,
201 204 })
202 205
203 206 result = reactor.oninputeof()
204 207 self.assertaction(result, 'noop')
205 208
206 209 def test1argument(self):
207 210 reactor = makereactor()
208 211 results = list(sendcommandframes(reactor, 41, b'mycommand',
209 212 {b'foo': b'bar'}))
210 213 self.assertEqual(len(results), 2)
211 214 self.assertaction(results[0], 'wantframe')
212 215 self.assertaction(results[1], 'runcommand')
213 216 self.assertEqual(results[1][1], {
214 217 'requestid': 41,
215 218 'command': b'mycommand',
216 219 'args': {b'foo': b'bar'},
217 220 'data': None,
218 221 })
219 222
220 223 def testmultiarguments(self):
221 224 reactor = makereactor()
222 225 results = list(sendcommandframes(reactor, 1, b'mycommand',
223 226 {b'foo': b'bar', b'biz': b'baz'}))
224 227 self.assertEqual(len(results), 3)
225 228 self.assertaction(results[0], 'wantframe')
226 229 self.assertaction(results[1], 'wantframe')
227 230 self.assertaction(results[2], 'runcommand')
228 231 self.assertEqual(results[2][1], {
229 232 'requestid': 1,
230 233 'command': b'mycommand',
231 234 'args': {b'foo': b'bar', b'biz': b'baz'},
232 235 'data': None,
233 236 })
234 237
235 238 def testsimplecommanddata(self):
236 239 reactor = makereactor()
237 240 results = list(sendcommandframes(reactor, 1, b'mycommand', {},
238 241 util.bytesio(b'data!')))
239 242 self.assertEqual(len(results), 2)
240 243 self.assertaction(results[0], 'wantframe')
241 244 self.assertaction(results[1], 'runcommand')
242 245 self.assertEqual(results[1][1], {
243 246 'requestid': 1,
244 247 'command': b'mycommand',
245 248 'args': {},
246 249 'data': b'data!',
247 250 })
248 251
249 252 def testmultipledataframes(self):
250 253 frames = [
251 254 ffs(b'1 command-name have-data mycommand'),
252 255 ffs(b'1 command-data continuation data1'),
253 256 ffs(b'1 command-data continuation data2'),
254 257 ffs(b'1 command-data eos data3'),
255 258 ]
256 259
257 260 reactor = makereactor()
258 261 results = list(sendframes(reactor, frames))
259 262 self.assertEqual(len(results), 4)
260 263 for i in range(3):
261 264 self.assertaction(results[i], 'wantframe')
262 265 self.assertaction(results[3], 'runcommand')
263 266 self.assertEqual(results[3][1], {
264 267 'requestid': 1,
265 268 'command': b'mycommand',
266 269 'args': {},
267 270 'data': b'data1data2data3',
268 271 })
269 272
270 273 def testargumentanddata(self):
271 274 frames = [
272 275 ffs(b'1 command-name have-args|have-data command'),
273 276 ffs(br'1 command-argument 0 \x03\x00\x03\x00keyval'),
274 277 ffs(br'1 command-argument eoa \x03\x00\x03\x00foobar'),
275 278 ffs(b'1 command-data continuation value1'),
276 279 ffs(b'1 command-data eos value2'),
277 280 ]
278 281
279 282 reactor = makereactor()
280 283 results = list(sendframes(reactor, frames))
281 284
282 285 self.assertaction(results[-1], 'runcommand')
283 286 self.assertEqual(results[-1][1], {
284 287 'requestid': 1,
285 288 'command': b'command',
286 289 'args': {
287 290 b'key': b'val',
288 291 b'foo': b'bar',
289 292 },
290 293 'data': b'value1value2',
291 294 })
292 295
293 296 def testunexpectedcommandargument(self):
294 297 """Command argument frame when not running a command is an error."""
295 298 result = self._sendsingleframe(makereactor(),
296 299 b'1 command-argument 0 ignored')
297 300 self.assertaction(result, 'error')
298 301 self.assertEqual(result[1], {
299 302 'message': b'expected command frame; got 2',
300 303 })
301 304
302 305 def testunexpectedcommandargumentreceiving(self):
303 306 """Same as above but the command is receiving."""
304 307 results = list(sendframes(makereactor(), [
305 308 ffs(b'1 command-name have-data command'),
306 309 ffs(b'1 command-argument eoa ignored'),
307 310 ]))
308 311
309 312 self.assertaction(results[1], 'error')
310 313 self.assertEqual(results[1][1], {
311 314 'message': b'received command argument frame for request that is '
312 315 b'not expecting arguments: 1',
313 316 })
314 317
315 318 def testunexpectedcommanddata(self):
316 319 """Command argument frame when not running a command is an error."""
317 320 result = self._sendsingleframe(makereactor(),
318 321 b'1 command-data 0 ignored')
319 322 self.assertaction(result, 'error')
320 323 self.assertEqual(result[1], {
321 324 'message': b'expected command frame; got 3',
322 325 })
323 326
324 327 def testunexpectedcommanddatareceiving(self):
325 328 """Same as above except the command is receiving."""
326 329 results = list(sendframes(makereactor(), [
327 330 ffs(b'1 command-name have-args command'),
328 331 ffs(b'1 command-data eos ignored'),
329 332 ]))
330 333
331 334 self.assertaction(results[1], 'error')
332 335 self.assertEqual(results[1][1], {
333 336 'message': b'received command data frame for request that is not '
334 337 b'expecting data: 1',
335 338 })
336 339
337 340 def testmissingcommandframeflags(self):
338 341 """Command name frame must have flags set."""
339 342 result = self._sendsingleframe(makereactor(),
340 343 b'1 command-name 0 command')
341 344 self.assertaction(result, 'error')
342 345 self.assertEqual(result[1], {
343 346 'message': b'missing frame flags on command frame',
344 347 })
345 348
346 349 def testconflictingrequestid(self):
347 350 """Multiple fully serviced commands with same request ID is allowed."""
348 351 results = list(sendframes(makereactor(), [
349 352 ffs(b'1 command-name eos command'),
350 353 ffs(b'1 command-name eos command'),
351 354 ffs(b'1 command-name eos command'),
352 355 ]))
353 356 for i in range(3):
354 357 self.assertaction(results[i], 'runcommand')
355 358 self.assertEqual(results[i][1], {
356 359 'requestid': 1,
357 360 'command': b'command',
358 361 'args': {},
359 362 'data': None,
360 363 })
361 364
362 365 def testconflictingrequestid(self):
363 366 """Request ID for new command matching in-flight command is illegal."""
364 367 results = list(sendframes(makereactor(), [
365 368 ffs(b'1 command-name have-args command'),
366 369 ffs(b'1 command-name eos command'),
367 370 ]))
368 371
369 372 self.assertaction(results[0], 'wantframe')
370 373 self.assertaction(results[1], 'error')
371 374 self.assertEqual(results[1][1], {
372 375 'message': b'request with ID 1 already received',
373 376 })
374 377
375 378 def testinterleavedcommands(self):
376 379 results = list(sendframes(makereactor(), [
377 380 ffs(b'1 command-name have-args command1'),
378 381 ffs(b'3 command-name have-args command3'),
379 382 ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
380 383 ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
381 384 ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
382 385 ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
383 386 ]))
384 387
385 388 self.assertEqual([t[0] for t in results], [
386 389 'wantframe',
387 390 'wantframe',
388 391 'wantframe',
389 392 'wantframe',
390 393 'runcommand',
391 394 'runcommand',
392 395 ])
393 396
394 397 self.assertEqual(results[4][1], {
395 398 'requestid': 3,
396 399 'command': 'command3',
397 400 'args': {b'biz': b'baz', b'key': b'val'},
398 401 'data': None,
399 402 })
400 403 self.assertEqual(results[5][1], {
401 404 'requestid': 1,
402 405 'command': 'command1',
403 406 'args': {b'foo': b'bar', b'key1': b'val'},
404 407 'data': None,
405 408 })
406 409
407 410 def testmissingargumentframe(self):
408 411 # This test attempts to test behavior when reactor has an incomplete
409 412 # command request waiting on argument data. But it doesn't handle that
410 413 # scenario yet. So this test does nothing of value.
411 414 frames = [
412 415 ffs(b'1 command-name have-args command'),
413 416 ]
414 417
415 418 results = list(sendframes(makereactor(), frames))
416 419 self.assertaction(results[0], 'wantframe')
417 420
418 421 def testincompleteargumentname(self):
419 422 """Argument frame with incomplete name."""
420 423 frames = [
421 424 ffs(b'1 command-name have-args command1'),
422 425 ffs(br'1 command-argument eoa \x04\x00\xde\xadfoo'),
423 426 ]
424 427
425 428 results = list(sendframes(makereactor(), frames))
426 429 self.assertEqual(len(results), 2)
427 430 self.assertaction(results[0], 'wantframe')
428 431 self.assertaction(results[1], 'error')
429 432 self.assertEqual(results[1][1], {
430 433 'message': b'malformed argument frame: partial argument name',
431 434 })
432 435
433 436 def testincompleteargumentvalue(self):
434 437 """Argument frame with incomplete value."""
435 438 frames = [
436 439 ffs(b'1 command-name have-args command'),
437 440 ffs(br'1 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
438 441 ]
439 442
440 443 results = list(sendframes(makereactor(), frames))
441 444 self.assertEqual(len(results), 2)
442 445 self.assertaction(results[0], 'wantframe')
443 446 self.assertaction(results[1], 'error')
444 447 self.assertEqual(results[1][1], {
445 448 'message': b'malformed argument frame: partial argument value',
446 449 })
447 450
448 451 def testmissingcommanddataframe(self):
449 452 # The reactor doesn't currently handle partially received commands.
450 453 # So this test is failing to do anything with request 1.
451 454 frames = [
452 455 ffs(b'1 command-name have-data command1'),
453 456 ffs(b'3 command-name eos command2'),
454 457 ]
455 458 results = list(sendframes(makereactor(), frames))
456 459 self.assertEqual(len(results), 2)
457 460 self.assertaction(results[0], 'wantframe')
458 461 self.assertaction(results[1], 'runcommand')
459 462
460 463 def testmissingcommanddataframeflags(self):
461 464 frames = [
462 465 ffs(b'1 command-name have-data command1'),
463 466 ffs(b'1 command-data 0 data'),
464 467 ]
465 468 results = list(sendframes(makereactor(), frames))
466 469 self.assertEqual(len(results), 2)
467 470 self.assertaction(results[0], 'wantframe')
468 471 self.assertaction(results[1], 'error')
469 472 self.assertEqual(results[1][1], {
470 473 'message': b'command data frame without flags',
471 474 })
472 475
473 476 def testframefornonreceivingrequest(self):
474 477 """Receiving a frame for a command that is not receiving is illegal."""
475 478 results = list(sendframes(makereactor(), [
476 479 ffs(b'1 command-name eos command1'),
477 480 ffs(b'3 command-name have-data command3'),
478 481 ffs(b'1 command-argument eoa ignored'),
479 482 ]))
480 483 self.assertaction(results[2], 'error')
481 484 self.assertEqual(results[2][1], {
482 485 'message': b'received frame for request that is not receiving: 1',
483 486 })
484 487
485 488 def testsimpleresponse(self):
486 489 """Bytes response to command sends result frames."""
487 490 reactor = makereactor()
488 491 list(sendcommandframes(reactor, 1, b'mycommand', {}))
489 492
490 493 result = reactor.onbytesresponseready(1, b'response')
491 494 self.assertaction(result, 'sendframes')
492 495 self.assertframesequal(result[1]['framegen'], [
493 496 b'1 bytes-response eos response',
494 497 ])
495 498
496 499 def testmultiframeresponse(self):
497 500 """Bytes response spanning multiple frames is handled."""
498 501 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
499 502 second = b'y' * 100
500 503
501 504 reactor = makereactor()
502 505 list(sendcommandframes(reactor, 1, b'mycommand', {}))
503 506
504 507 result = reactor.onbytesresponseready(1, first + second)
505 508 self.assertaction(result, 'sendframes')
506 509 self.assertframesequal(result[1]['framegen'], [
507 510 b'1 bytes-response continuation %s' % first,
508 511 b'1 bytes-response eos %s' % second,
509 512 ])
510 513
511 514 def testapplicationerror(self):
512 515 reactor = makereactor()
513 516 list(sendcommandframes(reactor, 1, b'mycommand', {}))
514 517
515 518 result = reactor.onapplicationerror(1, b'some message')
516 519 self.assertaction(result, 'sendframes')
517 520 self.assertframesequal(result[1]['framegen'], [
518 521 b'1 error-response application some message',
519 522 ])
520 523
521 524 def test1commanddeferresponse(self):
522 525 """Responses when in deferred output mode are delayed until EOF."""
523 526 reactor = makereactor(deferoutput=True)
524 527 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
525 528 self.assertEqual(len(results), 1)
526 529 self.assertaction(results[0], 'runcommand')
527 530
528 531 result = reactor.onbytesresponseready(1, b'response')
529 532 self.assertaction(result, 'noop')
530 533 result = reactor.oninputeof()
531 534 self.assertaction(result, 'sendframes')
532 535 self.assertframesequal(result[1]['framegen'], [
533 536 b'1 bytes-response eos response',
534 537 ])
535 538
536 539 def testmultiplecommanddeferresponse(self):
537 540 reactor = makereactor(deferoutput=True)
538 541 list(sendcommandframes(reactor, 1, b'command1', {}))
539 542 list(sendcommandframes(reactor, 3, b'command2', {}))
540 543
541 544 result = reactor.onbytesresponseready(1, b'response1')
542 545 self.assertaction(result, 'noop')
543 546 result = reactor.onbytesresponseready(3, b'response2')
544 547 self.assertaction(result, 'noop')
545 548 result = reactor.oninputeof()
546 549 self.assertaction(result, 'sendframes')
547 550 self.assertframesequal(result[1]['framegen'], [
548 551 b'1 bytes-response eos response1',
549 552 b'3 bytes-response eos response2'
550 553 ])
551 554
552 555 def testrequestidtracking(self):
553 556 reactor = makereactor(deferoutput=True)
554 557 list(sendcommandframes(reactor, 1, b'command1', {}))
555 558 list(sendcommandframes(reactor, 3, b'command2', {}))
556 559 list(sendcommandframes(reactor, 5, b'command3', {}))
557 560
558 561 # Register results for commands out of order.
559 562 reactor.onbytesresponseready(3, b'response3')
560 563 reactor.onbytesresponseready(1, b'response1')
561 564 reactor.onbytesresponseready(5, b'response5')
562 565
563 566 result = reactor.oninputeof()
564 567 self.assertaction(result, 'sendframes')
565 568 self.assertframesequal(result[1]['framegen'], [
566 569 b'3 bytes-response eos response3',
567 570 b'1 bytes-response eos response1',
568 571 b'5 bytes-response eos response5',
569 572 ])
570 573
571 574 if __name__ == '__main__':
572 575 import silenttestrunner
573 576 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now